mirror of
https://github.com/dbcli/pgcli
synced 2024-06-16 01:42:23 +00:00
96 lines
2.6 KiB
Python
96 lines
2.6 KiB
Python
import re
|
|
import sqlparse
|
|
from sqlparse.sql import IdentifierList, Identifier
|
|
from sqlparse.tokens import Keyword, DML
|
|
|
|
# This matches only alphanumerics and underscores.
|
|
_LAST_WORD_RE = re.compile(r'(\w+)$')
|
|
# This matches everything except a space.
|
|
_LAST_WORD_SPL_RE = re.compile(r'([^\s]+)$')
|
|
|
|
def last_word(text, include_special_chars=False):
|
|
"""
|
|
Find the last word in a sentence.
|
|
|
|
>>> last_word('abc')
|
|
'abc'
|
|
>>> last_word(' abc')
|
|
'abc'
|
|
>>> last_word('')
|
|
''
|
|
>>> last_word(' ')
|
|
''
|
|
>>> last_word('abc ')
|
|
''
|
|
>>> last_word('abc def')
|
|
'def'
|
|
>>> last_word('abc def ')
|
|
''
|
|
>>> last_word('abc def;')
|
|
''
|
|
>>> last_word('bac $def')
|
|
'def'
|
|
>>> last_word('bac $def', True)
|
|
'$def'
|
|
>>> last_word('bac \def', True)
|
|
'\\\\def'
|
|
>>> last_word('bac \def;', True)
|
|
'\\\\def;'
|
|
"""
|
|
|
|
if not text: # Empty string
|
|
return ''
|
|
|
|
if text[-1].isspace():
|
|
return ''
|
|
else:
|
|
regex = _LAST_WORD_SPL_RE if include_special_chars else _LAST_WORD_RE
|
|
result = regex.findall(text)
|
|
if result:
|
|
return result[0]
|
|
else:
|
|
return ''
|
|
|
|
|
|
# This code is borrowed from sqlparse example script.
|
|
# <url>
|
|
def is_subselect(parsed):
|
|
if not parsed.is_group():
|
|
return False
|
|
for item in parsed.tokens:
|
|
if item.ttype is DML and item.value.upper() in ('SELECT', 'INSERT',
|
|
'UPDATE', 'CREATE', 'DELETE'):
|
|
return True
|
|
return False
|
|
|
|
def extract_from_part(parsed):
|
|
from_seen = False
|
|
for item in parsed.tokens:
|
|
if from_seen:
|
|
if is_subselect(item):
|
|
for x in extract_from_part(item):
|
|
yield x
|
|
elif item.ttype is Keyword:
|
|
raise StopIteration
|
|
else:
|
|
yield item
|
|
elif item.ttype is Keyword and item.value.upper() in ('FROM', 'INTO',
|
|
'UPDATE', 'TABLE', ):
|
|
from_seen = True
|
|
|
|
def extract_table_identifiers(token_stream):
|
|
for item in token_stream:
|
|
if isinstance(item, IdentifierList):
|
|
for identifier in item.get_identifiers():
|
|
yield identifier.get_name()
|
|
elif isinstance(item, Identifier):
|
|
yield item.get_name()
|
|
# It's a bug to check for Keyword here, but in the example
|
|
# above some tables names are identified as keywords...
|
|
elif item.ttype is Keyword:
|
|
yield item.value
|
|
|
|
def extract_tables(sql):
|
|
stream = extract_from_part(sqlparse.parse(sql)[0])
|
|
return list(extract_table_identifiers(stream))
|