1
0
Fork 0

Separate out sqlcompletion into a package.

This commit is contained in:
Amjith Ramanujam 2014-12-16 23:24:26 -08:00
parent 6ab0a2213f
commit f8045273e3
3 changed files with 122 additions and 42 deletions

View File

@ -2,3 +2,5 @@ if __name__ == '__main__':
import doctest
import pgcli.pgexecute
doctest.testmod(pgcli.pgexecute)
import pgcli.packages.sqlcompletion
doctest.testmod(pgcli.packages.sqlcompletion)

View File

@ -0,0 +1,95 @@
import re
import sqlparse
# Regex for finding "words" in documents. (We consider a group of alnum
# characters a word, but also a group of special characters a word, as long as
# it doesn't contain a space.)
# (This is a 'word' in Vi.)
_FIND_WORD_RE = re.compile(r'(\w+)$')
# Regex for finding "WORDS" in documents.
# (This is a 'WORD in Vi.)
_FIND_BIG_WORD_RE = re.compile(r'([^\s]+)$')
def last_word(text, include_special_chars=False):
"""
Find the last word in a sentence.
>>> last_word('abc')
'abc'
>>> last_word(' abc')
'abc'
>>> last_word('')
''
>>> last_word(' ')
''
>>> last_word('abc ')
''
>>> last_word('abc def')
'def'
>>> last_word('abc def ')
''
>>> last_word('abc def;')
''
>>> last_word('bac $def')
'def'
>>> last_word('bac $def', True)
'$def'
>>> last_word('bac \def', True)
'\\\\def'
>>> last_word('bac \def;', True)
'\\\\def;'
"""
if not text: # Empty string
return ''
if text[-1].isspace():
return ''
else:
regex = _FIND_BIG_WORD_RE if include_special_chars else _FIND_WORD_RE
result = regex.findall(text)
if result:
return result[0]
else:
return ''
def suggest_type(full_text, text_before_cursor):
"""Takes the full_text that is typed so far and also the text before the
cursor to suggest completion type and scope.
Returns a tuple with a type of entity ('table', 'column' etc) and a scope.
A scope for a column category will be the table name. Scope is set to None
if unavailable.
"""
word_before_cursor = last_word(text_before_cursor,
include_special_chars=True)
# If we've partially typed a word then word_before_cursor won't be an
# empty string. In that case we want to remove the partially typed
# string before sending it to the sqlparser. Otherwise the last token
# will always be the partially typed string which renders the smart
# completion useless because it will always return the list of keywords
# as completion.
if word_before_cursor:
parsed = sqlparse.parse(
text_before_cursor[:-len(word_before_cursor)])
else:
parsed = sqlparse.parse(text_before_cursor)
last_token = ''
if parsed:
last_token = parsed[0].token_prev(len(parsed[0].tokens))
last_token = last_token.value if last_token else ''
if last_token.lower() in ('select', 'where', 'having', 'set',
'order by', 'group by'):
return ('columns', None)
elif last_token.lower() in ('from', 'update', 'into', 'describe'):
return ('tables', None)
elif last_token in ('d',): # \d
return ('tables', None)
elif last_token.lower() in ('c', 'use'): # \c
return ('databases', None)
else:
return ('keywords', None)

View File

@ -1,6 +1,6 @@
from __future__ import print_function
from prompt_toolkit.completion import Completer, Completion
import sqlparse
from .packages.sqlcompletion import suggest_type
class PGCompleter(Completer):
keywords = ['ACCESS', 'ADD', 'ALL', 'ALTER TABLE', 'AND', 'ANY', 'AS',
@ -23,9 +23,9 @@ class PGCompleter(Completer):
special_commands = []
database_names = []
table_names = []
column_names = ['*']
databases = []
tables = []
columns = ['*']
all_completions = set(keywords)
def __init__(self, smart_completion=True):
@ -37,24 +37,24 @@ class PGCompleter(Completer):
# be at the beginning of a line.
self.special_commands.extend(special_commands)
def extend_database_names(self, database_names):
self.database_names.extend(database_names)
def extend_database_names(self, databases):
self.databases.extend(databases)
def extend_keywords(self, additional_keywords):
self.keywords.extend(additional_keywords)
self.all_completions.update(additional_keywords)
def extend_table_names(self, table_names):
self.table_names.extend(table_names)
self.all_completions.update(table_names)
def extend_table_names(self, tables):
self.tables.extend(tables)
self.all_completions.update(tables)
def extend_column_names(self, column_names):
self.column_names.extend(column_names)
self.all_completions.update(column_names)
def extend_column_names(self, columns):
self.columns.extend(columns)
self.all_completions.update(columns)
def reset_completions(self):
self.table_names = []
self.column_names = ['*']
self.tables = []
self.columns = ['*']
self.all_completions = set(self.keywords)
@staticmethod
@ -72,32 +72,15 @@ class PGCompleter(Completer):
if not self.smart_completion:
return self.find_matches(word_before_cursor, self.all_completions)
# If we've partially typed a word then word_before_cursor won't be an
# empty string. In that case we want to remove the partially typed
# string before sending it to the sqlparser. Otherwise the last token
# will always be the partially typed string which renders the smart
# completion useless because it will always return the list of keywords
# as completion.
if word_before_cursor:
parsed = sqlparse.parse(
document.text_before_cursor[:-len(word_before_cursor)])
else:
parsed = sqlparse.parse(document.text_before_cursor)
category, scope = suggest_type(document.text,
document.text_before_cursor)
last_token = ''
if parsed:
last_token = parsed[0].token_prev(len(parsed[0].tokens))
last_token = last_token.value if last_token else ''
if last_token.lower() in ('select', 'where', 'having', 'set',
'order by', 'group by'):
return self.find_matches(word_before_cursor, self.column_names)
elif last_token.lower() in ('from', 'update', 'into', 'describe'):
return self.find_matches(word_before_cursor, self.table_names)
elif last_token.lower() in ('d',): # This for the \d special command.
return self.find_matches(word_before_cursor, self.table_names)
elif last_token.lower() in ('c', 'use'): # This for the \c special command.
return self.find_matches(word_before_cursor, self.database_names)
else:
return self.find_matches(word_before_cursor,
self.keywords + self.special_commands)
if category == 'columns':
return self.find_matches(word_before_cursor, self.columns)
elif category == 'tables':
return self.find_matches(word_before_cursor, self.tables)
elif category == 'databases':
return self.find_matches(word_before_cursor, self.databases)
elif category == 'keywords':
return self.find_matches(word_before_cursor, self.keywords +
self.special_commands)