1
0
Fork 0

Make pylint happier with metadata.py

A bunch of cosmetic changes to be more compliant with PEP80 and pylint.
And I added a pylintrc so that we can ignore some rules that just add
too much clutter.
This commit is contained in:
Joakim Koljonen 2017-03-17 03:21:59 +01:00
parent 6188c86201
commit 76b6098afd
No known key found for this signature in database
GPG Key ID: AF0327B5131CD164
4 changed files with 130 additions and 103 deletions

2
pylintrc Normal file
View File

@ -0,0 +1,2 @@
[MESSAGES CONTROL]
disable=missing-docstring,invalid-name

View File

@ -1,27 +1,34 @@
from pgcli.packages.parseutils.meta import FunctionMetadata, ForeignKey
from prompt_toolkit.completion import Completion
from functools import partial
from itertools import product
from pgcli.packages.parseutils.meta import FunctionMetadata, ForeignKey
from prompt_toolkit.completion import Completion
from prompt_toolkit.document import Document
from mock import Mock
import pytest
escape = lambda name: ('"' + name + '"' if not name.islower() or name in (
'select', 'insert') else name)
parametrize = pytest.mark.parametrize
qual = ['if_more_than_one_table', 'always']
no_qual = ['if_more_than_one_table', 'never']
def escape(name):
if not name.islower() or name in ('select', 'insert'):
return '"' + name + '"'
return name
def completion(display_meta, text, pos=0):
return Completion(text, start_position=pos, display_meta=display_meta)
def get_result(completer, text, position=None):
position = len(text) if position is None else position
return completer.get_completions(
Document(text=text, cursor_position=position), Mock()
)
def result_set(completer, text, position=None):
return set(get_result(completer, text, position))
@ -30,14 +37,23 @@ def result_set(completer, text, position=None):
# def schema(text, pos=0):
# return completion('schema', text, pos)
# and so on
schema, table, view, function, column, keyword, datatype, alias, name_join,\
fk_join, join = [partial(completion, display_meta)
for display_meta in('schema', 'table', 'view', 'function', 'column',
'keyword', 'datatype', 'table alias', 'name join', 'fk join', 'join')]
schema = partial(completion, 'schema')
table = partial(completion, 'table')
view = partial(completion, 'view')
function = partial(completion, 'function')
column = partial(completion, 'column')
keyword = partial(completion, 'keyword')
datatype = partial(completion, 'datatype')
alias = partial(completion, 'table alias')
name_join = partial(completion, 'name join')
fk_join = partial(completion, 'fk join')
join = partial(completion, 'join')
def wildcard_expansion(cols, pos=-1):
return Completion(cols, start_position=pos, display_meta='columns',
display = '*')
return Completion(
cols, start_position=pos, display_meta='columns', display='*')
class MetaData(object):
def __init__(self, metadata):
@ -52,97 +68,103 @@ class MetaData(object):
def keywords(self, pos=0):
return [keyword(kw, pos) for kw in self.completer.keywords]
def columns(self, parent, schema='public', typ='tables', pos=0):
def columns(self, tbl, parent='public', typ='tables', pos=0):
if typ == 'functions':
fun = [x for x in self.metadata[typ][schema] if x[0] == parent][0]
fun = [x for x in self.metadata[typ][parent] if x[0] == tbl][0]
cols = fun[1]
else:
cols = self.metadata[typ][schema][parent]
cols = self.metadata[typ][parent][tbl]
return [column(escape(col), pos) for col in cols]
def datatypes(self, schema='public', pos=0):
return [datatype(escape(x), pos)
for x in self.metadata.get('datatypes', {}).get(schema, [])]
def datatypes(self, parent='public', pos=0):
return [
datatype(escape(x), pos)
for x in self.metadata.get('datatypes', {}).get(parent, [])]
def tables(self, schema='public', pos=0):
return [table(escape(x), pos)
for x in self.metadata.get('tables', {}).get(schema, [])]
def tables(self, parent='public', pos=0):
return [
table(escape(x), pos)
for x in self.metadata.get('tables', {}).get(parent, [])]
def views(self, schema='public', pos=0):
return [view(escape(x), pos)
for x in self.metadata.get('views', {}).get(schema, [])]
def views(self, parent='public', pos=0):
return [
view(escape(x), pos)
for x in self.metadata.get('views', {}).get(parent, [])]
def functions(self, schema='public', pos=0):
return [function(escape(x[0] + '()'), pos)
for x in self.metadata.get('functions', {}).get(schema, [])]
def functions(self, parent='public', pos=0):
return [
function(escape(x[0] + '()'), pos)
for x in self.metadata.get('functions', {}).get(parent, [])]
def schemas(self, pos=0):
schemas = set(sch for schs in self.metadata.values() for sch in schs)
return [schema(escape(s), pos=pos) for s in schemas]
def functions_and_keywords(self, schema='public', pos=0):
def functions_and_keywords(self, parent='public', pos=0):
return (
self.functions(schema, pos) + self.builtin_functions(pos) +
self.functions(parent, pos) + self.builtin_functions(pos) +
self.keywords(pos)
)
# Note that the filtering parameters here only apply to the columns
def columns_functions_and_keywords(self, parent, schema='public', typ='tables', pos=0):
def columns_functions_and_keywords(
self, tbl, parent='public', typ='tables', pos=0
):
return (
self.functions_and_keywords(pos=pos) +
self.columns(parent, schema, typ, pos)
self.columns(tbl, parent, typ, pos)
)
def from_clause_items(self, schema='public', pos=0):
def from_clause_items(self, parent='public', pos=0):
return (
self.functions(schema, pos) + self.views(schema, pos) +
self.tables(schema, pos)
self.functions(parent, pos) + self.views(parent, pos) +
self.tables(parent, pos)
)
def schemas_and_from_clause_items(self, schema='public', pos=0):
return self.from_clause_items(schema, pos) + self.schemas(pos)
def schemas_and_from_clause_items(self, parent='public', pos=0):
return self.from_clause_items(parent, pos) + self.schemas(pos)
def types(self, schema='public', pos=0):
return self.datatypes(schema, pos) + self.tables(schema, pos)
def types(self, parent='public', pos=0):
return self.datatypes(parent, pos) + self.tables(parent, pos)
@property
def completer(self):
return self.get_completer()
def get_completers(self, casing):
'''
Returns a function taking three bools `casing`, `filtr`, `alias` and
"""
Returns a function taking three bools `casing`, `filtr`, `aliasing` and
the list `qualify`, all defaulting to None.
Returns a list of completers.
These parameters specify the allowed values for the corresponding
completer parameters, `None` meaning any, i.e. (None, None, None, None)
results in all 24 possible completers, whereas e.g.
(True, False, True, ['never']) results in the one completer with casing,
without `search_path` filtering of objects, with table aliasing, and
without column qualification.
'''
def _cfg(_casing, filtr, alias, qualify):
cfg = {'settings':{}}
(True, False, True, ['never']) results in the one completer with
casing, without `search_path` filtering of objects, with table
aliasing, and without column qualification.
"""
def _cfg(_casing, filtr, aliasing, qualify):
cfg = {'settings': {}}
if _casing:
cfg['casing'] = casing
cfg['settings']['search_path_filter'] = filtr
cfg['settings']['generate_aliases'] = alias
cfg['settings']['generate_aliases'] = aliasing
cfg['settings']['qualify_columns'] = qualify
return cfg
def _cfgs(casing, filtr, alias, qualify):
def _cfgs(casing, filtr, aliasing, qualify):
casings = [True, False] if casing is None else [casing]
filtrs = [True, False] if filtr is None else [filtr]
aliases = [True, False] if alias is None else [alias]
aliases = [True, False] if aliasing is None else [aliasing]
qualifys = qualify or ['always', 'if_more_than_one_table', 'never']
return [
_cfg(*p) for p in product(casings, filtrs, aliases, qualifys)
]
def completers(casing=None, filtr=None, alias=None, qualify=None):
def completers(casing=None, filtr=None, aliasing=None, qualify=None):
get_comp = self.get_completer
return [
get_comp(**c) for c in _cfgs(casing, filtr, alias, qualify)
get_comp(**c) for c in _cfgs(casing, filtr, aliasing, qualify)
]
return completers
@ -154,29 +176,32 @@ class MetaData(object):
schemata, tables, tbl_cols, views, view_cols = [], [], [], [], []
for schema, tbls in metadata['tables'].items():
schemata.append(schema)
for sch, tbls in metadata['tables'].items():
schemata.append(sch)
for table, cols in tbls.items():
tables.append((schema, table))
for tbl, cols in tbls.items():
tables.append((sch, tbl))
# Let all columns be text columns
tbl_cols.extend([(schema, table, col, 'text') for col in cols])
tbl_cols.extend([(sch, tbl, col, 'text') for col in cols])
for schema, tbls in metadata.get('views', {}).items():
for view, cols in tbls.items():
views.append((schema, view))
for sch, tbls in metadata.get('views', {}).items():
for tbl, cols in tbls.items():
views.append((sch, tbl))
# Let all columns be text columns
view_cols.extend([(schema, view, col, 'text') for col in cols])
view_cols.extend([(sch, tbl, col, 'text') for col in cols])
functions = [FunctionMetadata(schema, *func_meta)
for schema, funcs in metadata['functions'].items()
for func_meta in funcs]
functions = [
FunctionMetadata(sch, *func_meta)
for sch, funcs in metadata['functions'].items()
for func_meta in funcs]
datatypes = [(schema, datatype)
for schema, datatypes in metadata['datatypes'].items()
for datatype in datatypes]
datatypes = [
(sch, typ)
for sch, datatypes in metadata['datatypes'].items()
for typ in datatypes]
foreignkeys = [ForeignKey(*fk) for fks in metadata['foreignkeys'].values()
foreignkeys = [
ForeignKey(*fk) for fks in metadata['foreignkeys'].values()
for fk in fks]
comp.extend_schemata(schemata)

View File

@ -110,7 +110,7 @@ def test_suggested_join_conditions(completer, text):
fk_join('shipments.user_id = users.id')])
@parametrize('completer', completers(filtr=True, casing=False, alias=False))
@parametrize('completer', completers(filtr=True, casing=False, aliasing=False))
@parametrize(('query', 'tbl'), itertools.product((
'SELECT * FROM public.{0} RIGHT OUTER JOIN ',
'''SELECT *
@ -143,7 +143,7 @@ def test_suggested_column_names_in_function(completer):
assert result == set(testdata.columns('products', 'custom'))
@parametrize('completer', completers(casing=False, alias=False))
@parametrize('completer', completers(casing=False, aliasing=False))
@parametrize('text', [
'SELECT * FROM Custom.',
'SELECT * FROM custom.',
@ -163,7 +163,7 @@ def test_suggested_table_names_with_schema_dot(
assert result == set(testdata.from_clause_items('custom', start_position))
@parametrize('completer', completers(casing=False, alias=False))
@parametrize('completer', completers(casing=False, aliasing=False))
@parametrize('text', [
'SELECT * FROM "Custom".',
])
@ -232,7 +232,7 @@ def test_suggested_aliases_after_on_right_side(completer):
assert result == set([alias('x'), alias('y')])
@parametrize('completer', completers(filtr=True, casing=False, alias=False))
@parametrize('completer', completers(filtr=True, casing=False, aliasing=False))
def test_table_names_after_from(completer):
text = 'SELECT * FROM '
result = result_set(completer, text)
@ -406,14 +406,14 @@ def test_suggest_columns_from_quoted_table(completer, text):
texts = ['SELECT * FROM ', 'SELECT * FROM public.Orders O CROSS JOIN ']
@parametrize('completer', completers(filtr=True, casing=False, alias=False))
@parametrize('completer', completers(filtr=True, casing=False, aliasing=False))
@parametrize('text', texts)
def test_schema_or_visible_table_completion(completer, text):
result = result_set(completer, text)
assert result == set(testdata.schemas_and_from_clause_items())
@parametrize('completer', completers(alias=True, casing=False, filtr=True))
@parametrize('completer', completers(aliasing=True, casing=False, filtr=True))
@parametrize('text', texts)
def test_table_aliases(completer, text):
result = result_set(completer, text)
@ -425,7 +425,7 @@ def test_table_aliases(completer, text):
function('func2() f')])
@parametrize('completer', completers(alias=True, casing=True, filtr=True))
@parametrize('completer', completers(aliasing=True, casing=True, filtr=True))
@parametrize('text', texts)
def test_aliases_with_casing(completer, text):
result = result_set(completer, text)
@ -437,7 +437,7 @@ def test_aliases_with_casing(completer, text):
function('func2() f')])
@parametrize('completer', completers(alias=False, casing=True, filtr=True))
@parametrize('completer', completers(aliasing=False, casing=True, filtr=True))
@parametrize('text', texts)
def test_table_casing(completer, text):
result = result_set(completer, text)
@ -449,35 +449,35 @@ def test_table_casing(completer, text):
function('func2()')])
@parametrize('completer', completers(alias=False, casing=True))
@parametrize('completer', completers(aliasing=False, casing=True))
def test_alias_search_without_aliases2(completer):
text = 'SELECT * FROM blog.et'
result = get_result(completer, text)
assert result[0] == table('EntryTags', -2)
@parametrize('completer', completers(alias=False, casing=True))
@parametrize('completer', completers(aliasing=False, casing=True))
def test_alias_search_without_aliases1(completer):
text = 'SELECT * FROM blog.e'
result = get_result(completer, text)
assert result[0] == table('Entries', -1)
@parametrize('completer', completers(alias=True, casing=True))
@parametrize('completer', completers(aliasing=True, casing=True))
def test_alias_search_with_aliases2(completer):
text = 'SELECT * FROM blog.et'
result = get_result(completer, text)
assert result[0] == table('EntryTags ET', -2)
@parametrize('completer', completers(alias=True, casing=True))
@parametrize('completer', completers(aliasing=True, casing=True))
def test_alias_search_with_aliases1(completer):
text = 'SELECT * FROM blog.e'
result = get_result(completer, text)
assert result[0] == table('Entries E', -1)
@parametrize('completer', completers(alias=True, casing=True))
@parametrize('completer', completers(aliasing=True, casing=True))
def test_join_alias_search_with_aliases1(completer):
text = 'SELECT * FROM blog.Entries E JOIN blog.e'
result = get_result(completer, text)
@ -485,7 +485,7 @@ def test_join_alias_search_with_aliases1(completer):
'EntAccLog EAL ON EAL.EntryID = E.EntryID', -1)]
@parametrize('completer', completers(alias=False, casing=True))
@parametrize('completer', completers(aliasing=False, casing=True))
def test_join_alias_search_without_aliases1(completer):
text = 'SELECT * FROM blog.Entries JOIN blog.e'
result = get_result(completer, text)
@ -493,14 +493,14 @@ def test_join_alias_search_without_aliases1(completer):
'EntAccLog ON EntAccLog.EntryID = Entries.EntryID', -1)]
@parametrize('completer', completers(alias=True, casing=True))
@parametrize('completer', completers(aliasing=True, casing=True))
def test_join_alias_search_with_aliases2(completer):
text = 'SELECT * FROM blog.Entries E JOIN blog.et'
result = get_result(completer, text)
assert result[0] == join('EntryTags ET ON ET.EntryID = E.EntryID', -2)
@parametrize('completer', completers(alias=False, casing=True))
@parametrize('completer', completers(aliasing=False, casing=True))
def test_join_alias_search_without_aliases2(completer):
text = 'SELECT * FROM blog.Entries JOIN blog.et'
result = get_result(completer, text)
@ -540,14 +540,14 @@ def test_column_alias_search_qualified(completer):
assert result[:3] == [column(c, -2) for c in cols]
@parametrize('completer', completers(casing=False, filtr=False, alias=False))
@parametrize('completer', completers(casing=False, filtr=False, aliasing=False))
def test_schema_object_order(completer):
result = get_result(completer, 'SELECT * FROM u')
assert result[:3] == [
table(t, pos=-1) for t in ('users', 'custom."Users"', 'custom.users')
]
@parametrize('completer', completers(casing=False, filtr=False, alias=False))
@parametrize('completer', completers(casing=False, filtr=False, aliasing=False))
def test_all_schema_objects(completer):
text = ('SELECT * FROM ')
result = result_set(completer, text)
@ -557,7 +557,7 @@ def test_all_schema_objects(completer):
)
@parametrize('completer', completers(filtr=False, alias=False, casing=True))
@parametrize('completer', completers(filtr=False, aliasing=False, casing=True))
def test_all_schema_objects_with_casing(completer):
text = 'SELECT * FROM '
result = result_set(completer, text)
@ -567,7 +567,7 @@ def test_all_schema_objects_with_casing(completer):
)
@parametrize('completer', completers(casing=False, filtr=False, alias=True))
@parametrize('completer', completers(casing=False, filtr=False, aliasing=True))
def test_all_schema_objects_with_aliases(completer):
text = ('SELECT * FROM ')
result = result_set(completer, text)

View File

@ -84,7 +84,7 @@ def test_builtin_function_matches_only_at_start(completer):
assert 'MIN' not in result
@parametrize('completer', completers(casing=False, alias=False))
@parametrize('completer', completers(casing=False, aliasing=False))
def test_user_function_name_completion(completer):
result = result_set(completer, 'SELECT cu')
assert result == set([
@ -96,7 +96,7 @@ def test_user_function_name_completion(completer):
])
@parametrize('completer', completers(casing=False, alias=False))
@parametrize('completer', completers(casing=False, aliasing=False))
def test_user_function_name_completion_matches_anywhere(completer):
result = result_set(completer, 'SELECT om')
assert result == set([
@ -312,7 +312,7 @@ def test_suggested_join_conditions_with_invalid_table(completer, text, ref):
assert result == set([alias('users'), alias(ref)])
@parametrize('completer', completers(casing=False, alias=False))
@parametrize('completer', completers(casing=False, aliasing=False))
@parametrize('text', [
'SELECT * FROM "Users" u JOIN u',
'SELECT * FROM "Users" u JOIN uid',
@ -341,7 +341,7 @@ join_texts = [
]
@parametrize('completer', completers(casing=False, alias=False))
@parametrize('completer', completers(casing=False, aliasing=False))
@parametrize('text', join_texts)
def test_suggested_joins(completer, text):
result = result_set(completer, text)
@ -354,7 +354,7 @@ def test_suggested_joins(completer, text):
)
@parametrize('completer', completers(casing=True, alias=False))
@parametrize('completer', completers(casing=True, aliasing=False))
@parametrize('text', join_texts)
def test_cased_joins(completer, text):
result = result_set(completer, text)
@ -365,7 +365,7 @@ def test_cased_joins(completer, text):
])
@parametrize('completer', completers(casing=False, alias=True))
@parametrize('completer', completers(casing=False, aliasing=True))
@parametrize('text', join_texts)
def test_aliased_joins(completer, text):
result = result_set(completer, text)
@ -376,7 +376,7 @@ def test_aliased_joins(completer, text):
])
@parametrize('completer', completers(casing=False, alias=False))
@parametrize('completer', completers(casing=False, aliasing=False))
@parametrize('text', [
'SELECT * FROM public."Users" JOIN ',
'SELECT * FROM public."Users" RIGHT OUTER JOIN ',
@ -480,7 +480,7 @@ def test_join_using_suggests_columns_after_first_column(completer, text):
assert result == set([column('id'), column('email')])
@parametrize('completer', completers(casing=False, alias=False))
@parametrize('completer', completers(casing=False, aliasing=False))
@parametrize('text', [
'SELECT * FROM ',
'SELECT * FROM users CROSS JOIN ',
@ -510,7 +510,7 @@ def test_auto_escaped_col_names(completer):
assert result == set(testdata.columns_functions_and_keywords('select'))
@parametrize('completer', completers(alias=False))
@parametrize('completer', completers(aliasing=False))
def test_allow_leading_double_quote_in_last_word(completer):
result = result_set(completer, 'SELECT * from "sele')
@ -592,7 +592,7 @@ def test_learn_keywords(completer):
assert completions[0].text == 'VIEW'
@parametrize('completer', completers(casing=False, alias=False))
@parametrize('completer', completers(casing=False, aliasing=False))
def test_learn_table_names(completer):
history = 'SELECT * FROM users; SELECT * FROM orders; SELECT * FROM users'
completer.extend_query_history(history)
@ -721,7 +721,7 @@ def test_suggest_columns_from_quoted_table(completer):
assert result == set(testdata.columns('Users'))
@parametrize('completer', completers(casing=False, alias=False))
@parametrize('completer', completers(casing=False, aliasing=False))
@parametrize('text', ['SELECT * FROM ',
'SELECT * FROM Orders o CROSS JOIN '])
def test_schema_or_visible_table_completion(completer, text):
@ -729,14 +729,14 @@ def test_schema_or_visible_table_completion(completer, text):
assert result == set(testdata.schemas_and_from_clause_items())
@parametrize('completer', completers(casing=False, alias=True))
@parametrize('completer', completers(casing=False, aliasing=True))
@parametrize('text', ['SELECT * FROM '])
def test_table_aliases(completer, text):
result = result_set(completer, text)
assert result == set(testdata.schemas() + aliased_rels)
@parametrize('completer', completers(casing=False, alias=True))
@parametrize('completer', completers(casing=False, aliasing=True))
@parametrize('text', ['SELECT * FROM Orders o CROSS JOIN '])
def test_duplicate_table_aliases(completer, text):
result = result_set(completer, text)
@ -753,7 +753,7 @@ def test_duplicate_table_aliases(completer, text):
function('set_returning_func() srf')])
@parametrize('completer', completers(casing=True, alias=True))
@parametrize('completer', completers(casing=True, aliasing=True))
@parametrize('text', ['SELECT * FROM Orders o CROSS JOIN '])
def test_duplicate_aliases_with_casing(completer, text):
result = result_set(completer, text)
@ -771,14 +771,14 @@ def test_duplicate_aliases_with_casing(completer, text):
function('set_returning_func() srf')])
@parametrize('completer', completers(casing=True, alias=True))
@parametrize('completer', completers(casing=True, aliasing=True))
@parametrize('text', ['SELECT * FROM '])
def test_aliases_with_casing(completer, text):
result = result_set(completer, text)
assert result == set([schema('PUBLIC')] + cased_aliased_rels)
@parametrize('completer', completers(casing=True, alias=False))
@parametrize('completer', completers(casing=True, aliasing=False))
@parametrize('text', ['SELECT * FROM '])
def test_table_casing(completer, text):
result = result_set(completer, text)
@ -798,7 +798,7 @@ def test_insert(completer, text):
assert result == set(testdata.columns('users'))
@parametrize('completer', completers(casing=False, alias=False))
@parametrize('completer', completers(casing=False, aliasing=False))
def test_suggest_cte_names(completer):
text = '''
WITH cte1 AS (SELECT a, b, c FROM foo),