
Make autocomplete schema-aware

Darik Gamble 2015-01-18 14:32:30 -05:00
parent a795c4f200
commit 78288101c2
9 changed files with 522 additions and 184 deletions
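
Not part of the diff itself: this commit changes suggest_type to return a list of suggestion dicts instead of a (category, scope) tuple, and swaps the completer's list/dict metadata for pandas DataFrames that carry schema information. A minimal sketch of the new suggestion shape, grounded in the tests further down:

# Illustrative only -- shows the suggestion-dict format introduced by this commit.
from pgcli.packages.sqlcompletion import suggest_type

# After FROM, both schemas and visible tables are now suggested:
suggestions = suggest_type('SELECT * FROM ', 'SELECT * FROM ')
# -> [{'type': 'schema'}, {'type': 'table', 'schema': []}]
print(suggestions)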

View File

@@ -332,11 +332,9 @@ def quit_command(sql):
or sql.strip() == ':q')
def refresh_completions(pgexecute, completer):
tables, columns = pgexecute.tables()
completer.extend_table_names(tables)
for table in tables:
table = table[1:-1] if table[0] == '"' and table[-1] == '"' else table
completer.extend_column_names(table, columns[table])
tables, columns = pgexecute.get_metadata()
completer.extend_tables(tables)
completer.extend_columns(columns)
completer.extend_database_names(pgexecute.databases())
if __name__ == "__main__":

View File

@@ -1,6 +1,7 @@
from __future__ import print_function
import re
import sqlparse
from pandas import DataFrame
from sqlparse.sql import IdentifierList, Identifier, Function
from sqlparse.tokens import Keyword, DML, Punctuation
@@ -101,50 +102,51 @@ def extract_from_part(parsed, stop_at_punctuation=True):
break
def extract_table_identifiers(token_stream):
"""yields tuples of (schema_name, table_name, table_alias)"""
for item in token_stream:
if isinstance(item, IdentifierList):
for identifier in item.get_identifiers():
# Sometimes Keywords (such as FROM ) are classified as
# identifiers which don't have the get_real_name() method.
try:
schema_name = identifier.get_parent_name()
real_name = identifier.get_real_name()
except AttributeError:
continue
if real_name:
yield (real_name, identifier.get_alias() or real_name)
yield (schema_name, real_name, identifier.get_alias())
elif isinstance(item, Identifier):
real_name = item.get_real_name()
schema_name = item.get_parent_name()
if real_name:
yield (real_name, item.get_alias() or real_name)
yield (schema_name, real_name, item.get_alias())
else:
name = item.get_name()
yield (name, item.get_alias() or name)
yield (None, name, item.get_alias() or name)
elif isinstance(item, Function):
yield (item.get_name(), item.get_name())
yield (None, item.get_name(), item.get_name())
# extract_tables is inspired from examples in the sqlparse lib.
def extract_tables(sql, include_alias=False):
def extract_tables(sql):
"""Extract the table names from an SQL statment.
Returns a list of table names if include_alias=False (default).
If include_alias=True, then a dictionary is returned where the keys are
aliases and values are real table names.
Returns a DataFrame with columns [schema, table, alias]
"""
parsed = sqlparse.parse(sql)
if not parsed:
return []
return DataFrame({}, columns=['schema', 'table', 'alias'])
# INSERT statements must stop looking for tables at the first punctuation
# token. e.g.: INSERT INTO abc (col1, col2) VALUES (1, 2)
# abc is the table name, but if we don't stop at the first lparen, then
# we'll identify abc, col1 and col2 as table names.
insert_stmt = parsed[0].token_first().value.lower() == 'insert'
stream = extract_from_part(parsed[0], stop_at_punctuation=insert_stmt)
if include_alias:
return dict((alias, t) for t, alias in extract_table_identifiers(stream))
else:
return [x[0] for x in extract_table_identifiers(stream)]
tables = extract_table_identifiers(stream)
return DataFrame.from_records(tables, columns=['schema', 'table', 'alias'])
def find_prev_keyword(sql):
if not sql.strip():
@@ -156,4 +158,4 @@ def find_prev_keyword(sql):
if __name__ == '__main__':
sql = 'select * from (select t. from tabl t'
print (extract_tables(sql, True))
print (extract_tables(sql))
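
Not part of the diff: a short sketch of the DataFrame now returned by extract_tables, assuming pandas and sqlparse are installed; the expected result mirrors the unit tests further down.

# Illustrative sketch of the new extract_tables return value.
from pgcli.packages.parseutils import extract_tables

tables = extract_tables('SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num')
print(tables.to_dict('list'))
# Per test_join_table_schema_qualified below, this yields:
# {'schema': ['abc', 'ghi'], 'table': ['def', 'jkl'], 'alias': ['x', 'y']}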

View File

@@ -46,6 +46,12 @@ def suggest_type(full_text, text_before_cursor):
return suggest_based_on_last_token(last_token, text_before_cursor, full_text)
def suggest_based_on_last_token(token, text_before_cursor, full_text):
"""Returns a list of suggestion dicts
A suggestion dict is a dict with a mandatory field "type", and optional
additional fields supplying additional scope information.
"""
if isinstance(token, string_types):
token_v = token
else:
@@ -66,28 +72,67 @@ def suggest_based_on_last_token(token, text_before_cursor, full_text):
# If the lparen is preceded by a space, chances are we're about to
# do a sub-select.
if last_word(text_before_cursor, 'all_punctuations').startswith('('):
return 'keywords', []
return 'columns', extract_tables(full_text)
return [{'type': 'keyword'}]
return [{'type': 'column', 'tables': extract_tables(full_text)}]
if token_v.lower() in ('set', 'by', 'distinct'):
return 'columns', extract_tables(full_text)
return [{'type': 'column', 'tables': extract_tables(full_text)}]
elif token_v.lower() in ('select', 'where', 'having'):
return 'columns-and-functions', extract_tables(full_text)
return [{'type': 'column', 'tables': extract_tables(full_text)},
{'type': 'function'}]
elif token_v.lower() in ('from', 'update', 'into', 'describe', 'join', 'table'):
return 'tables', []
return [{'type': 'schema'}, {'type': 'table', 'schema': []}]
elif token_v.lower() == 'on':
tables = extract_tables(full_text, include_alias=True)
return 'tables-or-aliases', tables.keys()
tables = extract_tables(full_text)
# Use table alias if there is one, otherwise the table name
alias = tables['alias'].where(tables['alias'].notnull(), tables['table'])
return [{'type': 'alias', 'aliases': list(alias)}]
elif token_v in ('d',): # \d
return 'tables', []
return [{'type': 'schema'}, {'type': 'table', 'schema': []}]
elif token_v.lower() in ('c', 'use'): # \c
return 'databases', []
return [{'type': 'database'}]
elif token_v.endswith(','):
prev_keyword = find_prev_keyword(text_before_cursor)
if prev_keyword:
return suggest_based_on_last_token(prev_keyword, text_before_cursor, full_text)
return suggest_based_on_last_token(
prev_keyword, text_before_cursor, full_text)
elif token_v.endswith('.'):
current_alias = last_word(token_v[:-1])
tables = extract_tables(full_text, include_alias=True)
return 'columns', [tables.get(current_alias) or current_alias]
return 'keywords', []
suggestions = []
identifier = last_word(token_v[:-1], 'all_punctuations')
# TABLE.<suggestion> or SCHEMA.TABLE.<suggestion>
tables = extract_tables(full_text)
tables = get_matching_tables(tables, identifier)
suggestions.append({'type': 'column',
'tables': tables[['schema', 'table', 'alias']]})
# SCHEMA.<suggestion>
suggestions.append({'type': 'table', 'schema': identifier})
return suggestions
return [{'type': 'keyword'}]
def get_matching_tables(tables, identifier):
"""
:param tables: DataFrame with columns [schema, table, alias]
:param identifier: a table name, table alias, or fully qualified schema.tablename
:return: a row or rows from tables that match the identifier
"""
tables['full_name'] = tables.apply(qualify_table_name, axis=1)
# Match a table to an identifier if the identifier is equal to any one of
# the table name, table alias, or schema-qualified table name
matches = tables[['table', 'alias', 'full_name']] == identifier
is_match = matches.any(axis=1)
return tables[is_match]
def qualify_table_name(row):
return row['schema'] + '.' + row['table'] if row['schema'] else row['table']
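
Not part of the diff: a self-contained sketch of the matching logic used by get_matching_tables and qualify_table_name, run on a small hypothetical DataFrame.

from pandas import DataFrame

# Hypothetical scoped tables, shaped like extract_tables output.
tables = DataFrame({'schema': ['custom', None],
                    'table':  ['products', 'users'],
                    'alias':  [None, 'u']})

# qualify_table_name produces 'custom.products' for the qualified row.
tables['full_name'] = tables.apply(
    lambda r: r['schema'] + '.' + r['table'] if r['schema'] else r['table'], axis=1)

# An identifier matches if it equals the table name, alias, or full name.
matches = (tables[['table', 'alias', 'full_name']] == 'custom.products').any(axis=1)
print(tables[matches])  # -> the custom.products row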

View File

@@ -5,6 +5,7 @@ from prompt_toolkit.completion import Completer, Completion
from .packages.sqlcompletion import suggest_type
from .packages.parseutils import last_word
from re import compile
from pandas import DataFrame
_logger = logging.getLogger(__name__)
@@ -36,10 +37,10 @@ class PGCompleter(Completer):
special_commands = []
databases = []
tables = []
# This will create a defaultdict which is initialized with a list that has
# a '*' by default.
columns = defaultdict(lambda: ['*'])
schemata = set()
tables = DataFrame({}, columns=['schema', 'table', 'alias'])
columns = DataFrame({}, columns=['schema', 'table', 'column'])
all_completions = set(keywords + functions)
def __init__(self, smart_completion=True):
@@ -71,31 +72,39 @@ class PGCompleter(Completer):
def extend_database_names(self, databases):
databases = self.escaped_names(databases)
self.databases.extend(databases)
def extend_keywords(self, additional_keywords):
self.keywords.extend(additional_keywords)
self.all_completions.update(additional_keywords)
def extend_table_names(self, tables):
tables = self.escaped_names(tables)
def extend_tables(self, data):
self.tables.extend(tables)
self.all_completions.update(tables)
# data is a DataFrame with columns [schema, table, is_visible]
data[['schema', 'table']].apply(self.escaped_names)
self.tables = self.tables.append(data)
def extend_column_names(self, table, columns):
columns = self.escaped_names(columns)
self.schemata.update(data['schema'])
self.all_completions.update(data['schema'])
self.all_completions.update(data['table'])
unescaped_table_name = self.unescape_name(table)
# Auto-add '*' as a column in all tables
cols = data[['schema', 'table']].copy()
cols['column'] = '*'
self.columns = self.columns.append(cols)
self.columns[unescaped_table_name].extend(columns)
self.all_completions.update(columns)
def extend_columns(self, data):
# data is a DataFrame with columns [schema, table, column]
data[['schema', 'table', 'column']].apply(self.escaped_names)
self.columns = self.columns.append(data)
self.all_completions.update(data.column)
def reset_completions(self):
self.databases = []
self.tables = []
self.columns = defaultdict(lambda: ['*'])
self.schemata = set()
self.tables = DataFrame({}, columns=['schema', 'table', 'alias'])
self.columns = DataFrame({}, columns=['schema', 'table', 'column'])
self.all_completions = set(self.keywords)
@staticmethod
@@ -115,36 +124,71 @@ class PGCompleter(Completer):
if not smart_completion:
return self.find_matches(word_before_cursor, self.all_completions)
category, scope = suggest_type(document.text,
document.text_before_cursor)
completions = []
suggestions = suggest_type(document.text, document.text_before_cursor)
for suggestion in suggestions:
_logger.debug('Suggestion type: %r', suggestion['type'])
if suggestion['type'] == 'column':
tables = suggestion['tables']
_logger.debug("Completion column scope: %r", tables)
scoped_cols = self.populate_scoped_cols(tables)
cols = self.find_matches(word_before_cursor, scoped_cols)
completions.extend(cols)
elif suggestion['type'] == 'function':
funcs = self.find_matches(word_before_cursor, self.functions)
completions.extend(funcs)
elif suggestion['type'] == 'schema':
schemata = self.find_matches(word_before_cursor, self.schemata)
completions.extend(schemata)
elif suggestion['type'] == 'table':
meta = self.tables
if suggestion['schema']:
tables = meta.table[meta.schema == suggestion['schema']]
else:
tables = meta.table[meta.is_visible]
tables = self.find_matches(word_before_cursor, tables)
completions.extend(tables)
elif suggestion['type'] == 'alias':
aliases = suggestion['aliases']
aliases = self.find_matches(word_before_cursor, aliases)
completions.extend(aliases)
elif suggestion['type'] == 'database':
dbs = self.find_matches(word_before_cursor, self.databases)
completions.extend(dbs)
elif suggestion['type'] == 'keyword':
keywords = self.keywords + self.special_commands
keywords = self.find_matches(word_before_cursor, keywords)
completions.extend(keywords)
return completions
def populate_scoped_cols(self, scoped_tbls):
""" Find all columns in a set of scoped_tables
:param scoped_tbls: DataFrame with columns [schema, table, alias]
:return: list of column names
"""
columns = self.columns # dataframe with columns [schema, table, column]
scoped_tbls[['schema', 'table', 'alias']].apply(self.unescape_name)
# For fully qualified tables, inner join on (schema, table)
qualed = scoped_tbls.merge(columns, how='inner', on=['schema', 'table'])
# Only allow unqualified table reference on visible tables
vis_tables = self.tables[self.tables['is_visible']]
unqualed_tables = scoped_tbls.merge(vis_tables, how='inner', on=['table'])
unqualed = unqualed_tables.merge(columns, how='inner', on=['table'])
return list(qualed['column']) + list(unqualed['column'])
if category == 'columns':
_logger.debug("Completion: 'columns' Scope: %r", scope)
scoped_cols = self.populate_scoped_cols(scope)
return self.find_matches(word_before_cursor, scoped_cols)
elif category == 'columns-and-functions':
_logger.debug("Completion: 'columns-and-functions' Scope: %r",
scope)
scoped_cols = self.populate_scoped_cols(scope)
return self.find_matches(word_before_cursor, scoped_cols +
self.functions)
elif category == 'tables':
_logger.debug("Completion: 'tables' Scope: %r", scope)
return self.find_matches(word_before_cursor, self.tables)
elif category == 'tables-or-aliases':
_logger.debug("Completion: 'tables-or-aliases' Scope: %r", scope)
return self.find_matches(word_before_cursor, scope)
elif category == 'databases':
_logger.debug("Completion: 'databases' Scope: %r", scope)
return self.find_matches(word_before_cursor, self.databases)
elif category == 'keywords':
_logger.debug("Completion: 'keywords' Scope: %r", scope)
return self.find_matches(word_before_cursor, self.keywords +
self.special_commands)
def populate_scoped_cols(self, tables):
scoped_cols = []
for table in tables:
unescaped_table_name = self.unescape_name(table)
scoped_cols.extend(self.columns[unescaped_table_name])
return scoped_cols

View File

@@ -3,7 +3,7 @@ import psycopg2
import psycopg2.extras
import psycopg2.extensions
import sqlparse
from collections import defaultdict
from pandas import DataFrame
from .packages import pgspecial
_logger = logging.getLogger(__name__)
@@ -54,13 +54,34 @@ def _parse_dsn(dsn, default_user, default_password, default_host,
class PGExecute(object):
tables_query = '''SELECT c.relname as "Name" FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace WHERE
c.relkind IN ('r','') AND n.nspname <> 'pg_catalog' AND n.nspname <>
'information_schema' AND n.nspname !~ '^pg_toast' AND
pg_catalog.pg_table_is_visible(c.oid) ORDER BY 1;'''
tables_query = '''
SELECT n.nspname schema_name,
c.relname table_name,
pg_catalog.pg_table_is_visible(c.oid) is_visible
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n
ON n.oid = c.relnamespace
WHERE c.relkind IN ('r','v', 'm') -- table, view, materialized view
AND n.nspname !~ '^pg_toast'
AND n.nspname NOT IN ('information_schema', 'pg_catalog')
ORDER BY 1,2;'''
columns_query = '''
SELECT nsp.nspname schema_name,
cls.relname table_name,
att.attname column_name
FROM pg_catalog.pg_attribute att
INNER JOIN pg_catalog.pg_class cls
ON att.attrelid = cls.oid
INNER JOIN pg_catalog.pg_namespace nsp
ON cls.relnamespace = nsp.oid
WHERE cls.relkind IN ('r', 'v', 'm')
AND nsp.nspname !~ '^pg_'
AND nsp.nspname <> 'information_schema'
AND NOT att.attisdropped
AND att.attnum > 0
ORDER BY 1, 2, 3'''
columns_query = '''SELECT table_name, column_name FROM information_schema.columns'''
databases_query = """SELECT d.datname as "Name",
pg_catalog.pg_get_userbyid(d.datdba) as "Owner",
@@ -141,22 +162,27 @@ class PGExecute(object):
_logger.debug('No rows in result.')
return (None, None, cur.statusmessage)
def tables(self):
""" Returns tuple (sorted_tables, columns). Columns is a dictionary of
table name -> list of columns """
columns = defaultdict(list)
def get_metadata(self):
""" Returns a tuple [tables, columns] of DataFrames
tables: DataFrame with columns [schema, table, is_visible]
columns: DataFrame with columns [schema, table, column]
"""
with self.conn.cursor() as cur:
_logger.debug('Tables Query. sql: %r', self.tables_query)
cur.execute(self.tables_query)
tables = [x[0] for x in cur.fetchall()]
tables = DataFrame.from_records(cur,
columns=['schema', 'table', 'is_visible'])
table_set = set(tables)
with self.conn.cursor() as cur:
_logger.debug('Columns Query. sql: %r', self.columns_query)
cur.execute(self.columns_query)
for table, column in cur.fetchall():
if table in table_set:
columns[table].append(column)
return tables, columns
columns = DataFrame.from_records(cur,
columns=['schema', 'table', 'column'])
return [tables, columns]
def databases(self):
with self.conn.cursor() as cur:
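
Not part of the diff: get_metadata feeds the cursor straight into DataFrame.from_records; a tiny sketch with hypothetical rows standing in for a live psycopg2 cursor.

from pandas import DataFrame

# Hypothetical rows, standing in for iterating over the cursor.
rows = [('public', 'users', True),
        ('custom', 'products', False)]
tables = DataFrame.from_records(rows, columns=['schema', 'table', 'is_visible'])
print(tables[tables['is_visible']]['table'].tolist())  # -> ['users']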

View File

@@ -3,52 +3,114 @@ from pgcli.packages.parseutils import extract_tables
def test_empty_string():
tables = extract_tables('')
assert tables == []
assert tables.to_dict('list') == {'schema': [], 'table': [], 'alias': []}
def test_simple_select_single_table():
tables = extract_tables('select * from abc')
assert tables == ['abc']
assert tables.to_dict('list') == \
{'schema': [None], 'table': ['abc'], 'alias': [None]}
def test_simple_select_single_table_schema_qualified():
tables = extract_tables('select * from abc.def')
assert tables.to_dict('list') == \
{'schema': ['abc'], 'table': ['def'], 'alias': [None]}
def test_simple_select_multiple_tables():
tables = extract_tables('select * from abc, def')
assert tables == ['abc', 'def']
assert tables.to_dict('list') == \
{'schema': [None, None],
'table': ['abc', 'def'],
'alias': [None, None]}
def test_simple_select_multiple_tables_schema_qualified():
tables = extract_tables('select * from abc.def, ghi.jkl')
assert tables.to_dict('list') == \
{'schema': ['abc', 'ghi'],
'table': ['def', 'jkl'],
'alias': [None, None]}
def test_simple_select_with_cols_single_table():
tables = extract_tables('select a,b from abc')
assert tables == ['abc']
assert tables.to_dict('list') == \
{'schema': [None], 'table': ['abc'], 'alias': [None]}
def test_simple_select_with_cols_single_table_schema_qualified():
tables = extract_tables('select a,b from abc.def')
assert tables.to_dict('list') == \
{'schema': ['abc'], 'table': ['def'], 'alias': [None]}
def test_simple_select_with_cols_multiple_tables():
tables = extract_tables('select a,b from abc, def')
assert tables == ['abc', 'def']
assert tables.to_dict('list') == \
{'schema': [None, None],
'table': ['abc', 'def'],
'alias': [None, None]}
def test_simple_select_with_cols_multiple_tables_schema_qualified():
tables = extract_tables('select a,b from abc.def, def.ghi')
assert tables.to_dict('list') == \
{'schema': ['abc', 'def'],
'table': ['def', 'ghi'],
'alias': [None, None]}
def test_select_with_hanging_comma_single_table():
tables = extract_tables('select a, from abc')
assert tables == ['abc']
assert tables.to_dict('list') == \
{'schema': [None],
'table': ['abc'],
'alias': [None]}
def test_select_with_hanging_comma_multiple_tables():
tables = extract_tables('select a, from abc, def')
assert tables == ['abc', 'def']
assert tables.to_dict('list') == \
{'schema': [None, None],
'table': ['abc', 'def'],
'alias': [None, None]}
def test_select_with_hanging_period_multiple_tables():
tables = extract_tables('SELECT t1. FROM tabl1 t1, tabl2 t2')
assert tables.to_dict('list') == \
{'schema': [None, None],
'table': ['tabl1', 'tabl2'],
'alias': ['t1', 't2']}
def test_simple_insert_single_table():
tables = extract_tables('insert into abc (id, name) values (1, "def")')
assert tables == ['abc']
assert tables.to_dict('list') == \
{'schema': [None], 'table': ['abc'], 'alias': ['abc']}
@pytest.mark.xfail
def test_simple_insert_single_table_schema_qualified():
tables = extract_tables('insert into abc.def (id, name) values (1, "def")')
assert tables.to_dict('list') == \
{'schema': ['abc'], 'table': ['def'], 'alias': [None]}
def test_simple_update_table():
tables = extract_tables('update abc set id = 1')
assert tables == ['abc']
assert tables.to_dict('list') == \
{'schema': [None], 'table': ['abc'], 'alias': [None]}
def test_simple_update_table_schema_qualified():
tables = extract_tables('update abc.def set id = 1')
assert tables.to_dict('list') == \
{'schema': ['abc'], 'table': ['def'], 'alias': [None]}
def test_join_table():
expected = {'a': 'abc', 'd': 'def'}
tables = extract_tables('SELECT * FROM abc a JOIN def d ON a.id = d.num')
tables_aliases = extract_tables(
'SELECT * FROM abc a JOIN def d ON a.id = d.num', True)
assert tables == sorted(expected.values())
assert tables_aliases == expected
assert tables.to_dict('list') == \
{'schema': [None, None],
'table': ['abc', 'def'],
'alias': ['a', 'd']}
def test_join_table_schema_qualified():
tables = extract_tables('SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num')
assert tables.to_dict('list') == \
{'schema': ['abc', 'ghi'],
'table': ['def', 'jkl'],
'alias': ['x', 'y']}
def test_join_as_table():
expected = {'m': 'my_table'}
assert extract_tables(
'SELECT * FROM my_table AS m WHERE m.a > 5') == \
sorted(expected.values())
assert extract_tables(
'SELECT * FROM my_table AS m WHERE m.a > 5', True) == expected
tables = extract_tables('SELECT * FROM my_table AS m WHERE m.a > 5')
assert tables.to_dict('list') == \
{'schema': [None], 'table': ['my_table'], 'alias': ['m']}

View File

@@ -59,10 +59,10 @@ def test_table_and_columns_query(executor):
run(executor, "create table a(x text, y text)")
run(executor, "create table b(z text)")
tables, columns = executor.tables()
assert tables == ['a', 'b']
assert columns['a'] == ['x', 'y']
assert columns['b'] == ['z']
tables, columns = executor.get_metadata()
assert set(tables['table']) == set(['a', 'b'])
assert set(columns['column'][columns['table']=='a']) == set(['x', 'y'])
assert set(columns['column'][columns['table']=='b']) == set(['z'])
@dbtest
def test_database_list(executor):

View File

@@ -1,20 +1,45 @@
import pytest
from pandas import DataFrame
from prompt_toolkit.completion import Completion
from prompt_toolkit.document import Document
tables = {
'users': ['id', 'email', 'first_name', 'last_name'],
'orders': ['id', 'user_id', 'ordered_date', 'status']
}
schemata = {
'public': {
'users': ['id', 'email', 'first_name', 'last_name'],
'orders': ['id', 'ordered_date', 'status']
},
'custom': {
'products': ['id', 'product_name', 'price'],
'shipments': ['id', 'address', 'user_id']
}
}
@pytest.fixture
def completer():
import pgcli.pgcompleter as pgcompleter
comp = pgcompleter.PGCompleter(smart_completion=True)
comp.extend_table_names(tables.keys())
for t in tables:
comp.extend_column_names(t, tables[t])
# Table metadata is a dataframe with columns [schema, table, is_visible]
tables = DataFrame.from_records(
((schema, table, schema=='public')
for schema, tables in schemata.items()
for table, columns in tables.items()),
columns=['schema', 'table', 'is_visible'])
# Column metadata is a dataframe with columns [schema, table, column]
columns = DataFrame.from_records(
((schema, table, column)
for schema, tables in schemata.items()
for table, columns in tables.items()
for column in columns),
columns=['schema', 'table', 'column'])
comp.extend_tables(tables)
comp.extend_columns(columns)
return comp
@pytest.fixture
@@ -39,15 +64,26 @@ def test_select_keyword_completion(completer, complete_event):
complete_event)
assert set(result) == set([Completion(text='SELECT', start_position=-3)])
def test_schema_or_visible_table_completion(completer, complete_event):
text = 'SELECT * FROM '
position = len(text)
result = completer.get_completions(
Document(text=text, cursor_position=position), complete_event)
assert set(result) == set([Completion(text='public', start_position=0),
Completion(text='custom', start_position=0),
Completion(text='users', start_position=0),
Completion(text='orders', start_position=0)])
def test_function_name_completion(completer, complete_event):
text = 'SELECT MA'
position = len('SELECT MA')
result = completer.get_completions(
Document(text=text, cursor_position=position),
complete_event)
Document(text=text, cursor_position=position), complete_event)
assert set(result) == set([Completion(text='MAX', start_position=-2)])
def test_suggested_column_names(completer, complete_event):
def test_suggested_column_names_from_visible_table(completer, complete_event):
"""
Suggest column and function names when selecting from table
:param completer:
@@ -67,6 +103,24 @@ def test_suggested_column_names(completer, complete_event):
Completion(text='last_name', start_position=0)] +
list(map(Completion, completer.functions)))
def test_suggested_column_names_from_schema_qualified_table(completer, complete_event):
"""
Suggest column and function names when selecting from a schema-qualified table
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT from custom.products'
position = len('SELECT ')
result = set(completer.get_completions(
Document(text=text, cursor_position=position), complete_event))
assert set(result) == set([
Completion(text='*', start_position=0),
Completion(text='id', start_position=0),
Completion(text='product_name', start_position=0),
Completion(text='price', start_position=0)] +
list(map(Completion, completer.functions)))
def test_suggested_column_names_in_function(completer, complete_event):
"""
Suggest column and function names when selecting multiple
@@ -87,7 +141,7 @@ def test_suggested_column_names_in_function(completer, complete_event):
Completion(text='first_name', start_position=0),
Completion(text='last_name', start_position=0)])
def test_suggested_column_names_with_dot(completer, complete_event):
def test_suggested_column_names_with_table_dot(completer, complete_event):
"""
Suggest column names on table name and dot
:param completer:
@@ -106,6 +160,15 @@ def test_suggested_column_names_with_dot(completer, complete_event):
Completion(text='first_name', start_position=0),
Completion(text='last_name', start_position=0)])
def test_suggested_table_names_with_schema_dot(completer, complete_event):
text = 'SELECT * FROM custom.'
position = len(text)
result = completer.get_completions(
Document(text=text, cursor_position=position), complete_event)
assert set(result) == set([
Completion(text='products', start_position=0),
Completion(text='shipments', start_position=0)])
def test_suggested_column_names_with_alias(completer, complete_event):
"""
Suggest column names on table alias and dot

View File

@@ -1,129 +1,227 @@
from pgcli.packages.sqlcompletion import suggest_type
import pytest
def test_select_suggests_cols_with_table_scope():
suggestion = suggest_type('SELECT FROM tabl', 'SELECT ')
assert suggestion == ('columns-and-functions', ['tabl'])
def assert_equals(suggestions, expected_suggestions):
""" Wrapper to convert dataframes to structs
"""
for suggestion in suggestions:
if 'tables' in suggestion:
suggestion['tables'] = suggestion['tables'].to_dict('list')
assert sorted(suggestions) == sorted(expected_suggestions)
def test_select_suggests_cols_with_visible_table_scope():
suggestions = suggest_type('SELECT FROM tabl', 'SELECT ')
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None],
'table': ['tabl'],
'alias': [None]}},
{'type': 'function'}])
def test_select_suggests_cols_with_qualified_table_scope():
suggestions = suggest_type('SELECT FROM sch.tabl', 'SELECT ')
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': ['sch'],
'table': ['tabl'],
'alias': [None]}},
{'type': 'function'}])
def test_where_suggests_columns_functions():
suggestion = suggest_type('SELECT * FROM tabl WHERE ',
suggestions = suggest_type('SELECT * FROM tabl WHERE ',
'SELECT * FROM tabl WHERE ')
assert suggestion == ('columns-and-functions', ['tabl'])
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None], 'table': ['tabl'], 'alias': [None]}},
{'type': 'function'}])
def test_lparen_suggests_cols():
suggestion = suggest_type('SELECT MAX( FROM tbl', 'SELECT MAX(')
assert suggestion == ('columns', ['tbl'])
assert_equals(suggestion,
[{'type': 'column',
'tables': {'schema': [None], 'table': ['tbl'], 'alias': [None]}}])
def test_select_suggests_cols_and_funcs():
suggestion = suggest_type('SELECT ', 'SELECT ')
assert suggestion == ('columns-and-functions', [])
suggestions = suggest_type('SELECT ', 'SELECT ')
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [], 'table': [], 'alias': []}},
{'type': 'function'}])
def test_from_suggests_tables():
def test_from_suggests_tables_and_schemas():
suggestion = suggest_type('SELECT * FROM ', 'SELECT * FROM ')
assert suggestion == ('tables', [])
assert sorted(suggestion) == sorted([
{'type': 'table', 'schema':[]},
{'type': 'schema'}])
def test_distinct_suggests_cols():
suggestion = suggest_type('SELECT DISTINCT ', 'SELECT DISTINCT ')
assert suggestion == ('columns', [])
suggestions = suggest_type('SELECT DISTINCT ', 'SELECT DISTINCT ')
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [], 'table': [], 'alias': []}}])
def test_col_comma_suggests_cols():
suggestion = suggest_type('SELECT a, b, FROM tbl', 'SELECT a, b,')
assert suggestion == ('columns-and-functions', ['tbl'])
assert_equals(suggestion,
[{'type': 'column',
'tables': {'schema': [None],
'table': ['tbl'],
'alias': [None]}},
{'type': 'function'}])
def test_table_comma_suggests_tables():
def test_table_comma_suggests_tables_and_schemas():
suggestion = suggest_type('SELECT a, b FROM tbl1, ',
'SELECT a, b FROM tbl1, ')
assert suggestion == ('tables', [])
assert sorted(suggestion) == sorted([
{'type': 'table', 'schema':[]},
{'type': 'schema'}])
def test_into_suggests_tables():
def test_into_suggests_tables_and_schemas():
suggestion = suggest_type('INSERT INTO ', 'INSERT INTO ')
assert suggestion == ('tables', [])
assert sorted(suggestion) == sorted([
{'type': 'table', 'schema': []},
{'type': 'schema'}])
def test_insert_into_lparen_suggests_cols():
suggestion = suggest_type('INSERT INTO abc (', 'INSERT INTO abc (')
assert suggestion == ('columns', ['abc'])
suggestions = suggest_type('INSERT INTO abc (', 'INSERT INTO abc (')
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None],
'table': ['abc'],
'alias': [None]}}])
def test_insert_into_lparen_partial_text_suggests_cols():
suggestion = suggest_type('INSERT INTO abc (i', 'INSERT INTO abc (i')
assert suggestion == ('columns', ['abc'])
suggestions = suggest_type('INSERT INTO abc (i', 'INSERT INTO abc (i')
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None],
'table': ['abc'],
'alias': [None]}}])
def test_insert_into_lparen_comma_suggests_cols():
suggestion = suggest_type('INSERT INTO abc (id,', 'INSERT INTO abc (id,')
assert suggestion == ('columns', ['abc'])
suggestions = suggest_type('INSERT INTO abc (id,', 'INSERT INTO abc (id,')
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None],
'table': ['abc'],
'alias': [None]}}])
def test_partially_typed_col_name_suggests_col_names():
suggestion = suggest_type('SELECT * FROM tabl WHERE col_n',
'SELECT * FROM tabl WHERE col_n')
assert suggestion == ('columns-and-functions', ['tabl'])
assert_equals(suggestion,
[{'type': 'column',
'tables': {'schema': [None],
'table': ['tabl'],
'alias': [None]}},
{'type': 'function'}])
def test_dot_suggests_cols_of_a_table():
suggestion = suggest_type('SELECT tabl. FROM tabl', 'SELECT tabl.')
assert suggestion == ('columns', ['tabl'])
def test_dot_suggests_cols_of_a_table_or_schema_qualified_table():
suggestions = suggest_type('SELECT tabl. FROM tabl', 'SELECT tabl.')
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None], 'table': ['tabl'], 'alias': [None]}},
{'type': 'table', 'schema': 'tabl'}])
suggestions = suggest_type('SELECT tabl. FROM tabl', 'SELECT tabl.')
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None], 'table': ['tabl'], 'alias': [None]}},
{'type': 'table', 'schema': 'tabl'}])
def test_dot_suggests_cols_of_an_alias():
suggestion = suggest_type('SELECT t1. FROM tabl1 t1, tabl2 t2',
suggestions = suggest_type('SELECT t1. FROM tabl1 t1, tabl2 t2',
'SELECT t1.')
assert suggestion == ('columns', ['tabl1'])
assert_equals(suggestions,
[{'type': 'table', 'schema': 't1'},
{'type': 'column',
'tables': {'schema': [None], 'table': ['tabl1'], 'alias': ['t1']}}])
def test_dot_col_comma_suggests_cols():
suggestion = suggest_type('SELECT t1.a, t2. FROM tabl1 t1, tabl2 t2',
def test_dot_col_comma_suggests_cols_or_schema_qualified_table():
suggestions = suggest_type('SELECT t1.a, t2. FROM tabl1 t1, tabl2 t2',
'SELECT t1.a, t2.')
assert suggestion == ('columns', ['tabl2'])
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None],
'table': ['tabl2'],
'alias': ['t2']}},
{'type': 'table', 'schema': 't2'}])
def test_sub_select_suggests_keyword():
suggestion = suggest_type('SELECT * FROM (', 'SELECT * FROM (')
assert suggestion == ('keywords', [])
assert suggestion == [{'type': 'keyword'}]
def test_sub_select_partial_text_suggests_keyword():
suggestion = suggest_type('SELECT * FROM (S', 'SELECT * FROM (S')
assert suggestion == ('keywords', [])
assert suggestion == [{'type': 'keyword'}]
def test_sub_select_table_name_completion():
suggestion = suggest_type('SELECT * FROM (SELECT * FROM ',
'SELECT * FROM (SELECT * FROM ')
assert suggestion == ('tables', [])
assert sorted(suggestion) == sorted([
{'type': 'table', 'schema': []}, {'type': 'schema'}])
def test_sub_select_col_name_completion():
suggestion = suggest_type('SELECT * FROM (SELECT FROM abc',
suggestions = suggest_type('SELECT * FROM (SELECT FROM abc',
'SELECT * FROM (SELECT ')
assert suggestion == ('columns-and-functions', ['abc'])
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None], 'table': ['abc'], 'alias': [None]}},
{'type': 'function'}])
@pytest.mark.xfail
def test_sub_select_multiple_col_name_completion():
suggestion = suggest_type('SELECT * FROM (SELECT a, FROM abc',
suggestions = suggest_type('SELECT * FROM (SELECT a, FROM abc',
'SELECT * FROM (SELECT a, ')
assert suggestion == ('columns-and-functions', ['abc'])
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None], 'table': ['abc'], 'alias': [None]}},
{'type': 'function'}])
def test_sub_select_dot_col_name_completion():
suggestion = suggest_type('SELECT * FROM (SELECT t. FROM tabl t',
suggestions = suggest_type('SELECT * FROM (SELECT t. FROM tabl t',
'SELECT * FROM (SELECT t.')
assert suggestion == ('columns', ['tabl'])
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None], 'table': ['tabl'], 'alias': ['t']}},
{'type': 'table', 'schema': 't'}])
def test_join_suggests_tables():
def test_join_suggests_tables_and_schemas():
suggestion = suggest_type('SELECT * FROM abc a JOIN ',
'SELECT * FROM abc a JOIN ')
assert suggestion == ('tables', [])
assert sorted(suggestion) == sorted([
{'type': 'table', 'schema': []},
{'type': 'schema'}])
def test_join_alias_dot_suggests_cols1():
suggestion = suggest_type('SELECT * FROM abc a JOIN def d ON a.',
suggestions = suggest_type('SELECT * FROM abc a JOIN def d ON a.',
'SELECT * FROM abc a JOIN def d ON a.')
assert suggestion == ('columns', ['abc'])
assert_equals(suggestions,
[{'type': 'column',
'tables': {'schema': [None], 'table': ['abc'], 'alias': ['a']}},
{'type': 'table', 'schema': 'a'}])
def test_join_alias_dot_suggests_cols2():
suggestion = suggest_type('SELECT * FROM abc a JOIN def d ON a.',
'SELECT * FROM abc a JOIN def d ON a.id = d.')
assert suggestion == ('columns', ['def'])
assert_equals(suggestion,
[{'type': 'column',
'tables': {'schema': [None], 'table': ['def'], 'alias': ['d']}},
{'type': 'table', 'schema': 'd'}])
def test_on_suggests_aliases():
category, scope = suggest_type(
suggestions = suggest_type(
'select a.x, b.y from abc a join bcd b on ',
'select a.x, b.y from abc a join bcd b on ')
assert category == 'tables-or-aliases'
assert set(scope) == set(['a', 'b'])
assert_equals(suggestions,
[{'type': 'alias', 'aliases': ['a', 'b']}])
def test_on_suggests_tables():
category, scope = suggest_type(
suggestions = suggest_type(
'select abc.x, bcd.y from abc join bcd on ',
'select abc.x, bcd.y from abc join bcd on ')
assert category == 'tables-or-aliases'
assert set(scope) == set(['abc', 'bcd'])
assert_equals(suggestions,
[{'type': 'alias', 'aliases': ['abc', 'bcd']}])