1
0
Fork 0

Get schema names with a direct query instead of stripping them from the tables metadata so we can autocomplete schemata without tables in them

This commit is contained in:
Darik Gamble 2015-01-18 18:27:25 -05:00
parent 78dce17428
commit cc6276925f
4 changed files with 38 additions and 9 deletions

View File

@ -332,7 +332,8 @@ def quit_command(sql):
or sql.strip() == ':q')
def refresh_completions(pgexecute, completer):
tables, columns = pgexecute.get_metadata()
schemata, tables, columns = pgexecute.get_metadata()
completer.extend_schemata(schemata)
completer.extend_tables(tables)
completer.extend_columns(columns)
completer.extend_database_names(pgexecute.databases())

View File

@ -54,6 +54,12 @@ def _parse_dsn(dsn, default_user, default_password, default_host,
class PGExecute(object):
schemata_query = '''
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname !~ '^pg_'
AND nspname <> 'information_schema' '''
tables_query = '''
SELECT n.nspname schema_name,
c.relname table_name,
@ -163,13 +169,20 @@ class PGExecute(object):
return (None, None, cur.statusmessage)
def get_metadata(self):
""" Returns a tuple [tables, columns] of DataFrames
""" Returns a tuple [schemata, tables, columns] of DataFrames
schemata: DataFrame with columns [schema]
tables: DataFrame with columns [schema, table, is_visible]
columns: DataFrame with columns [schema, table, column]
"""
with self.conn.cursor() as cur:
_logger.debug('Schemata Query. sql: %r', self.schemata_query)
cur.execute(self.schemata_query)
schemata = DataFrame.from_records(cur,
columns=['schema'])
with self.conn.cursor() as cur:
_logger.debug('Tables Query. sql: %r', self.tables_query)
cur.execute(self.tables_query)
@ -182,7 +195,7 @@ class PGExecute(object):
columns = DataFrame.from_records(cur,
columns=['schema', 'table', 'column'])
return [tables, columns]
return [schemata, tables, columns]
def databases(self):
with self.conn.cursor() as cur:

View File

@ -55,14 +55,25 @@ def test_conn(executor):
SELECT 1""")
@dbtest
def test_table_and_columns_query(executor):
def test_schemata_table_and_columns_query(executor):
run(executor, "create table a(x text, y text)")
run(executor, "create table b(z text)")
run(executor, "create schema schema1")
run(executor, "create table schema1.c (w text)")
run(executor, "create schema schema2")
tables, columns = executor.get_metadata()
assert set(tables['table']) == set(['a', 'b'])
assert set(columns['column'][columns['table']=='a']) == set(['x', 'y'])
assert set(columns['column'][columns['table']=='b']) == set(['z'])
schemata, tables, columns = executor.get_metadata()
assert schemata.to_dict('list') == {
'schema': ['public', 'schema1', 'schema2']}
assert tables.to_dict('list') == {
'schema': ['public', 'public', 'schema1'],
'table': ['a', 'b', 'c'],
'is_visible': [True, True, False]}
assert columns.to_dict('list') == {
'schema': ['public', 'public', 'public', 'schema1'],
'table': ['a', 'a', 'b', 'c'],
'column': ['x', 'y', 'z', 'w']}
@dbtest
def test_database_list(executor):

View File

@ -32,7 +32,11 @@ def create_db(dbname):
def drop_tables(conn):
with conn.cursor() as cur:
cur.execute('''DROP SCHEMA public CASCADE; CREATE SCHEMA public''')
cur.execute('''
DROP SCHEMA public CASCADE;
CREATE SCHEMA public;
DROP SCHEMA IF EXISTS schema1 CASCADE;
DROP SCHEMA IF EXISTS schema2 CASCADE''')
def run(executor, sql, join=False):