1
0
Fork 0

Merge remote-tracking branch 'upstream/master' into feature/tox_behave

This commit is contained in:
Dick Marinus 2017-05-22 20:45:55 +02:00
commit c6ae43a398
46 changed files with 2232 additions and 1712 deletions

3
.coveragerc Normal file
View File

@ -0,0 +1,3 @@
[run]
parallel=True
source=pgcli

9
.github/ISSUE_TEMPLATE.md vendored Normal file
View File

@ -0,0 +1,9 @@
## Description
<!--- Describe your problem as fully as you can. -->
## Your environment
<!-- This gives us some more context to work with. -->
- [ ] Please provide your OS and version information.
- [ ] Please provide your CLI version.
- [ ] What is the output of ``pip freeze`` command.

9
.github/PULL_REQUEST_TEMPLATE.md vendored Normal file
View File

@ -0,0 +1,9 @@
## Description
<!--- Describe your changes in detail. -->
## Checklist
<!--- We appreciate your help and want to give you credit. Please take a moment to put an `x` in the boxes below as you complete them. -->
- [ ] I've added this contribution to the `changelog.md`.
- [ ] I've added my name to the `AUTHORS` file (or it's already there).

View File

@ -7,14 +7,23 @@ python:
- "3.6"
install:
- pip install . pytest mock codecov==1.5.1 behave pexpect==3.3
- pip install . docutils pytest mock codecov==1.5.1 behave pexpect==3.3
- pip install git+https://github.com/hayd/pep8radius.git
script:
- set -e
- coverage run --source pgcli -m py.test
- cd tests
- behave
- cd ..
# check for changelog ReST compliance
- rst2html.py --halt=warning changelog.rst >/dev/null
# check for pep8 errors, only looking at branch vs master. If there are errors, show diff and return an error code.
- pep8radius master --docformatter --error-status || ( pep8radius master --docformatter --diff; false )
- set +e
after_success:
- coverage combine
- codecov
notifications:

View File

@ -12,10 +12,12 @@ Core Devs:
* Joakim Koljonen
* Daniel Rocco
* Karl-Aksel Puulmann
* Dick Marinus
Contributors:
-------------
* Brett
* Étienne BERSAC (bersace)
* Daniel Schwarz
* inkn
* Jonathan Slenders
@ -56,6 +58,12 @@ Contributors:
* Manuel Barkhau
* Sergii V
* Emanuele Gaifas
* Owen Stephens
* Russell Davies
* AlexTes
* Hraban Luyat
* Jackson Popkin
* Gustavo Castro
Creator:
--------

View File

@ -66,24 +66,8 @@ Adding PostgreSQL Special (Meta) Commands
-----------------------------------------
If you want to work on adding new meta-commands (such as `\dp`, `\ds`, `dy`),
you'll be changing the code of `packages/pgspecial.py`. Search for the
dictionary called `CASE_SENSITIVE_COMMANDS`. The special command us used as
the dictionary key, and the value is a tuple.
The first item in the tuple is either a string (sql statement) or a function.
The second item in the tuple is a list of strings which is the documentation
for that special command. The list will have two items, the first item is the
command itself with possible options and the second item is the plain english
description of that command.
For example, `\l` is a meta-command that lists all the databases. The way you
can see the SQL statement issued by PostgreSQL when this command is executed
is to launch `psql -E` and entering `\l`.
That will print the results and also print the sql statement that was executed
to produce that result. In most cases it's a single sql statement, but sometimes
it's a series of sql statements that feed the results to each other to get to
the final result.
you need to contribute to `pgspecial <https://github.com/pgcli/pgspecial/>`_
project.
Building RPM and DEB packages
-----------------------------
@ -155,3 +139,18 @@ To see stdout/stderr, use the following command:
$ behave --no-capture
PEP8 checks
-----------
When you submit a PR, the changeset is checked for pep8 compliance using
`pep8radius <https://github.com/hayd/pep8radius>`_. If you see a build failing because
of these checks, install pep8radius and apply style fixes:
::
$ pip install pep8radius
$ pep8radius --docformatter --diff # view a diff of proposed fixes
$ pep8radius --docformatter --in-place # apply the fixes
Then commit and push the fixes.

6
Dockerfile Normal file
View File

@ -0,0 +1,6 @@
FROM python:2.7
COPY . /app
RUN cd /app && pip install -e .
CMD pgcli

View File

@ -63,7 +63,6 @@ The `pgcli` is written using prompt_toolkit_.
- ``SELECT * FROM <tab>`` will only show table names.
- ``SELECT * FROM users WHERE <tab>`` will only show column names.
* Config file is automatically created at ``~/.config/pgcli/config`` at first launch.
* Primitive support for ``psql`` back-slash commands.
* Pretty prints tabular data.
Note: `pgcli` uses [tabulate](https://github.com/dbcli/pgcli/blob/master/pgcli/packages/tabulate.py)
@ -73,6 +72,11 @@ The `pgcli` is written using prompt_toolkit_.
.. _prompt_toolkit: https://github.com/jonathanslenders/python-prompt-toolkit
Config
------
A config file is automatically created at ``~/.config/pgcli/config`` at first launch.
See the file itself for a description of all available options.
Contributions:
--------------
@ -176,6 +180,39 @@ Then you can install pgcli:
$ sudo pip install pgcli
Docker
======
Pgcli can be run from within Docker. This can be useful to try pgcli without
installing it, or any dependencies, system-wide.
To build the image:
::
$ docker build -t pgcli .
To create a container from the image:
::
$ docker run --rm -ti pgcli pgcli <ARGS>
To access postgresql databases listening on localhost, make sure to run the
docker in "host net mode". E.g. to access a database called "foo" on the
postgresql server running on localhost:5432 (the standard port):
::
$ docker run --rm -ti --net host pgcli pgcli -h localhost foo
To connect to a locally running instance over a unix socket, bind the socket to
the docker container:
::
$ docker run --rm -ti -v /var/run/postgres:/var/run/postgres pgcli pgcli foo
Thanks:
-------

View File

@ -1,8 +1,33 @@
Upcoming
========
Features:
---------
* Add time option for prompt (Thanks: `Gustavo Castro`_)
* Suggest objects from all schemas (not just those in search_path) (Thanks: `Joakim Koljonen`_)
* Casing for column headers (Thanks: `Joakim Koljonen`_)
* Allow configurable character to be used for multi-line query continuations. (Thanks: `Owen Stephens`_)
* Completions after ORDER BY and DISTINCT now take account of table aliases. (Thanks: `Owen Stephens`_)
* Narrow keyword candidates based on previous keyword. (Thanks: `Étienne Bersac`_)
* Opening an external editor will edit the last-run query. (Thanks: `Thomas Roten`_)
Bug fixes:
----------
* Fixed external editor bug (issue #668). (Thanks: `Irina Truong`_).
* Standardize command line option names. (Thanks: `Russell Davies`_)
* Improve handling of ``lock_not_available`` error (issue #700). (Thanks: `Jackson Popkin <https://github.com/jdpopkin>`_)
* Fixed user option precedence (issue #697). (Thanks: `Irina Truong`_).
Internal changes:
-----------------
* Run pep8 checks in travis (Thanks: `Irina Truong`_).
* Add pager wrapper for behave tests (Thanks: `Dick Marinus`_).
* Behave quit pgcli nicely (Thanks: `Dick Marinus`_).
* Behave test source command (Thanks: `Dick Marinus`_).
* Behave fix clean up. (Thanks: `Dick Marinus`_).
* Test using behave the tee command (Thanks: `Dick Marinus`_).
* Behave remove boiler plate code (Thanks: `Dick Marinus`_).
* Behave fix pgspecial update (Thanks: `Dick Marinus`_).
* Add behave to tox (Thanks: `Dick Marinus`_).
1.5.1
@ -236,12 +261,12 @@ Features:
than the display window.
* Don't hide functions from pg_catalog. (Thanks: `Darik Gamble`_).
* Suggest set-returning functions as tables. (Thanks: `Darik Gamble`_).
Functions that return table like results will now be suggested in places of tables.
Functions that return table like results will now be suggested in places of tables.
* Suggest fields from functions used as tables. (Thanks: `Darik Gamble`_).
* Using ``pgspecial`` as a separate module. (Thanks: `Iryna Cherniavska`_).
* Make "enter" key behave as "tab" key when the completion menu is displayed. (Thanks: `Matheus Rosa`_).
* Support different error-handling options when running multiple queries. (Thanks: `Darik Gamble`_).
When ``on_error = STOP`` in the config file, pgcli will abort execution if one of the queries results in an error.
When ``on_error = STOP`` in the config file, pgcli will abort execution if one of the queries results in an error.
* Hide the password displayed in the process name in ``ps``. (Thanks: `Stuart Quin`_)
Bug Fixes:
@ -314,7 +339,7 @@ Features:
* Completion menu now has metadata information such as schema, table, column, view, etc., next to the suggestions. (Thanks: `Darik Gamble`_)
* Customizable history file location via config file. (Thanks: `Çağatay Yüksel`_)
Add this line to your config file (~/.pgclirc) to customize where to store the history file.
Add this line to your config file (~/.pgclirc) to customize where to store the history file.
::
@ -345,16 +370,16 @@ Internal Changes:
Features:
---------
* Add fuzzy matching for the table names and column names.
* Add fuzzy matching for the table names and column names.
Matching very long table/column names are now easier with fuzzy matching. The
fuzzy match works like the fuzzy open in SublimeText or Vim's Ctrl-P plugin.
fuzzy match works like the fuzzy open in SublimeText or Vim's Ctrl-P plugin.
eg: Typing ``djmv`` will match `django_migration_views` since it is able to
match parts of the input to the full table name.
* Change the timing information to seconds.
* Change the timing information to seconds.
The ``Command Time`` and ``Format Time`` are now displayed in seconds instead
of a unitless number displayed in scientific notation.
@ -363,7 +388,7 @@ Features:
Frequently typed queries can now be saved and recalled using a name using
newly added special commands (``\n[+]``, ``\ns``, ``\nd``).
eg:
eg:
::
@ -394,17 +419,17 @@ Bug Fixes:
* Fix an error when running ``\d table_name`` when running on a table with rules. (Thanks: `Ali Kargın`_)
* Fix a pgcli crash when entering non-ascii characters in Windows. (Thanks: `Darik Gamble`_, `Jonathan Slenders`_)
* Faster rendering of expanded mode output by making the horizontal separator a fixed length string.
* Completion suggestions for the ``\c`` command are not auto-escaped by default.
* Faster rendering of expanded mode output by making the horizontal separator a fixed length string.
* Completion suggestions for the ``\c`` command are not auto-escaped by default.
Internal Changes:
-----------------
* Complete refactor of handling the back-slash commands.
* Complete refactor of handling the back-slash commands.
* Upgrade prompt_toolkit to 0.42. (Thanks: `Jonathan Slenders`_)
* Change the config file management to use ConfigObj.(Thanks: `Brett Atoms`_)
* Add integration tests using ``behave``. (Thanks: `Iryna Cherniavska`_)
0.17.0
======
@ -417,16 +442,16 @@ Features:
Previously completions only matched a table name if it started with the
partially typed word. Now completions will match even if the partially typed
word is in the middle of a suggestion.
eg: When you type 'mig', 'django_migrations' will be suggested.
eg: When you type 'mig', 'django_migrations' will be suggested.
* Completion for built-in tables and temporary tables are suggested after entering a prefix of ``pg_``. (Thanks: `Darik Gamble`_)
* Add place holder doc strings for special commands that are planned for implementation. (Thanks: `Iryna Cherniavska`_)
* Updated version of prompt_toolkit, now matching braces are highlighted. (Thanks: `Jonathan Slenders`_)
* Added support of ``\\e`` command. Queries can be edited in an external editor. (Thanks: `Iryna Cherniavska`_)
eg: When you type ``SELECT * FROM \e`` it will be opened in an external editor.
* Add special command ``\dT`` to show datatypes. (Thanks: `Darik Gamble`_)
* Add auto-completion support for datatypes in CREATE, SELECT etc. (Thanks: `Darik Gamble`_)
* Add auto-completion support for datatypes in CREATE, SELECT etc. (Thanks: `Darik Gamble`_)
* Improve the auto-completion in WHERE clause with logical operators. (Thanks: `Darik Gamble`_)
*
*
Bug Fixes:
----------
@ -458,7 +483,7 @@ Bug Fixes:
As a result the suggestions for tables vs functions are cleaner. (Thanks: `Darik Gamble`_)
* Remove scientific notation when formatting large numbers. (Thanks: `Daniel Rocco`_)
* Add the FUNCTION keyword to auto-completion.
* Display NULL values as <null> instead of empty strings.
* Display NULL values as <null> instead of empty strings.
* Fix the completion refresh when ``\connect`` is executed.
0.16.1
@ -474,10 +499,10 @@ Bug Fixes:
Features:
---------
* Add \ds special command to show sequences.
* Add \ds special command to show sequences.
* Add Vi mode for keybindings. This can be enabled by adding 'vi = True' in ~/.pgclirc. (Thanks: `Jay Zeng`_)
* Add a -v/--version flag to pgcli.
* Add completion for TEMPLATE keyword and smart-completion for
* Add completion for TEMPLATE keyword and smart-completion for
'CREATE DATABASE blah WITH TEMPLATE <tab>'. (Thanks: `Daniel Rocco`_)
* Add custom decoders to json/jsonb to emulate the behavior of psql. This
removes the unicode prefix (eg: u'Éowyn') in the output. (Thanks: `Daniel Rocco`_)
@ -492,7 +517,7 @@ Bug Fixes:
* Print BIGSERIAL type as Integer instead of Float.
* Show completions for special commands at the beginning of a statement. (Thanks: `Daniel Rocco`_)
* Allow special commands to work in a multi-statement case where multiple sql
statements are separated by semi-colon in the same line.
statements are separated by semi-colon in the same line.
0.15.4
======
@ -500,11 +525,11 @@ Bug Fixes:
0.15.3
======
* Override the LESS options completely instead of appending to it.
* Override the LESS options completely instead of appending to it.
0.15.2
======
* Revert back to using psycopg2 as the postgres adapter. psycopg2cffi fails for some tests in Python 3.
* Revert back to using psycopg2 as the postgres adapter. psycopg2cffi fails for some tests in Python 3.
0.15.0
======
@ -513,14 +538,14 @@ Features:
---------
* Add syntax color styles to config.
* Add auto-completion for COPY statements.
* Change Postgres adapter to psycopg2cffi, to make it PyPy compatible.
* Change Postgres adapter to psycopg2cffi, to make it PyPy compatible.
Now pgcli can be run by PyPy.
Bug Fixes:
----------
* Treat boolean values as strings instead of ints.
* Make \di, \dv and \dt to be schema aware. (Thanks: `Darik Gamble`_)
* Make column name display unicode compatible.
* Make column name display unicode compatible.
0.14.0
======
@ -528,14 +553,14 @@ Bug Fixes:
Features:
---------
* Add alias completion support to ON keyword. (Thanks: `Iryna Cherniavska`_)
* Add LIMIT keyword to completion.
* Add LIMIT keyword to completion.
* Auto-completion for Postgres schemas. (Thanks: `Darik Gamble`_)
* Better unicode handling for datatypes, dbname and roles.
* Add \timing command to time the sql commands.
* Better unicode handling for datatypes, dbname and roles.
* Add \timing command to time the sql commands.
This can be set via config file (~/.pgclirc) using `timing = True`.
* Add different table styles for displaying output.
* Add different table styles for displaying output.
This can be changed via config file (~/.pgclirc) using `table_format = fancy_grid`.
* Add confirmation before printing results that have more than 1000 rows.
* Add confirmation before printing results that have more than 1000 rows.
Bug Fixes:
----------
@ -552,7 +577,7 @@ Bug Fixes:
Features:
---------
* Add -d/--dbname option to the commandline.
* Add -d/--dbname option to the commandline.
eg: pgcli -d database
* Add the username as an argument after the database.
eg: pgcli dbname user
@ -569,12 +594,12 @@ Bug Fixes:
Features:
---------
* Upgrade to prompt_toolkit version 0.26 (Thanks: https://github.com/macobo)
* Upgrade to prompt_toolkit version 0.26 (Thanks: https://github.com/macobo)
* Adds Ctrl-left/right to move the cursor one word left/right respectively.
* Internal API changes.
* IPython integration through `ipython-sql`_ (Thanks: `Darik Gamble`_)
* Add an ipython magic extension to embed pgcli inside ipython.
* Results from a pgcli query are sent back to ipython.
* Add an ipython magic extension to embed pgcli inside ipython.
* Results from a pgcli query are sent back to ipython.
* Multiple sql statments in the same line separated by semi-colon. (Thanks: https://github.com/macobo)
.. _`ipython-sql`: https://github.com/catherinedevlin/ipython-sql
@ -613,15 +638,15 @@ Improvements:
.. _`Amjith Ramanujam`: https://github.com/amjith
.. _`Darik Gamble`: https://github.com/darikg
.. _`Iryna Cherniavska`: https://github.com/j-bennet
.. _`Daniel Rocco`: https://github.com/drocco007
.. _`Jay Zeng`: https://github.com/jayzeng
.. _`Daniel Rocco`: https://github.com/drocco007
.. _`Jay Zeng`: https://github.com/jayzeng
.. _`蔡佳男`: https://github.com/xalley
.. _dp: https://github.com/ceocoder
.. _`Jonathan Slenders`: https://github.com/jonathanslenders
.. _`Dimitar Roustchev`: https://github.com/droustchev
.. _`François Pietka`: https://github.com/fpietka
.. _`Ali Kargın`: https://github.com/sancopanco
.. _`Brett Atoms`: https://github.com/brettatoms
.. _`Brett Atoms`: https://github.com/brettatoms
.. _`Nathan Jhaveri`: https://github.com/nathanjhaveri
.. _`Çağatay Yüksel`: https://github.com/cagatay
.. _`Michael Kaminsky`: https://github.com/mikekaminsky
@ -656,6 +681,12 @@ Improvements:
.. _`Janus Troelsen`: https://github.com/ysangkok
.. _`Fabien Meghazi`: https://github.com/amigrave
.. _`Manuel Barkhau`: https://github.com/mbarkhau
.. _`Sergii`: https://github.com/foxyterkel
.. _`Sergii`: https://github.com/foxyterkel
.. _`Emanuele Gaifas`: https://github.com/lelit
.. _`tk`: https://github.com/kanet77
.. _`Owen Stephens`: https://github.com/owst
.. _`Russell Davies`: https://github.com/russelldavies
.. _`Dick Marinus`: https://github.com/meeuw
.. _`Étienne Bersac`: https://github.com/bersace
.. _`Thomas Roten`: https://github.com/tsroten
.. _`Gustavo Castro`: https://github.com/gustavo-castro

View File

@ -11,6 +11,7 @@ import threading
import shutil
import functools
import humanize
import datetime as dt
from time import time, sleep
from codecs import open
@ -75,6 +76,13 @@ MetaQuery = namedtuple(
])
MetaQuery.__new__.__defaults__ = ('', False, 0, False, False, False, False)
OutputSettings = namedtuple(
'OutputSettings',
'table_format dcmlfmt floatfmt missingval expanded max_width case_function'
)
OutputSettings.__new__.__defaults__ = (
None, None, None, '<null>', False, None, lambda x: x
)
# no-op logging handler
class NullHandler(logging.Handler):
@ -133,6 +141,7 @@ class PGCli(object):
self.row_limit = c['main'].as_int('row_limit')
self.min_num_menu_lines = c['main'].as_int('min_num_menu_lines')
self.multiline_continuation_char = c['main']['multiline_continuation_char']
self.table_format = c['main']['table_format']
self.syntax_style = c['main']['syntax_style']
self.cli_style = c['colors']
@ -144,6 +153,8 @@ class PGCli(object):
self.decimal_format = c['data_formats']['decimal']
self.float_format = c['data_formats']['float']
self.now = dt.datetime.today()
self.completion_refresher = CompletionRefresher()
self.query_history = []
@ -157,6 +168,8 @@ class PGCli(object):
'generate_aliases': c['main'].as_bool('generate_aliases'),
'asterisk_column_order': c['main']['asterisk_column_order'],
'qualify_columns': c['main']['qualify_columns'],
'case_column_headers': c['main'].as_bool('case_column_headers'),
'search_path_filter': c['main'].as_bool('search_path_filter'),
'single_connection': single_connection,
'less_chatty': less_chatty,
'keyword_casing': keyword_casing,
@ -350,16 +363,23 @@ class PGCli(object):
:param document: Document
:return: Document
"""
# FIXME: using application.pre_run_callables like this here is not the best solution.
# It's internal api of prompt_toolkit that may change. This was added to fix #668.
# We may find a better way to do it in the future.
saved_callables = cli.application.pre_run_callables
while special.editor_command(document.text):
filename = special.get_filename(document.text)
sql, message = special.open_external_editor(filename,
sql=document.text)
query = (special.get_editor_query(document.text) or
self.get_last_query())
sql, message = special.open_external_editor(filename, sql=query)
if message:
# Something went wrong. Raise an exception and bail.
raise RuntimeError(message)
cli.current_buffer.document = Document(sql, cursor_position=len(sql))
document = cli.run(False)
cli.application.pre_run_callables = []
document = cli.run()
continue
cli.application.pre_run_callables = saved_callables
return document
def execute_command(self, text, query):
@ -442,7 +462,7 @@ class PGCli(object):
try:
while True:
document = self.cli.run(True)
document = self.cli.run()
# The reason we check here instead of inside the pgexecute is
# because we want to raise the Exit exception which will be
@ -474,6 +494,8 @@ class PGCli(object):
else:
query = self.execute_command(document.text, query)
self.now = dt.datetime.today()
# Allow PGCompleter to learn user's preferred keywords, etc.
with self._completer_lock:
self.completer.extend_query_history(document.text)
@ -501,7 +523,8 @@ class PGCli(object):
return [(Token.Prompt, prompt)]
def get_continuation_tokens(cli, width):
return [(Token.Continuation, '.' * (width - 1) + ' ')]
continuation=self.multiline_continuation_char * (width - 1) + ' '
return [(Token.Continuation, continuation)]
get_toolbar_tokens = create_toolbar_tokens_func(
lambda: self.vi_mode, self.completion_refresher.is_refreshing,
@ -596,9 +619,19 @@ class PGCli(object):
max_width = None
expanded = self.pgspecial.expanded_output or self.expanded_output
formatted = format_output(
title, cur, headers, status, self.table_format, self.decimal_format,
self.float_format, self.null_string, expanded, max_width)
settings = OutputSettings(
table_format=self.table_format,
dcmlfmt=self.decimal_format,
floatfmt=self.float_format,
missingval=self.null_string,
expanded=expanded,
max_width=max_width,
case_function=(
self.completer.case if self.settings['case_column_headers']
else lambda x: x
)
)
formatted = format_output(title, cur, headers, status, settings)
output.extend(formatted)
total = time() - start
@ -696,6 +729,7 @@ class PGCli(object):
Document(text=text, cursor_position=cursor_positition), None)
def get_prompt(self, string):
string = string.replace('\\t', self.now.strftime('%x %X'))
string = string.replace('\\u', self.pgexecute.user or '(none)')
string = string.replace('\\h', self.pgexecute.host or '(none)')
string = string.replace('\\d', self.pgexecute.dbname or '(none)')
@ -705,6 +739,11 @@ class PGCli(object):
string = string.replace('\\n', "\n")
return string
def get_last_query(self):
"""Get the last query executed or None."""
return self.query_history[-1][0] if self.query_history else None
@click.command()
# Default host is '' so psycopg2 can default to either localhost or unix socket
@ -712,8 +751,8 @@ class PGCli(object):
help='Host address of the postgres database.')
@click.option('-p', '--port', default=5432, help='Port number at which the '
'postgres instance is listening.', envvar='PGPORT')
@click.option('-U', '--user', envvar='PGUSER', help='User name to '
'connect to the postgres database.')
@click.option('-U', '--username', 'username_opt', envvar='PGUSER',
help='Username to connect to the postgres database.')
@click.option('-W', '--password', 'prompt_passwd', is_flag=True, default=False,
help='Force password prompt.')
@click.option('-w', '--no-password', 'never_prompt', is_flag=True,
@ -728,7 +767,7 @@ class PGCli(object):
envvar='PGCLIRC', help='Location of pgclirc file.')
@click.option('-D', '--dsn', default='', envvar='DSN',
help='Use DSN configured into the [alias_dsn] section of pgclirc file.')
@click.option('-R', '--row-limit', default=None, envvar='PGROWLIMIT', type=click.INT,
@click.option('--row-limit', default=None, envvar='PGROWLIMIT', type=click.INT,
help='Set threshold for row limit prompt. Use 0 to disable prompt.')
@click.option('--less-chatty', 'less_chatty', is_flag=True,
default=False,
@ -736,7 +775,7 @@ class PGCli(object):
@click.option('--prompt', help='Prompt format (Default: "\\u@\\h:\\d> ").')
@click.argument('database', default=lambda: None, envvar='PGDATABASE', nargs=1)
@click.argument('username', default=lambda: None, envvar='PGUSER', nargs=1)
def cli(database, user, host, port, prompt_passwd, never_prompt,
def cli(database, username_opt, host, port, prompt_passwd, never_prompt,
single_connection, dbname, username, version, pgclirc, dsn, row_limit,
less_chatty, prompt):
@ -766,7 +805,7 @@ def cli(database, user, host, port, prompt_passwd, never_prompt,
# Choose which ever one has a valid value.
database = database or dbname
user = username or user
user = username_opt or username
if dsn is not '':
try:
@ -809,13 +848,19 @@ def obfuscate_process_password():
setproctitle.setproctitle(process_title)
def format_output(title, cur, headers, status, table_format, dcmlfmt, floatfmt,
missingval='<null>', expanded=False, max_width=None):
def format_output(title, cur, headers, status, settings):
output = []
missingval = settings.missingval
table_format = settings.table_format
dcmlfmt = settings.dcmlfmt
floatfmt = settings.floatfmt
expanded = settings.expanded
max_width = settings.max_width
case_function = settings.case_function
if title: # Only print the title if it's not None.
output.append(title)
if cur:
headers = [utf8tounicode(x) for x in headers]
headers = [case_function(utf8tounicode(x)) for x in headers]
if expanded and headers:
output.append(expanded_table(cur, headers, missingval))
else:

View File

@ -67,7 +67,7 @@ class FunctionMetadata(object):
# E.g. 'SELECT unnest FROM unnest(...);'
return [ColumnMetadata(self.func_name, self.return_type, [])]
return [ColumnMetadata(name, type, [])
return [ColumnMetadata(name, typ, [])
for name, typ, mode in zip(
self.arg_names, self.arg_types, self.arg_modes)
if mode in ('o', 'b', 't')] # OUT, INOUT, TABLE

View File

@ -8,8 +8,8 @@ with open(literal_file) as f:
literals = json.load(f)
def get_literals(literal_type):
"""Where `literal_type` is one of 'keywords', 'functions', 'datatypes',
returns a tuple of literal values of that type"""
def get_literals(literal_type, type_=tuple):
# Where `literal_type` is one of 'keywords', 'functions', 'datatypes',
# returns a tuple of literal values of that type.
return tuple(literals[literal_type])
return type_(literals[literal_type])

View File

@ -1,153 +1,261 @@
{
"keywords": [
"ACCESS",
"ADD",
"ALL",
"ALTER TABLE",
"AND",
"ANY",
"AS",
"ASC",
"AUDIT",
"BETWEEN",
"BY",
"CASE",
"CHAR",
"CHECK",
"CLUSTER",
"COLUMN",
"COMMENT",
"COMPRESS",
"CONCURRENTLY",
"CONNECT",
"COPY",
"CREATE",
"CURRENT",
"DATABASE",
"DATE",
"DECIMAL",
"DEFAULT",
"DELETE FROM",
"DELIMITER",
"DESC",
"DESCRIBE",
"DISTINCT",
"DROP",
"EXPLAIN",
"ELSE",
"ENCODING",
"ESCAPE",
"EXCLUSIVE",
"EXISTS",
"EXTENSION",
"FILE",
"FLOAT",
"FOR",
"FORMAT",
"FORCE_QUOTE",
"FORCE_NOT_NULL",
"FREEZE",
"FROM",
"FULL",
"FUNCTION",
"GRANT",
"GROUP BY",
"HAVING",
"HEADER",
"IDENTIFIED",
"IMMEDIATE",
"IN",
"INCREMENT",
"INDEX",
"INITIAL",
"INSERT INTO",
"INTEGER",
"INTERSECT",
"INTERVAL",
"INTO",
"IS",
"JOIN",
"LANGUAGE",
"LEFT",
"LEVEL",
"LIKE",
"LIMIT",
"LOCK",
"LONG",
"MATERIALIZED VIEW",
"MAXEXTENTS",
"MINUS",
"MLSLABEL",
"MODE",
"MODIFY",
"NOT",
"NOAUDIT",
"NOTICE",
"NOCOMPRESS",
"NOWAIT",
"NULL",
"NUMBER",
"OIDS",
"OF",
"OFFLINE",
"ON",
"ONLINE",
"OPTION",
"OR",
"ORDER BY",
"OUTER",
"OWNER",
"PCTFREE",
"PRIMARY",
"PRIOR",
"PRIVILEGES",
"QUOTE",
"RAISE",
"RENAME",
"REPLACE",
"RAW",
"REFRESH MATERIALIZED VIEW",
"RESOURCE",
"RETURNS",
"REVOKE",
"RIGHT",
"ROW",
"ROWID",
"ROWNUM",
"ROWS",
"SELECT",
"SESSION",
"SET",
"SHARE",
"SIZE",
"SMALLINT",
"START",
"SUCCESSFUL",
"SYNONYM",
"SYSDATE",
"TABLE",
"TEMPLATE",
"THEN",
"TO",
"TRIGGER",
"TRUNCATE",
"UID",
"UNION",
"UNIQUE",
"UPDATE",
"USE",
"USER",
"USING",
"VALIDATE",
"VALUES",
"VARCHAR",
"VARCHAR2",
"VIEW",
"WHEN",
"WHENEVER",
"WHERE",
"WITH"
],
"keywords": {
"ACCESS": [],
"ADD": [],
"ALL": [],
"ALTER": [
"AGGREGATE",
"COLLATION",
"COLUMN",
"CONVERSION",
"DATABASE",
"DEFAULT",
"DOMAIN",
"EVENT TRIGGER",
"EXTENSION",
"FOREIGN",
"FUNCTION",
"GROUP",
"INDEX",
"LANGUAGE",
"LARGE OBJECT",
"MATERIALIZED VIEW",
"OPERATOR",
"POLICY",
"ROLE",
"RULE",
"SCHEMA",
"SEQUENCE",
"SERVER",
"SYSTEM",
"TABLE",
"TABLESPACE",
"TEXT SEARCH",
"TRIGGER",
"TYPE",
"USER",
"VIEW"
],
"AND": [],
"ANY": [],
"AS": [],
"ASC": [],
"AUDIT": [],
"BEGIN": [],
"BETWEEN": [],
"BY": [],
"CASE": [],
"CHAR": [],
"CHECK": [],
"CLUSTER": [],
"COLUMN": [],
"COMMENT": [],
"COMPRESS": [],
"CONCURRENTLY": [],
"CONNECT": [],
"COPY": [],
"CREATE": [
"ACCESS METHOD",
"AGGREGATE",
"CAST",
"COLLATION",
"CONVERSION",
"DATABASE",
"DOMAIN",
"EVENT TRIGGER",
"EXTENSION",
"FOREIGN DATA WRAPPER",
"FOREIGN EXTENSION",
"FUNCTION",
"GLOBAL",
"GROUP",
"IF NOT EXISTS",
"INDEX",
"LANGUAGE",
"LOCAL",
"MATERIALIZED VIEW",
"OPERATOR",
"OR REPLACE",
"POLICY",
"ROLE",
"RULE",
"SCHEMA",
"SEQUENCE",
"SERVER",
"TABLE",
"TABLESPACE",
"TEMPORARY",
"TEXT SEARCH",
"TRIGGER",
"TYPE",
"UNIQUE",
"UNLOGGED",
"USER",
"USER MAPPING",
"VIEW"
],
"CURRENT": [],
"DATABASE": [],
"DATE": [],
"DECIMAL": [],
"DEFAULT": [],
"DELETE FROM": [],
"DELIMITER": [],
"DESC": [],
"DESCRIBE": [],
"DISTINCT": [],
"DROP": [
"ACCESS METHOD",
"AGGREGATE",
"CAST",
"COLLATION",
"CONVERSION",
"DATABASE",
"DOMAIN",
"EVENT TRIGGER",
"EXTENSION",
"FOREIGN DATA WRAPPER",
"FOREIGN TABLE",
"FUNCTION",
"GROUP",
"INDEX",
"LANGUAGE",
"MATERIALIZED VIEW",
"OPERATOR",
"OWNED",
"POLICY",
"ROLE",
"RULE",
"SCHEMA",
"SEQUENCE",
"SERVER",
"TABLE",
"TABLESPACE",
"TEXT SEARCH",
"TRANSFORM",
"TRIGGER",
"TYPE",
"USER",
"USER MAPPING",
"VIEW"
],
"EXPLAIN": [],
"ELSE": [],
"ENCODING": [],
"ESCAPE": [],
"EXCLUSIVE": [],
"EXISTS": [],
"EXTENSION": [],
"FILE": [],
"FLOAT": [],
"FOR": [],
"FORMAT": [],
"FORCE_QUOTE": [],
"FORCE_NOT_NULL": [],
"FREEZE": [],
"FROM": [],
"FULL": [],
"FUNCTION": [],
"GRANT": [],
"GROUP BY": [],
"HAVING": [],
"HEADER": [],
"IDENTIFIED": [],
"IMMEDIATE": [],
"IN": [],
"INCREMENT": [],
"INDEX": [],
"INITIAL": [],
"INSERT INTO": [],
"INTEGER": [],
"INTERSECT": [],
"INTERVAL": [],
"INTO": [],
"IS": [],
"JOIN": [],
"LANGUAGE": [],
"LEFT": [],
"LEVEL": [],
"LIKE": [],
"LIMIT": [],
"LOCK": [],
"LONG": [],
"MATERIALIZED VIEW": [],
"MAXEXTENTS": [],
"MINUS": [],
"MLSLABEL": [],
"MODE": [],
"MODIFY": [],
"NOT": [],
"NOAUDIT": [],
"NOTICE": [],
"NOCOMPRESS": [],
"NOWAIT": [],
"NULL": [],
"NUMBER": [],
"OIDS": [],
"OF": [],
"OFFLINE": [],
"ON": [],
"ONLINE": [],
"OPTION": [],
"OR": [],
"ORDER BY": [],
"OUTER": [],
"OWNER": [],
"PCTFREE": [],
"PRIMARY": [],
"PRIOR": [],
"PRIVILEGES": [],
"QUOTE": [],
"RAISE": [],
"RENAME": [],
"REPLACE": [],
"RESET": ["ALL"],
"RAW": [],
"REFRESH MATERIALIZED VIEW": [],
"RESOURCE": [],
"RETURNS": [],
"REVOKE": [],
"RIGHT": [],
"ROW": [],
"ROWID": [],
"ROWNUM": [],
"ROWS": [],
"SELECT": [],
"SESSION": [],
"SET": [],
"SHARE": [],
"SHOW": [],
"SIZE": [],
"SMALLINT": [],
"START": [],
"SUCCESSFUL": [],
"SYNONYM": [],
"SYSDATE": [],
"TABLE": [],
"TEMPLATE": [],
"THEN": [],
"TO": [],
"TRIGGER": [],
"TRUNCATE": [],
"UID": [],
"UNION": [],
"UNIQUE": [],
"UPDATE": [],
"USE": [],
"USER": [],
"USING": [],
"VALIDATE": [],
"VALUES": [],
"VARCHAR": [],
"VARCHAR2": [],
"VIEW": [],
"WHEN": [],
"WHENEVER": [],
"WHERE": [],
"WITH": []
},
"functions": [
"AVG",
"COUNT",

View File

@ -46,7 +46,8 @@ Column = namedtuple(
)
Column.__new__.__defaults__ = (None, None, tuple(), False)
Keyword = namedtuple('Keyword', [])
Keyword = namedtuple('Keyword', ['last_token'])
Keyword.__new__.__defaults__ = (None,)
NamedQuery = namedtuple('NamedQuery', [])
Datatype = namedtuple('Datatype', ['schema'])
Alias = namedtuple('Alias', ['aliases'])
@ -226,6 +227,14 @@ def _split_multiple_statements(full_text, text_before_cursor, parsed):
return full_text, text_before_cursor, statement
SPECIALS_SUGGESTION = {
'dT': Datatype,
'df': Function,
'dt': Table,
'dv': View,
'sf': Function,
}
def suggest_special(text):
text = text.lstrip()
cmd, _, arg = parse_special_command(text)
@ -253,7 +262,7 @@ def suggest_special(text):
schema = None
if cmd[1:] == 'd':
# \d can descibe tables or views
# \d can describe tables or views
if schema:
return (Table(schema=schema),
View(schema=schema),)
@ -261,12 +270,8 @@ def suggest_special(text):
return (Schema(),
Table(schema=None),
View(schema=None),)
elif cmd[1:] in ('dt', 'dv', 'df', 'dT'):
rel_type = {'dt': Table,
'dv': View,
'df': Function,
'dT': Datatype,
}[cmd[1:]]
elif cmd[1:] in SPECIALS_SUGGESTION:
rel_type = SPECIALS_SUGGESTION[cmd[1:]]
if schema:
return (rel_type(schema=schema),)
else:
@ -376,10 +381,7 @@ def suggest_based_on_last_token(token, stmt):
elif token_v == 'set':
return (Column(table_refs=stmt.get_tables(),
local_tables=stmt.local_tables),)
elif token_v in ('by', 'distinct'):
return (Column(table_refs=stmt.get_tables(),
local_tables=stmt.local_tables, qualifiable=True),)
elif token_v in ('select', 'where', 'having'):
elif token_v in ('select', 'where', 'having', 'by', 'distinct'):
# Check for a table alias or schema qualification
parent = (stmt.identifier and stmt.identifier.get_parent_name()) or []
tables = stmt.get_tables()
@ -393,7 +395,7 @@ def suggest_based_on_last_token(token, stmt):
return (Column(table_refs=tables, local_tables=stmt.local_tables,
qualifiable=True),
Function(schema=None),
Keyword(),)
Keyword(token_v.upper()),)
elif token_v == 'as':
# Don't suggest anything for aliases
return ()
@ -490,8 +492,8 @@ def suggest_based_on_last_token(token, stmt):
if not schema:
suggestions.append(Schema())
return tuple(suggestions)
elif token_v == 'alter':
return (Keyword(),)
elif token_v in {'alter', 'create', 'drop'}:
return (Keyword(token_v.upper()),)
elif token.is_keyword:
# token is a keyword we haven't implemented any special handling for
# go backwards in the query until we find one we do recognize
@ -499,7 +501,7 @@ def suggest_based_on_last_token(token, stmt):
if prev_keyword:
return suggest_based_on_last_token(prev_keyword, stmt)
else:
return (Keyword(),)
return (Keyword(token_v.upper()),)
else:
return (Keyword(),)

View File

@ -50,6 +50,9 @@ casing_file = default
# location, one will be generated based on usage in SQL/PLPGSQL functions.
generate_casing_file = False
# Casing of column headers based on the casing_file described above
case_column_headers = True
# history_file location.
# In Unix/Linux: ~/.config/pgcli/history
# In Windows: %USERPROFILE%\AppData\Local\dbcli\pgcli\history
@ -68,6 +71,9 @@ asterisk_column_order = table_order
# Possible values: "always", never" and "if_more_than_one_table"
qualify_columns = if_more_than_one_table
# When no schema is entered, only suggest objects in search_path
search_path_filter = False
# Default pager.
# By default 'PAGER' environment variable is used
# pager = less -SRXF
@ -113,6 +119,9 @@ prompt = '\u@\h:\d> '
# Number of lines to reserve for the suggestion menu
min_num_menu_lines = 4
# Character used to left pad multi-line queries to match the prompt size.
multiline_continuation_char = '.'
# Custom colors for the completion menu, toolbar, etc.
[colors]
Token.Menu.Completions.Completion.Current = 'bg:#ffffff #000000'

View File

@ -1,7 +1,7 @@
from __future__ import print_function, unicode_literals
import logging
import re
from itertools import count, repeat
from itertools import count, repeat, chain
import operator
from collections import namedtuple, defaultdict
from pgspecial.namedqueries import NamedQueries
@ -28,12 +28,16 @@ _logger = logging.getLogger(__name__)
NamedQueries.instance = NamedQueries.from_config(
load_config(config_location() + 'config'))
Match = namedtuple('Match', ['completion', 'priority'])
_SchemaObject = namedtuple('SchemaObject', ['name', 'schema', 'function'])
def SchemaObject(name, schema=None, function=False):
return _SchemaObject(name, schema, function)
_Candidate = namedtuple('Candidate', ['completion', 'priority', 'meta', 'synonyms'])
def Candidate(completion, priority=None, meta=None, synonyms=None):
return _Candidate(completion, priority, meta, synonyms or [completion])
_Candidate = namedtuple(
'Candidate', ['completion', 'prio', 'meta', 'synonyms', 'prio2']
)
def Candidate(completion, prio=None, meta=None, synonyms=None, prio2=None):
return _Candidate(completion, prio, meta, synonyms or [completion], prio2)
normalize_ref = lambda ref: ref if ref[0] == '"' else '"' + ref.lower() + '"'
@ -47,7 +51,10 @@ def generate_alias(tbl):
[l for l, prev in zip(tbl, '_' + tbl) if prev == '_' and l != '_'])
class PGCompleter(Completer):
keywords = get_literals('keywords')
# keywords_tree: A dict mapping keywords to well known following keywords.
# e.g. 'CREATE': ['TABLE', 'USER', ...],
keywords_tree = get_literals('keywords', type_=dict)
keywords = tuple(set(chain(keywords_tree.keys(), *keywords_tree.values())))
functions = get_literals('functions')
datatypes = get_literals('datatypes')
@ -57,6 +64,7 @@ class PGCompleter(Completer):
self.pgspecial = pgspecial
self.prioritizer = PrevalenceCounter()
settings = settings or {}
self.search_path_filter = settings.get('search_path_filter')
self.generate_aliases = settings.get('generate_aliases')
self.casing_file = settings.get('casing_file')
self.generate_casing_file = settings.get('generate_casing_file')
@ -315,7 +323,7 @@ class PGCompleter(Completer):
matches = []
for cand in collection:
if isinstance(cand, _Candidate):
item, prio, display_meta, synonyms = cand
item, prio, display_meta, synonyms, prio2 = cand
if display_meta is None:
display_meta = meta
syn_matches = (_match(x) for x in synonyms)
@ -323,7 +331,7 @@ class PGCompleter(Completer):
syn_matches = [m for m in syn_matches if m]
sort_key = max(syn_matches) if syn_matches else None
else:
item, display_meta, prio = cand, meta, 0
item, display_meta, prio, prio2 = cand, meta, 0, 0
sort_key = _match(cand)
if sort_key:
@ -345,7 +353,10 @@ class PGCompleter(Completer):
+ tuple(c for c in item))
item = self.case(item)
priority = sort_key, type_priority, prio, priority_func(item), lexical_priority
priority = (
sort_key, type_priority, prio, priority_func(item),
prio2, lexical_priority
)
matches.append(Match(
completion=Completion(item, -text_len,
@ -550,8 +561,8 @@ class PGCompleter(Completer):
return self.find_matches(word_before_cursor, conds, meta='join')
def get_function_matches(self, suggestion, word_before_cursor, alias=False):
def _cand(func_name, alias):
return self._make_cand(func_name, alias, suggestion, function=True)
def _cand(func, alias):
return self._make_cand(func, alias, suggestion)
if suggestion.filter == 'for_from_clause':
# Only suggest functions allowed in FROM clause
filt = lambda f: not f.is_aggregate and not f.is_window
@ -597,24 +608,28 @@ class PGCompleter(Completer):
+ self.get_view_matches(v_sug, word_before_cursor, alias)
+ self.get_function_matches(f_sug, word_before_cursor, alias))
def _make_cand(self, tbl, do_alias, suggestion, function=False):
cased_tbl = self.case(tbl)
alias = self.alias(cased_tbl, suggestion.table_refs)
# Note: tbl is a SchemaObject
def _make_cand(self, tbl, do_alias, suggestion):
cased_tbl = self.case(tbl.name)
if do_alias:
alias = self.alias(cased_tbl, suggestion.table_refs)
synonyms = (cased_tbl, generate_alias(cased_tbl))
maybe_parens = '()' if function else ''
maybe_parens = '()' if tbl.function else ''
maybe_alias = (' ' + alias) if do_alias else ''
item = cased_tbl + maybe_parens + maybe_alias
return Candidate(item, synonyms=synonyms)
maybe_schema = (self.case(tbl.schema) + '.') if tbl.schema else ''
item = maybe_schema + cased_tbl + maybe_parens + maybe_alias
prio2 = 0 if tbl.schema else 1
return Candidate(item, synonyms=synonyms, prio2=prio2)
def get_table_matches(self, suggestion, word_before_cursor, alias=False):
tables = self.populate_schema_objects(suggestion.schema, 'tables')
tables.extend(tbl.name for tbl in suggestion.local_tables)
tables.extend(SchemaObject(tbl.name) for tbl in suggestion.local_tables)
# Unless we're sure the user really wants them, don't suggest the
# pg_catalog tables that are implicitly on the search path
if not suggestion.schema and (
not word_before_cursor.startswith('pg_')):
tables = [t for t in tables if not t.startswith('pg_')]
tables = [t for t in tables if not t.name.startswith('pg_')]
tables = [self._make_cand(t, alias, suggestion) for t in tables]
return self.find_matches(word_before_cursor, tables, meta='table')
@ -624,7 +639,7 @@ class PGCompleter(Completer):
if not suggestion.schema and (
not word_before_cursor.startswith('pg_')):
views = [v for v in views if not v.startswith('pg_')]
views = [v for v in views if not v.name.startswith('pg_')]
views = [self._make_cand(v, alias, suggestion) for v in views]
return self.find_matches(word_before_cursor, views, meta='view')
@ -637,7 +652,14 @@ class PGCompleter(Completer):
return self.find_matches(word_before_cursor, self.databases,
meta='database')
def get_keyword_matches(self, _, word_before_cursor):
def get_keyword_matches(self, suggestion, word_before_cursor):
keywords = self.keywords_tree.keys()
# Get well known following keywords for the last token. If any, narrow
# candidates to this list.
next_keywords = self.keywords_tree.get(suggestion.last_token, [])
if next_keywords:
keywords = next_keywords
casing = self.keyword_casing
if casing == 'auto':
if word_before_cursor and word_before_cursor[-1].islower():
@ -646,9 +668,9 @@ class PGCompleter(Completer):
casing = 'upper'
if casing == 'upper':
keywords = [k.upper() for k in self.keywords]
keywords = [k.upper() for k in keywords]
else:
keywords = [k.lower() for k in self.keywords]
keywords = [k.lower() for k in keywords]
return self.find_matches(word_before_cursor, keywords,
mode='strict', meta='keyword')
@ -672,6 +694,7 @@ class PGCompleter(Completer):
def get_datatype_matches(self, suggestion, word_before_cursor):
# suggest custom datatypes
types = self.populate_schema_objects(suggestion.schema, 'datatypes')
types = [self._make_cand(t, False, suggestion) for t in types]
matches = self.find_matches(word_before_cursor, types, meta='datatype')
if not suggestion.schema:
@ -747,22 +770,33 @@ class PGCompleter(Completer):
return columns
def populate_schema_objects(self, schema, obj_type):
"""Returns list of tables or functions for a (optional) schema"""
metadata = self.dbmetadata[obj_type]
def _get_schemas(self, obj_typ, schema):
""" Returns a list of schemas from which to suggest objects
schema is the schema qualification input by the user (if any)
"""
metadata = self.dbmetadata[obj_typ]
if schema:
try:
objects = metadata[self.escape_name(schema)].keys()
except KeyError:
# schema doesn't exist
objects = []
else:
schemas = self.search_path
objects = [obj for schema in schemas
for obj in metadata[schema].keys()]
schema = self.escape_name(schema)
return [schema] if schema in metadata else []
return self.search_path if self.search_path_filter else metadata.keys()
return [self.case(o) for o in objects]
def _maybe_schema(self, schema, parent):
return None if parent or schema in self.search_path else schema
def populate_schema_objects(self, schema, obj_type):
"""Returns a list of SchemaObjects representing tables, views, funcs
schema is the schema qualification input by the user (if any)
"""
return [
SchemaObject(
name=obj,
schema=(self._maybe_schema(schema=sch, parent=schema)),
function=(obj_type == 'functions')
)
for sch in self._get_schemas(obj_type, schema)
for obj in self.dbmetadata[obj_type][sch].keys()
]
def populate_functions(self, schema, filter_func):
"""Returns a list of function names
@ -772,24 +806,20 @@ class PGCompleter(Completer):
kept or discarded
"""
metadata = self.dbmetadata['functions']
# Because of multiple dispatch, we can have multiple functions
# with the same name, which is why `for meta in metas` is necessary
# in the comprehensions below
if schema:
schema = self.escape_name(schema)
try:
return [func for (func, metas) in metadata[schema].items()
for meta in metas
if filter_func(meta)]
except KeyError:
return []
else:
return [func for schema in self.search_path
for (func, metas) in metadata[schema].items()
for meta in metas
if filter_func(meta)]
return [
SchemaObject(
name=func,
schema=(self._maybe_schema(schema=sch, parent=schema)),
function=True
)
for sch in self._get_schemas('functions', schema)
for (func, metas) in self.dbmetadata['functions'][sch].items()
for meta in metas
if filter_func(meta)
]

View File

@ -2,6 +2,7 @@ import traceback
import logging
import psycopg2
import psycopg2.extras
import psycopg2.errorcodes
import psycopg2.extensions as ext
import sqlparse
import pgspecial as special
@ -296,10 +297,8 @@ class PGExecute(object):
_logger.error("sql: %r, error: %r", sql, e)
_logger.error("traceback: %r", traceback.format_exc())
if (isinstance(e, psycopg2.OperationalError)
if (self._must_raise(e)
or not exception_formatter):
# Always raise operational errors, regardless of on_error
# specification
raise
yield None, None, None, exception_formatter(e), sql, False
@ -307,6 +306,23 @@ class PGExecute(object):
if not on_error_resume:
break
def _must_raise(self, e):
"""Return true if e is an error that should not be caught in ``run``.
``OperationalError``s are raised for errors that are not under the
control of the programmer. Usually that means unexpected disconnects,
which we shouldn't catch; we handle uncaught errors by prompting the
user to reconnect. We *do* want to catch OperationalErrors caused by a
lock being unavailable, as reconnecting won't solve that problem.
:param e: DatabaseError. An exception raised while executing a query.
:return: Bool. True if ``run`` must raise this exception.
"""
return (isinstance(e, psycopg2.OperationalError) and
psycopg2.errorcodes.lookup(e.pgcode) != 'LOCK_NOT_AVAILABLE')
def execute_normal_sql(self, split_sql):
"""Returns tuple (title, rows, headers, status)"""
_logger.debug('Regular sql statement. sql: %r', split_sql)

2
pylintrc Normal file
View File

@ -0,0 +1,2 @@
[MESSAGES CONTROL]
disable=missing-docstring,invalid-name

View File

@ -3,3 +3,5 @@ mock>=1.0.1
tox>=1.9.2
behave>=1.2.4
pexpect==3.3
coverage==4.3.4
pep8radius

View File

@ -15,7 +15,7 @@ install_requirements = [
'pgspecial>=1.7.0',
'click >= 4.1',
'Pygments >= 2.0', # Pygments has to be Capitalcased. WTF?
'prompt_toolkit>=1.0.9,<1.1.0',
'prompt_toolkit>=1.0.10,<1.1.0',
'psycopg2 >= 2.5.4',
'sqlparse >=0.2.2,<0.3.0',
'configobj >= 5.0.6',
@ -32,36 +32,36 @@ if platform.system() != 'Windows' and not platform.system().startswith("CYGWIN")
install_requirements.append('setproctitle >= 1.1.9')
setup(
name='pgcli',
author='Amjith Ramanujam',
author_email='amjith.r+pgcli@gmail.com',
version=version,
license='LICENSE.txt',
url='http://pgcli.com',
packages=find_packages(),
package_data={'pgcli': ['pgclirc',
'packages/pgliterals/pgliterals.json']},
description=description,
long_description=open('README.rst').read(),
install_requires=install_requirements,
entry_points='''
[console_scripts]
pgcli=pgcli.main:cli
''',
classifiers=[
'Intended Audience :: Developers',
'License :: OSI Approved :: BSD License',
'Operating System :: Unix',
'Programming Language :: Python',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: SQL',
'Topic :: Database',
'Topic :: Database :: Front-Ends',
'Topic :: Software Development',
'Topic :: Software Development :: Libraries :: Python Modules',
],
)
name='pgcli',
author='Pgcli Core Team',
author_email='pgcli-dev@googlegroups.com',
version=version,
license='LICENSE.txt',
url='http://pgcli.com',
packages=find_packages(),
package_data={'pgcli': ['pgclirc',
'packages/pgliterals/pgliterals.json']},
description=description,
long_description=open('README.rst').read(),
install_requires=install_requirements,
entry_points='''
[console_scripts]
pgcli=pgcli.main:cli
''',
classifiers=[
'Intended Audience :: Developers',
'License :: OSI Approved :: BSD License',
'Operating System :: Unix',
'Programming Language :: Python',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: SQL',
'Topic :: Database',
'Topic :: Database :: Front-Ends',
'Topic :: Software Development',
'Topic :: Software Development :: Libraries :: Python Modules',
],
)

View File

View File

@ -2,21 +2,14 @@ Feature: run the cli,
call the help command,
exit the cli
Scenario: run the cli
Given we have pgcli installed
when we run pgcli
then we see pgcli prompt
Scenario: run "\?" command
Given we have pgcli installed
when we run pgcli
and we wait for prompt
and we send "\?" command
When we send "\?" command
then we see help output
Scenario: run source command
When we send source command
then we see help output
Scenario: run the cli and exit
Given we have pgcli installed
when we run pgcli
and we wait for prompt
and we send "ctrl + d"
then pgcli exits
When we send "ctrl + d"
then dbcli exits

View File

@ -2,21 +2,15 @@ Feature: manipulate databases:
create, drop, connect, disconnect
Scenario: create and drop temporary database
Given we have pgcli installed
when we run pgcli
and we wait for prompt
and we create database
When we create database
then we see database created
when we drop database
then we see database dropped
when we connect to postgres
when we connect to dbserver
then we see database connected
Scenario: connect and disconnect from test database
Given we have pgcli installed
when we run pgcli
and we wait for prompt
and we connect to test database
When we connect to test database
then we see database connected
when we connect to postgres
when we connect to dbserver
then we see database connected

View File

@ -2,10 +2,7 @@ Feature: manipulate tables:
create, insert, update, select, delete from, drop
Scenario: create, insert, select from, update, drop table
Given we have pgcli installed
when we run pgcli
and we wait for prompt
and we connect to test database
When we connect to test database
then we see database connected
when we create table
then we see table created
@ -19,5 +16,5 @@ Feature: manipulate tables:
then we see record deleted
when we drop table
then we see table dropped
when we connect to postgres
when we connect to dbserver
then we see database connected

View File

@ -6,6 +6,9 @@ import os
import sys
import db_utils as dbutils
import fixture_utils as fixutils
import pexpect
from steps.wrappers import run_cli, wait_prompt
def before_all(context):
@ -15,11 +18,14 @@ def before_all(context):
os.environ['LINES'] = "100"
os.environ['COLUMNS'] = "100"
os.environ['PAGER'] = 'cat'
os.environ['EDITOR'] = 'nano'
os.environ['EDITOR'] = 'ex'
context.package_root = os.path.abspath(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
os.environ["COVERAGE_PROCESS_START"] = os.path.join(context.package_root,
'.coveragerc')
context.exit_sent = False
vi = '_'.join([str(x) for x in sys.version_info[:3]])
@ -42,15 +48,23 @@ def before_all(context):
),
'dbname': db_name_full,
'dbname_tmp': db_name_full + '_tmp',
'vi': vi
'vi': vi,
'cli_command': context.config.userdata.get('pg_cli_command', None) or
sys.executable +
' -c "import coverage; coverage.process_startup(); import pgcli.main; pgcli.main.cli()"',
'pager_boundary': '---boundary---',
}
os.environ['PAGER'] = "{0} {1} {2}".format(
sys.executable,
os.path.join(context.package_root, "tests/features/wrappager.py"),
context.conf['pager_boundary'])
# Store old env vars.
context.pgenv = {
'PGDATABASE': os.environ.get('PGDATABASE', None),
'PGUSER': os.environ.get('PGUSER', None),
'PGHOST': os.environ.get('PGHOST', None),
'PGPASS': os.environ.get('PGPASS', None),
'PGPASSWORD': os.environ.get('PGPASSWORD', None),
}
# Set new env vars.
@ -59,10 +73,10 @@ def before_all(context):
os.environ['PGHOST'] = context.conf['host']
if context.conf['pass']:
os.environ['PGPASS'] = context.conf['pass']
os.environ['PGPASSWORD'] = context.conf['pass']
else:
if 'PGPASS' in os.environ:
del os.environ['PGPASS']
if 'PGPASSWORD' in os.environ:
del os.environ['PGPASSWORD']
if 'PGHOST' in os.environ:
del os.environ['PGHOST']
@ -89,11 +103,30 @@ def after_all(context):
os.environ[k] = v
def before_step(context, _):
context.atprompt = False
def before_scenario(context, _):
run_cli(context)
wait_prompt(context)
def after_scenario(context, _):
"""
Cleans up after each test complete.
"""
"""Cleans up after each test complete."""
if hasattr(context, 'cli') and not context.exit_sent:
# Terminate nicely.
context.cli.terminate()
# Quit nicely.
if not context.atprompt:
dbname = context.currentdb
context.cli.expect_exact(
'{0}> '.format(dbname),
timeout=5
)
context.cli.sendcontrol('d')
context.cli.expect_exact(pexpect.EOF, timeout=5)
# TODO: uncomment to debug a failure
# def after_step(context, step):
# if step.status == "failed":
# import ipdb; ipdb.set_trace()

View File

@ -1,21 +1,62 @@
Command
Description
\#
Refresh auto-completions.
\?
Show Commands.
\c[onnect] database_name
\d [pattern]
Change to a new database.
\copy [tablename] to/from [filename]
Copy data between a file and a table.
\d[+] [pattern]
List or describe tables, views and sequences. |
\dT[S+] [pattern]
List data types
\db[+] [pattern]
List tablespaces.
\df[+] [pattern]
List functions.
\di[+] [pattern]
List indexes.
\dm[+] [pattern]
List materialized views.
\dn[+] [pattern]
List schemas.
\ds[+] [pattern]
List sequences.
\dt[+] [pattern]
List tables.
\du[+] [pattern]
List roles.
\dv[+] [pattern]
List views.
\dx[+] [pattern]
List extensions.
\e [file]
Edit the query with external editor.
\h
Show SQL syntax and help.
\i filename
Execute commands from file.
\l
\n[+] [name]
List databases.
\n[+] [name] [param1 param2 ...]
List or execute named queries.
\nd [name]
Delete a named query.
\ns name query
Save a named query.
\o [filename]
Send all query results to file.
\pager [command]
Set PAGER. Pring the query results via PAGER. |
\pset [key] [value]
A limited version of traditional \pset
\refresh
Refresh auto-completions.
\sf[+] FUNCNAME
Show a function's definition.
\timing
\x
Toggle timing of commands.
\x
Toggle expanded output.

View File

@ -1,10 +1,17 @@
Feature: I/O commands
Scenario: edit sql in file with external editor
Given we have pgcli installed
when we run pgcli
and we wait for prompt
and we start external editor providing a file name
When we start external editor providing a file name
and we type sql in the editor
and we exit the editor
then we see the sql in prompt
then we see dbcli prompt
and we see the sql in prompt
Scenario: tee output from query
When we tee output
and we wait for prompt
and we query "select 123456"
and we wait for prompt
and we notee output
and we wait for prompt
then we see 123456 in tee output

View File

@ -2,10 +2,7 @@ Feature: named queries:
save, use and delete named queries
Scenario: save, use and delete named queries
Given we have pgcli installed
when we run pgcli
and we wait for prompt
and we connect to test database
When we connect to test database
then we see database connected
when we save a named query
then we see the named query saved

View File

@ -2,9 +2,6 @@ Feature: Special commands
@wip
Scenario: run refresh command
Given we have pgcli installed
when we run pgcli
and we wait for prompt
and we refresh completions
When we refresh completions
and we wait for prompt
then we see completions refresh started

View File

View File

@ -0,0 +1,48 @@
# -*- coding: utf-8
"""
Steps for behavioral style tests are defined in this module.
Each step is defined by the string decorating it.
This string is used to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
import tempfile
from behave import when
import wrappers
@when('we run dbcli')
def step_run_cli(context):
wrappers.run_cli(context)
@when('we wait for prompt')
def step_wait_prompt(context):
wrappers.wait_prompt(context)
@when('we send "ctrl + d"')
def step_ctrl_d(context):
"""
Send Ctrl + D to hopefully exit.
"""
context.cli.sendcontrol('d')
context.exit_sent = True
@when('we send "\?" command')
def step_send_help(context):
"""
Send \? to see help.
"""
context.cli.sendline('\?')
@when(u'we send source command')
def step_send_source_command(context):
with tempfile.NamedTemporaryFile() as f:
f.write(b'\?')
f.flush()
context.cli.sendline('\i {0}'.format(f.name))
wrappers.expect_exact(
context, context.conf['pager_boundary'] + '\r\n', timeout=5)

View File

@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
"""
Steps for behavioral style tests are defined in this module.
Each step is defined by the string decorating it.
This string is used to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
import pexpect
import wrappers
from behave import when, then
@when('we create database')
def step_db_create(context):
"""
Send create database.
"""
context.cli.sendline('create database {0};'.format(
context.conf['dbname_tmp']))
context.response = {
'database_name': context.conf['dbname_tmp']
}
@when('we drop database')
def step_db_drop(context):
"""
Send drop database.
"""
context.cli.sendline('drop database {0};'.format(
context.conf['dbname_tmp']))
@when('we connect to test database')
def step_db_connect_test(context):
"""
Send connect to database.
"""
db_name = context.conf['dbname']
context.cli.sendline('\\connect {0}'.format(db_name))
@when('we connect to dbserver')
def step_db_connect_dbserver(context):
"""
Send connect to database.
"""
context.cli.sendline('\\connect postgres')
context.currentdb = 'postgres'
@then('dbcli exits')
def step_wait_exit(context):
"""
Make sure the cli exits.
"""
wrappers.expect_exact(context, pexpect.EOF, timeout=5)
@then('we see dbcli prompt')
def step_see_prompt(context):
"""
Wait to see the prompt.
"""
wrappers.expect_exact(context, '{0}> '.format(context.conf['dbname']), timeout=5)
context.atprompt = True
@then('we see help output')
def step_see_help(context):
for expected_line in context.fixture_data['help_commands.txt']:
wrappers.expect_exact(context, expected_line, timeout=1)
@then('we see database created')
def step_see_db_created(context):
"""
Wait to see create database output.
"""
wrappers.expect_pager(context, 'CREATE DATABASE\r\n', timeout=5)
@then('we see database dropped')
def step_see_db_dropped(context):
"""
Wait to see drop database output.
"""
wrappers.expect_pager(context, 'DROP DATABASE\r\n', timeout=2)
@then('we see database connected')
def step_see_db_connected(context):
"""
Wait to see drop database output.
"""
wrappers.expect_exact(context, 'You are now connected to database', timeout=2)

View File

@ -0,0 +1,117 @@
# -*- coding: utf-8
"""
Steps for behavioral style tests are defined in this module.
Each step is defined by the string decorating it.
This string is used to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
import wrappers
from behave import when, then
from textwrap import dedent
@when('we create table')
def step_create_table(context):
"""
Send create table.
"""
context.cli.sendline('create table a(x text);')
@when('we insert into table')
def step_insert_into_table(context):
"""
Send insert into table.
"""
context.cli.sendline('''insert into a(x) values('xxx');''')
@when('we update table')
def step_update_table(context):
"""
Send insert into table.
"""
context.cli.sendline('''update a set x = 'yyy' where x = 'xxx';''')
@when('we select from table')
def step_select_from_table(context):
"""
Send select from table.
"""
context.cli.sendline('select * from a;')
@when('we delete from table')
def step_delete_from_table(context):
"""
Send deete from table.
"""
context.cli.sendline('''delete from a where x = 'yyy';''')
@when('we drop table')
def step_drop_table(context):
"""
Send drop table.
"""
context.cli.sendline('drop table a;')
@then('we see table created')
def step_see_table_created(context):
"""
Wait to see create table output.
"""
wrappers.expect_pager(context, 'CREATE TABLE\r\n', timeout=2)
@then('we see record inserted')
def step_see_record_inserted(context):
"""
Wait to see insert output.
"""
wrappers.expect_pager(context, 'INSERT 0 1\r\n', timeout=2)
@then('we see record updated')
def step_see_record_updated(context):
"""
Wait to see update output.
"""
wrappers.expect_pager(context, 'UPDATE 1\r\n', timeout=2)
@then('we see data selected')
def step_see_data_selected(context):
"""
Wait to see select output.
"""
wrappers.expect_pager(
context,
dedent('''\
+-----+\r
| x |\r
|-----|\r
| yyy |\r
+-----+\r
SELECT 1\r
'''),
timeout=1)
@then('we see record deleted')
def step_see_data_deleted(context):
"""
Wait to see delete output.
"""
wrappers.expect_pager(context, 'DELETE 1\r\n', timeout=2)
@then('we see table dropped')
def step_see_table_dropped(context):
"""
Wait to see drop output.
"""
wrappers.expect_pager(context, 'DROP TABLE\r\n', timeout=2)

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8
from __future__ import unicode_literals
import os
import os.path
import wrappers
from behave import when, then
@when('we start external editor providing a file name')
def step_edit_file(context):
"""Edit file with external editor."""
context.editor_file_name = os.path.join(
context.package_root, 'test_file_{0}.sql'.format(context.conf['vi']))
if os.path.exists(context.editor_file_name):
os.remove(context.editor_file_name)
context.cli.sendline('\e {0}'.format(
os.path.basename(context.editor_file_name)))
wrappers.expect_exact(
context, 'Entering Ex mode. Type "visual" to go to Normal mode.', timeout=2)
wrappers.expect_exact(context, '\r\n:', timeout=2)
@when('we type sql in the editor')
def step_edit_type_sql(context):
context.cli.sendline('i')
context.cli.sendline('select * from abc')
context.cli.sendline('.')
wrappers.expect_exact(context, ':', timeout=2)
@when('we exit the editor')
def step_edit_quit(context):
context.cli.sendline('x')
wrappers.expect_exact(context, "written", timeout=2)
@then('we see the sql in prompt')
def step_edit_done_sql(context):
for match in 'select * from abc'.split(' '):
wrappers.expect_exact(context, match, timeout=1)
# Cleanup the command line.
context.cli.sendcontrol('c')
# Cleanup the edited file.
if context.editor_file_name and os.path.exists(context.editor_file_name):
os.remove(context.editor_file_name)
@when(u'we tee output')
def step_tee_ouptut(context):
context.tee_file_name = os.path.join(
context.package_root, 'tee_file_{0}.sql'.format(context.conf['vi']))
if os.path.exists(context.tee_file_name):
os.remove(context.tee_file_name)
context.cli.sendline('\o {0}'.format(
os.path.basename(context.tee_file_name)))
wrappers.expect_exact(
context, context.conf['pager_boundary'] + '\r\n', timeout=5)
wrappers.expect_exact(context, "Writing to file", timeout=5)
wrappers.expect_exact(
context, context.conf['pager_boundary'] + '\r\n', timeout=5)
wrappers.expect_exact(context, "Time", timeout=5)
@when(u'we query "select 123456"')
def step_query_select_123456(context):
context.cli.sendline('select 123456')
@when(u'we notee output')
def step_notee_output(context):
context.cli.sendline('notee')
wrappers.expect_exact(context, "Time", timeout=5)
@then(u'we see 123456 in tee output')
def step_see_123456_in_ouput(context):
with open(context.tee_file_name) as f:
assert '123456' in f.read()
if os.path.exists(context.tee_file_name):
os.remove(context.tee_file_name)
context.atprompt = True

View File

@ -0,0 +1,59 @@
# -*- coding: utf-8
"""
Steps for behavioral style tests are defined in this module.
Each step is defined by the string decorating it.
This string is used to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
import wrappers
from behave import when, then
@when('we save a named query')
def step_save_named_query(context):
"""
Send \ns command
"""
context.cli.sendline('\\ns foo SELECT 12345')
@when('we use a named query')
def step_use_named_query(context):
"""
Send \n command
"""
context.cli.sendline('\\n foo')
@when('we delete a named query')
def step_delete_named_query(context):
"""
Send \nd command
"""
context.cli.sendline('\\nd foo')
@then('we see the named query saved')
def step_see_named_query_saved(context):
"""
Wait to see query saved.
"""
wrappers.expect_pager(context, 'Saved.\r\n', timeout=1)
@then('we see the named query executed')
def step_see_named_query_executed(context):
"""
Wait to see select output.
"""
wrappers.expect_exact(context, '12345', timeout=1)
wrappers.expect_exact(context, 'SELECT 1', timeout=1)
@then('we see the named query deleted')
def step_see_named_query_deleted(context):
"""
Wait to see query deleted.
"""
wrappers.expect_pager(context, 'foo: Deleted\r\n', timeout=1)

View File

@ -0,0 +1,27 @@
# -*- coding: utf-8
"""
Steps for behavioral style tests are defined in this module.
Each step is defined by the string decorating it.
This string is used to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
import wrappers
from behave import when, then
@when('we refresh completions')
def step_refresh_completions(context):
"""
Send refresh command.
"""
context.cli.sendline('\\refresh')
@then('we see completions refresh started')
def step_see_refresh_started(context):
"""
Wait to see refresh output.
"""
wrappers.expect_pager(
context, 'Auto-completion refresh started in the background.\r\n', timeout=2)

View File

@ -1,347 +0,0 @@
# -*- coding: utf-8 -*-
"""
Steps for behavioral style tests are defined in this module.
Each step is defined by the string decorating it.
This string is used to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
import pip
import pexpect
import os
import re
from behave import given, when, then
@given('we have pgcli installed')
def step_install_cli(_):
"""
Check that pgcli is in installed modules.
"""
dists = set([di.key for di in pip.get_installed_distributions()])
assert 'pgcli' in dists
@when('we run pgcli')
def step_run_cli(context):
"""
Run the process using pexpect.
"""
context.cli = pexpect.spawnu('pgcli')
context.exit_sent = False
@when('we wait for prompt')
def step_wait_prompt(context):
"""
Make sure prompt is displayed.
"""
_expect_exact(context, '{0}> '.format(context.conf['dbname']), timeout=5)
@when('we send "ctrl + d"')
def step_ctrl_d(context):
"""
Send Ctrl + D to hopefully exit.
"""
context.cli.sendcontrol('d')
context.exit_sent = True
@when('we send "\?" command')
def step_send_help(context):
"""
Send \? to see help.
"""
context.cli.sendline('\?')
@when('we save a named query')
def step_save_named_query(context):
"""
Send \ns command
"""
context.cli.sendline('\\ns foo SELECT 12345')
@when('we use a named query')
def step_use_named_query(context):
"""
Send \n command
"""
context.cli.sendline('\\n foo')
@when('we delete a named query')
def step_delete_named_query(context):
"""
Send \nd command
"""
context.cli.sendline('\\nd foo')
@when('we create database')
def step_db_create(context):
"""
Send create database.
"""
context.cli.sendline('create database {0};'.format(
context.conf['dbname_tmp']))
context.response = {
'database_name': context.conf['dbname_tmp']
}
@when('we drop database')
def step_db_drop(context):
"""
Send drop database.
"""
context.cli.sendline('drop database {0};'.format(
context.conf['dbname_tmp']))
@when('we create table')
def step_create_table(context):
"""
Send create table.
"""
context.cli.sendline('create table a(x text);')
@when('we insert into table')
def step_insert_into_table(context):
"""
Send insert into table.
"""
context.cli.sendline('''insert into a(x) values('xxx');''')
@when('we update table')
def step_update_table(context):
"""
Send insert into table.
"""
context.cli.sendline('''update a set x = 'yyy' where x = 'xxx';''')
@when('we select from table')
def step_select_from_table(context):
"""
Send select from table.
"""
context.cli.sendline('select * from a;')
@when('we delete from table')
def step_delete_from_table(context):
"""
Send deete from table.
"""
context.cli.sendline('''delete from a where x = 'yyy';''')
@when('we drop table')
def step_drop_table(context):
"""
Send drop table.
"""
context.cli.sendline('drop table a;')
@when('we connect to test database')
def step_db_connect_test(context):
"""
Send connect to database.
"""
db_name = context.conf['dbname']
context.cli.sendline('\\connect {0}'.format(db_name))
@when('we start external editor providing a file name')
def step_edit_file(context):
"""
Edit file with external editor.
"""
context.editor_file_name = 'test_file_{0}.sql'.format(context.conf['vi'])
if os.path.exists(context.editor_file_name):
os.remove(context.editor_file_name)
context.cli.sendline('\e {0}'.format(context.editor_file_name))
_expect_exact(context, 'nano', timeout=2)
@when('we type sql in the editor')
def step_edit_type_sql(context):
context.cli.sendline('select * from abc')
# Write the file.
context.cli.sendcontrol('o')
# Confirm file name sending "enter".
context.cli.sendcontrol('m')
@when('we exit the editor')
def step_edit_quit(context):
context.cli.sendcontrol('x')
@then('we see the sql in prompt')
def step_edit_done_sql(context):
_expect_exact(context, 'select * from abc', timeout=2)
# Cleanup the command line.
context.cli.sendcontrol('u')
# Cleanup the edited file.
if context.editor_file_name and os.path.exists(context.editor_file_name):
os.remove(context.editor_file_name)
@when('we connect to postgres')
def step_db_connect_postgres(context):
"""
Send connect to database.
"""
context.cli.sendline('\\connect postgres')
@when('we refresh completions')
def step_refresh_completions(context):
"""
Send refresh command.
"""
context.cli.sendline('\\refresh')
@then('pgcli exits')
def step_wait_exit(context):
"""
Make sure the cli exits.
"""
_expect_exact(context, pexpect.EOF, timeout=5)
@then('we see pgcli prompt')
def step_see_prompt(context):
"""
Wait to see the prompt.
"""
_expect_exact(context, '{0}> '.format(context.conf['dbname']), timeout=5)
@then('we see help output')
def step_see_help(context):
for expected_line in context.fixture_data['help_commands.txt']:
_expect_exact(context, expected_line, timeout=1)
@then('we see database created')
def step_see_db_created(context):
"""
Wait to see create database output.
"""
_expect_exact(context, 'CREATE DATABASE', timeout=2)
@then('we see database dropped')
def step_see_db_dropped(context):
"""
Wait to see drop database output.
"""
_expect_exact(context, 'DROP DATABASE', timeout=2)
@then('we see database connected')
def step_see_db_connected(context):
"""
Wait to see drop database output.
"""
_expect_exact(context, 'You are now connected to database', timeout=2)
@then('we see table created')
def step_see_table_created(context):
"""
Wait to see create table output.
"""
_expect_exact(context, 'CREATE TABLE', timeout=2)
@then('we see record inserted')
def step_see_record_inserted(context):
"""
Wait to see insert output.
"""
_expect_exact(context, 'INSERT 0 1', timeout=2)
@then('we see record updated')
def step_see_record_updated(context):
"""
Wait to see update output.
"""
_expect_exact(context, 'UPDATE 1', timeout=2)
@then('we see data selected')
def step_see_data_selected(context):
"""
Wait to see select output.
"""
_expect_exact(context, 'yyy', timeout=1)
_expect_exact(context, 'SELECT 1', timeout=1)
@then('we see record deleted')
def step_see_data_deleted(context):
"""
Wait to see delete output.
"""
_expect_exact(context, 'DELETE 1', timeout=2)
@then('we see table dropped')
def step_see_table_dropped(context):
"""
Wait to see drop output.
"""
_expect_exact(context, 'DROP TABLE', timeout=2)
@then('we see the named query saved')
def step_see_named_query_saved(context):
"""
Wait to see query saved.
"""
_expect_exact(context, 'Saved.', timeout=1)
@then('we see the named query executed')
def step_see_named_query_executed(context):
"""
Wait to see select output.
"""
_expect_exact(context, '12345', timeout=1)
_expect_exact(context, 'SELECT 1', timeout=1)
@then('we see the named query deleted')
def step_see_named_query_deleted(context):
"""
Wait to see query deleted.
"""
_expect_exact(context, 'foo: Deleted', timeout=1)
@then('we see completions refresh started')
def step_see_refresh_started(context):
"""
Wait to see refresh output.
"""
_expect_exact(context, 'refresh started in the background', timeout=2)
def _expect_exact(context, expected, timeout):
try:
context.cli.expect_exact(expected, timeout=timeout)
except:
# Strip color codes out of the output.
actual = re.sub(r'\x1b\[([0-9A-Za-z;?])+[m|K]?', '', context.cli.before)
raise Exception('Expected:\n---\n{0}\n---\n\nActual:\n---\n{1}\n---'.format(
expected,
actual))

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8
from __future__ import unicode_literals
import re
import pexpect
def expect_exact(context, expected, timeout):
try:
context.cli.expect_exact(expected, timeout=timeout)
except:
# Strip color codes out of the output.
actual = re.sub(r'\x1b\[([0-9A-Za-z;?])+[m|K]?', '', context.cli.before)
raise Exception('Expected:\n---\n{0!r}\n---\n\nActual:\n---\n{1!r}\n---'.format(
expected,
actual))
def expect_pager(context, expected, timeout):
expect_exact(context, "{0}\r\n{1}{0}\r\n".format(
context.conf['pager_boundary'], expected), timeout=timeout)
def run_cli(context):
"""Run the process using pexpect."""
cli_cmd = context.conf.get('cli_command')
context.cli = pexpect.spawnu(cli_cmd, cwd=context.package_root)
context.exit_sent = False
context.currentdb = context.conf['dbname']
def wait_prompt(context):
"""Make sure prompt is displayed."""
expect_exact(context, '{0}> '.format(context.conf['dbname']), timeout=5)

16
tests/features/wrappager.py Executable file
View File

@ -0,0 +1,16 @@
#!/usr/bin/env python
import sys
def wrappager(boundary):
print(boundary)
while 1:
buf = sys.stdin.read(2048)
if not buf:
break
sys.stdout.write(buf)
print(boundary)
if __name__ == "__main__":
wrappager(sys.argv[1])

View File

@ -1,26 +1,59 @@
from functools import partial
from itertools import product
from pgcli.packages.parseutils.meta import FunctionMetadata, ForeignKey
from prompt_toolkit.completion import Completion
from functools import partial
from prompt_toolkit.document import Document
from mock import Mock
import pytest
parametrize = pytest.mark.parametrize
qual = ['if_more_than_one_table', 'always']
no_qual = ['if_more_than_one_table', 'never']
def escape(name):
if not name.islower() or name in ('select', 'insert'):
return '"' + name + '"'
return name
escape = lambda name: ('"' + name + '"' if not name.islower() or name in (
'select', 'insert') else name)
def completion(display_meta, text, pos=0):
return Completion(text, start_position=pos,
display_meta=display_meta)
return Completion(text, start_position=pos, display_meta=display_meta)
def get_result(completer, text, position=None):
position = len(text) if position is None else position
return completer.get_completions(
Document(text=text, cursor_position=position), Mock()
)
def result_set(completer, text, position=None):
return set(get_result(completer, text, position))
# The code below is quivalent to
# def schema(text, pos=0):
# return completion('schema', text, pos)
# and so on
schema, table, view, function, column, keyword, datatype, alias, name_join,\
fk_join, join = [partial(completion, display_meta)
for display_meta in('schema', 'table', 'view', 'function', 'column',
'keyword', 'datatype', 'table alias', 'name join', 'fk join', 'join')]
schema = partial(completion, 'schema')
table = partial(completion, 'table')
view = partial(completion, 'view')
function = partial(completion, 'function')
column = partial(completion, 'column')
keyword = partial(completion, 'keyword')
datatype = partial(completion, 'datatype')
alias = partial(completion, 'table alias')
name_join = partial(completion, 'name join')
fk_join = partial(completion, 'fk join')
join = partial(completion, 'join')
def wildcard_expansion(cols, pos=-1):
return Completion(cols, start_position=pos, display_meta='columns',
display = '*')
return Completion(
cols, start_position=pos, display_meta='columns', display='*')
class MetaData(object):
def __init__(self, metadata):
@ -33,40 +66,109 @@ class MetaData(object):
return [datatype(dt, pos) for dt in self.completer.datatypes]
def keywords(self, pos=0):
return [keyword(kw, pos) for kw in self.completer.keywords]
return [keyword(kw, pos) for kw in self.completer.keywords_tree.keys()]
def columns(self, parent, schema='public', typ='tables', pos=0):
def columns(self, tbl, parent='public', typ='tables', pos=0):
if typ == 'functions':
fun = [x for x in self.metadata[typ][schema] if x[0] == parent][0]
fun = [x for x in self.metadata[typ][parent] if x[0] == tbl][0]
cols = fun[1]
else:
cols = self.metadata[typ][schema][parent]
cols = self.metadata[typ][parent][tbl]
return [column(escape(col), pos) for col in cols]
def datatypes(self, schema='public', pos=0):
return [datatype(escape(x), pos)
for x in self.metadata.get('datatypes', {}).get(schema, [])]
def datatypes(self, parent='public', pos=0):
return [
datatype(escape(x), pos)
for x in self.metadata.get('datatypes', {}).get(parent, [])]
def tables(self, schema='public', pos=0):
return [table(escape(x), pos)
for x in self.metadata.get('tables', {}).get(schema, [])]
def tables(self, parent='public', pos=0):
return [
table(escape(x), pos)
for x in self.metadata.get('tables', {}).get(parent, [])]
def views(self, schema='public', pos=0):
return [view(escape(x), pos)
for x in self.metadata.get('views', {}).get(schema, [])]
def views(self, parent='public', pos=0):
return [
view(escape(x), pos)
for x in self.metadata.get('views', {}).get(parent, [])]
def functions(self, schema='public', pos=0):
return [function(escape(x[0] + '()'), pos)
for x in self.metadata.get('functions', {}).get(schema, [])]
def functions(self, parent='public', pos=0):
return [
function(escape(x[0] + '()'), pos)
for x in self.metadata.get('functions', {}).get(parent, [])]
def schemas(self, pos=0):
schemas = set(sch for schs in self.metadata.values() for sch in schs)
return [schema(escape(s), pos=pos) for s in schemas]
def functions_and_keywords(self, parent='public', pos=0):
return (
self.functions(parent, pos) + self.builtin_functions(pos) +
self.keywords(pos)
)
# Note that the filtering parameters here only apply to the columns
def columns_functions_and_keywords(
self, tbl, parent='public', typ='tables', pos=0
):
return (
self.functions_and_keywords(pos=pos) +
self.columns(tbl, parent, typ, pos)
)
def from_clause_items(self, parent='public', pos=0):
return (
self.functions(parent, pos) + self.views(parent, pos) +
self.tables(parent, pos)
)
def schemas_and_from_clause_items(self, parent='public', pos=0):
return self.from_clause_items(parent, pos) + self.schemas(pos)
def types(self, parent='public', pos=0):
return self.datatypes(parent, pos) + self.tables(parent, pos)
@property
def completer(self):
return self.get_completer()
def get_completers(self, casing):
"""
Returns a function taking three bools `casing`, `filtr`, `aliasing` and
the list `qualify`, all defaulting to None.
Returns a list of completers.
These parameters specify the allowed values for the corresponding
completer parameters, `None` meaning any, i.e. (None, None, None, None)
results in all 24 possible completers, whereas e.g.
(True, False, True, ['never']) results in the one completer with
casing, without `search_path` filtering of objects, with table
aliasing, and without column qualification.
"""
def _cfg(_casing, filtr, aliasing, qualify):
cfg = {'settings': {}}
if _casing:
cfg['casing'] = casing
cfg['settings']['search_path_filter'] = filtr
cfg['settings']['generate_aliases'] = aliasing
cfg['settings']['qualify_columns'] = qualify
return cfg
def _cfgs(casing, filtr, aliasing, qualify):
casings = [True, False] if casing is None else [casing]
filtrs = [True, False] if filtr is None else [filtr]
aliases = [True, False] if aliasing is None else [aliasing]
qualifys = qualify or ['always', 'if_more_than_one_table', 'never']
return [
_cfg(*p) for p in product(casings, filtrs, aliases, qualifys)
]
def completers(casing=None, filtr=None, aliasing=None, qualify=None):
get_comp = self.get_completer
return [
get_comp(**c) for c in _cfgs(casing, filtr, aliasing, qualify)
]
return completers
def get_completer(self, settings=None, casing=None):
metadata = self.metadata
from pgcli.pgcompleter import PGCompleter
@ -74,29 +176,32 @@ class MetaData(object):
schemata, tables, tbl_cols, views, view_cols = [], [], [], [], []
for schema, tbls in metadata['tables'].items():
schemata.append(schema)
for sch, tbls in metadata['tables'].items():
schemata.append(sch)
for table, cols in tbls.items():
tables.append((schema, table))
for tbl, cols in tbls.items():
tables.append((sch, tbl))
# Let all columns be text columns
tbl_cols.extend([(schema, table, col, 'text') for col in cols])
tbl_cols.extend([(sch, tbl, col, 'text') for col in cols])
for schema, tbls in metadata.get('views', {}).items():
for view, cols in tbls.items():
views.append((schema, view))
for sch, tbls in metadata.get('views', {}).items():
for tbl, cols in tbls.items():
views.append((sch, tbl))
# Let all columns be text columns
view_cols.extend([(schema, view, col, 'text') for col in cols])
view_cols.extend([(sch, tbl, col, 'text') for col in cols])
functions = [FunctionMetadata(schema, *func_meta)
for schema, funcs in metadata['functions'].items()
for func_meta in funcs]
functions = [
FunctionMetadata(sch, *func_meta)
for sch, funcs in metadata['functions'].items()
for func_meta in funcs]
datatypes = [(schema, datatype)
for schema, datatypes in metadata['datatypes'].items()
for datatype in datatypes]
datatypes = [
(sch, typ)
for sch, datatypes in metadata['datatypes'].items()
for typ in datatypes]
foreignkeys = [ForeignKey(*fk) for fks in metadata['foreignkeys'].values()
foreignkeys = [
ForeignKey(*fk) for fks in metadata['foreignkeys'].values()
for fk in fks]
comp.extend_schemata(schemata)

View File

@ -8,7 +8,9 @@ try:
except ImportError:
setproctitle = None
from pgcli.main import obfuscate_process_password, format_output, PGCli
from pgcli.main import (
obfuscate_process_password, format_output, PGCli, OutputSettings
)
from utils import dbtest, run
@ -47,21 +49,20 @@ def test_obfuscate_process_password():
def test_format_output():
settings = OutputSettings(table_format='psql', dcmlfmt='d', floatfmt='g')
results = format_output('Title', [('abc', 'def')], ['head1', 'head2'],
'test status', 'psql', dcmlfmt='d', floatfmt='g',)
'test status', settings)
expected = ['Title', '+---------+---------+\n| head1 | head2 |\n|---------+---------|\n| abc | def |\n+---------+---------+', 'test status']
assert results == expected
def test_format_output_auto_expand():
settings = OutputSettings(table_format='psql', dcmlfmt='d', floatfmt='g', max_width=100)
table_results = format_output('Title', [('abc', 'def')],
['head1', 'head2'], 'test status', 'psql', dcmlfmt='d', floatfmt='g',
max_width=100)
['head1', 'head2'], 'test status', settings)
table = ['Title', '+---------+---------+\n| head1 | head2 |\n|---------+---------|\n| abc | def |\n+---------+---------+', 'test status']
assert table_results == table
expanded_results = format_output('Title', [('abc', 'def')],
['head1', 'head2'], 'test status', 'psql', dcmlfmt='d', floatfmt='g',
max_width=1)
['head1', 'head2'], 'test status', settings._replace(max_width=1))
expanded = ['Title', u'-[ RECORD 0 ]-------------------------\nhead1 | abc\nhead2 | def\n', 'test status']
assert expanded_results == expanded

View File

@ -56,3 +56,18 @@ def test_paths_completion(completer, complete_event):
complete_event,
smart_completion=True))
assert result > set([Completion(text="setup.py", start_position=0)])
def test_alter_well_known_keywords_completion(completer, complete_event):
text = 'ALTER '
position = len(text)
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event,
smart_completion=True))
assert result > set([
Completion(text="DATABASE", display_meta='keyword'),
Completion(text="TABLE", display_meta='keyword'),
Completion(text="SYSTEM", display_meta='keyword'),
])
assert Completion(text="CREATE", display_meta="keyword") not in result

View File

@ -1,9 +1,8 @@
from __future__ import unicode_literals
import pytest
import itertools
from metadata import (MetaData, alias, name_join, fk_join, join,
schema, table, function, wildcard_expansion, column)
from prompt_toolkit.document import Document
schema, table, function, wildcard_expansion, column,
get_result, result_set, qual, no_qual, parametrize)
metadata = {
'tables': {
@ -63,310 +62,225 @@ metadata = {
testdata = MetaData(metadata)
cased_schemas = [schema(x) for x in ('public', 'blog', 'CUSTOM', '"Custom"')]
@pytest.fixture
def completer():
return testdata.completer
casing = ('SELECT', 'Orders', 'User_Emails', 'CUSTOM', 'Func1', 'Entries',
'Tags', 'EntryTags', 'EntAccLog',
'EntryID', 'EntryTitle', 'EntryText')
completers = testdata.get_completers(casing)
@pytest.fixture
def completer_with_casing():
return testdata.get_completer(casing=casing)
@pytest.fixture
def completer_with_aliases():
return testdata.get_completer({'generate_aliases': True})
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
@parametrize('table', ['users', '"users"'])
def test_suggested_column_names_from_shadowed_visible_table(completer, table) :
result = result_set(completer, 'SELECT FROM ' + table, len('SELECT '))
assert result == set(testdata.columns_functions_and_keywords('users'))
@pytest.fixture
def completer_aliases_casing(request):
return testdata.get_completer({'generate_aliases': True}, casing)
@pytest.fixture
def complete_event():
from mock import Mock
return Mock()
@pytest.mark.parametrize('table', [
'users',
'"users"',
])
def test_suggested_column_names_from_shadowed_visible_table(completer, complete_event, table):
"""
Suggest column and function names when selecting from table
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT FROM ' + table
position = len('SELECT ')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.columns('users') +
testdata.functions() +
list(testdata.builtin_functions() +
testdata.keywords())
)
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
@parametrize('text', [
'SELECT from custom.users',
'WITH users as (SELECT 1 AS foo) SELECT from custom.users',
])
def test_suggested_column_names_from_qualified_shadowed_table(completer, complete_event, text):
position = text.find(' ') + 1
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.columns('users', 'custom') +
testdata.functions() +
list(testdata.builtin_functions() +
testdata.keywords())
)
])
def test_suggested_column_names_from_qualified_shadowed_table(completer, text):
result = result_set(completer, text, position = text.find(' ') + 1)
assert result == set(testdata.columns_functions_and_keywords(
'users', 'custom'
))
@pytest.mark.parametrize('text', [
'WITH users as (SELECT 1 AS foo) SELECT from users',
])
def test_suggested_column_names_from_cte(completer, complete_event, text):
position = text.find(' ') + 1
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set([column('foo')] + testdata.functions() +
list(testdata.builtin_functions() + testdata.keywords())
)
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
@parametrize('text', ['WITH users as (SELECT 1 AS foo) SELECT from users',])
def test_suggested_column_names_from_cte(completer, text):
result = result_set(completer, text, text.find(' ') + 1)
assert result == set([column('foo')] + testdata.functions_and_keywords())
@parametrize('completer', completers(casing=False))
@parametrize('text', [
'SELECT * FROM users JOIN custom.shipments ON ',
'''SELECT *
FROM public.users
JOIN custom.shipments ON '''
])
def test_suggested_join_conditions(completer, complete_event, text):
position = len(text)
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set([
def test_suggested_join_conditions(completer, text):
result = result_set(completer, text)
assert result == set([
alias('users'),
alias('shipments'),
name_join('shipments.id = users.id'),
fk_join('shipments.user_id = users.id')])
@pytest.mark.parametrize(('query', 'tbl'), itertools.product((
@parametrize('completer', completers(filtr=True, casing=False, aliasing=False))
@parametrize(('query', 'tbl'), itertools.product((
'SELECT * FROM public.{0} RIGHT OUTER JOIN ',
'''SELECT *
FROM {0}
JOIN '''
), ('users', '"users"', 'Users')))
def test_suggested_joins(completer, complete_event, query, tbl):
text = query.format(tbl)
position = len(text)
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.schemas() + testdata.tables() + [
join('custom.shipments ON shipments.user_id = {0}.id'.format(tbl)),
] + testdata.functions())
def test_suggested_column_names_from_schema_qualifed_table(completer, complete_event):
"""
Suggest column and function names when selecting from a qualified-table
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT from custom.products'
position = len('SELECT ')
result = set(completer.get_completions(
Document(text=text, cursor_position=position), complete_event))
assert set(result) == set(testdata.columns('products', 'custom') + testdata.functions() +
list(testdata.builtin_functions() +
testdata.keywords())
)
def test_suggested_column_names_in_function(completer, complete_event):
"""
Suggest column and function names when selecting multiple
columns from table
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT MAX( from custom.products'
position = len('SELECT MAX(')
result = completer.get_completions(
Document(text=text, cursor_position=position),
complete_event)
assert set(result) == set(testdata.columns('products', 'custom'))
def test_suggested_joins(completer, query, tbl):
result = result_set(completer, query.format(tbl))
assert result == set(
testdata.schemas_and_from_clause_items() +
[join('custom.shipments ON shipments.user_id = {0}.id'.format(tbl))]
)
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
def test_suggested_column_names_from_schema_qualifed_table(completer):
result = result_set(
completer, 'SELECT from custom.products', len('SELECT ')
)
assert result == set(testdata.columns_functions_and_keywords(
'products', 'custom'
))
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
def test_suggested_column_names_in_function(completer):
result = result_set(
completer, 'SELECT MAX( from custom.products', len('SELECT MAX(')
)
assert result == set(testdata.columns('products', 'custom'))
@parametrize('completer', completers(casing=False, aliasing=False))
@parametrize('text', [
'SELECT * FROM Custom.',
'SELECT * FROM custom.',
'SELECT * FROM "custom".',
])
@pytest.mark.parametrize('use_leading_double_quote', [False, True])
def test_suggested_table_names_with_schema_dot(completer, complete_event,
text, use_leading_double_quote):
@parametrize('use_leading_double_quote', [False, True])
def test_suggested_table_names_with_schema_dot(
completer, text, use_leading_double_quote
):
if use_leading_double_quote:
text += '"'
start_pos = -1
start_position = -1
else:
start_pos = 0
start_position = 0
position = len(text)
result = completer.get_completions(
Document(text=text, cursor_position=position), complete_event)
assert set(result) == set(testdata.tables('custom', start_pos)
+ testdata.functions('custom', start_pos))
result = result_set(completer, text)
assert result == set(testdata.from_clause_items('custom', start_position))
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(casing=False, aliasing=False))
@parametrize('text', [
'SELECT * FROM "Custom".',
])
@pytest.mark.parametrize('use_leading_double_quote', [False, True])
def test_suggested_table_names_with_schema_dot2(completer, complete_event,
text, use_leading_double_quote):
@parametrize('use_leading_double_quote', [False, True])
def test_suggested_table_names_with_schema_dot2(
completer, text, use_leading_double_quote
):
if use_leading_double_quote:
text += '"'
start_pos = -1
start_position = -1
else:
start_pos = 0
start_position = 0
position = len(text)
result = completer.get_completions(
Document(text=text, cursor_position=position), complete_event)
assert set(result) == set(testdata.functions('Custom', start_pos) +
testdata.tables('Custom', start_pos))
result = result_set(completer, text)
assert result == set(testdata.from_clause_items('Custom', start_position))
def test_suggested_column_names_with_qualified_alias(completer, complete_event):
"""
Suggest column names on table alias and dot
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT p. from custom.products p'
position = len('SELECT p.')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.columns('products', 'custom'))
def test_suggested_multiple_column_names(completer, complete_event):
"""
Suggest column and function names when selecting multiple
columns from table
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT id, from custom.products'
position = len('SELECT id, ')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.columns('products', 'custom') +
testdata.functions() +
list(testdata.builtin_functions() +
testdata.keywords())
)
@parametrize('completer', completers(filtr=True, casing=False))
def test_suggested_column_names_with_qualified_alias(completer):
result = result_set(
completer, 'SELECT p. from custom.products p', len('SELECT p.')
)
assert result == set(testdata.columns('products', 'custom'))
def test_suggested_multiple_column_names_with_alias(completer, complete_event):
"""
Suggest column names on table alias and dot
when selecting multiple columns from table
:param completer:
:param complete_event:
:return:
"""
text = 'SELECT p.id, p. from custom.products p'
position = len('SELECT u.id, u.')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.columns('products', 'custom'))
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False, qualify=no_qual))
def test_suggested_multiple_column_names(completer):
result = result_set(
completer, 'SELECT id, from custom.products', len('SELECT id, ')
)
assert result == set(testdata.columns_functions_and_keywords(
'products', 'custom'
))
@parametrize('completer', completers(filtr=True, casing=False))
def test_suggested_multiple_column_names_with_alias(completer):
result = result_set(
completer,
'SELECT p.id, p. from custom.products p',
len('SELECT u.id, u.')
)
assert result == set(testdata.columns('products', 'custom'))
@parametrize('completer', completers(filtr=True, casing=False))
@parametrize('text', [
'SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON ',
'SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON JOIN public.orders z ON z.id > y.id'
])
def test_suggestions_after_on(completer, complete_event, text):
def test_suggestions_after_on(completer, text):
position = len('SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON ')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set([
result = result_set(completer, text, position)
assert result == set([
alias('x'),
alias('y'),
name_join('y.price = x.price'),
name_join('y.product_name = x.product_name'),
name_join('y.id = x.id')])
def test_suggested_aliases_after_on_right_side(completer, complete_event):
@parametrize('completer', completers())
def test_suggested_aliases_after_on_right_side(completer):
text = 'SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON x.id = '
position = len(text)
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set([
alias('x'),
alias('y')])
result = result_set(completer, text)
assert result == set([alias('x'), alias('y')])
def test_table_names_after_from(completer, complete_event):
@parametrize('completer', completers(filtr=True, casing=False, aliasing=False))
def test_table_names_after_from(completer):
text = 'SELECT * FROM '
position = len('SELECT * FROM ')
result = set(completer.get_completions(
Document(text=text, cursor_position=position),
complete_event))
assert set(result) == set(testdata.schemas() + testdata.tables()
+ testdata.functions())
result = result_set(completer, text)
assert result == set(testdata.schemas_and_from_clause_items())
def test_schema_qualified_function_name(completer, complete_event):
@parametrize('completer', completers(filtr=True, casing=False))
def test_schema_qualified_function_name(completer):
text = 'SELECT custom.func'
postion = len(text)
result = set(completer.get_completions(
Document(text=text, cursor_position=postion), complete_event))
result = result_set(completer, text)
assert result == set([
function('func3()', -len('func')),
function('set_returning_func()', -len('func'))])
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False))
@parametrize('text', [
'SELECT 1::custom.',
'CREATE TABLE foo (bar custom.',
'CREATE FUNCTION foo (bar INT, baz custom.',
'ALTER TABLE foo ALTER COLUMN bar TYPE custom.',
])
def test_schema_qualified_type_name(text, completer, complete_event):
pos = len(text)
result = completer.get_completions(
Document(text=text, cursor_position=pos), complete_event)
assert set(result) == set(testdata.datatypes('custom')
+ testdata.tables('custom'))
def test_schema_qualified_type_name(completer, text):
result = result_set(completer, text)
assert result == set(testdata.types('custom'))
def test_suggest_columns_from_aliased_set_returning_function(completer, complete_event):
sql = 'select f. from custom.set_returning_func() f'
pos = len('select f.')
result = completer.get_completions(Document(text=sql, cursor_position=pos),
complete_event)
assert set(result) == set(
@parametrize('completer', completers(filtr=True, casing=False))
def test_suggest_columns_from_aliased_set_returning_function(completer):
result = result_set(
completer,
'select f. from custom.set_returning_func() f',
len('select f.')
)
assert result == set(
testdata.columns('set_returning_func', 'custom', 'functions'))
@pytest.mark.parametrize('text', [
@parametrize('completer',completers(filtr=True, casing=False, qualify=no_qual))
@parametrize('text', [
'SELECT * FROM custom.set_returning_func()',
'SELECT * FROM Custom.set_returning_func()',
'SELECT * FROM Custom.Set_Returning_Func()'
])
def test_wildcard_column_expansion_with_function(completer, complete_event, text):
pos = len('SELECT *')
def test_wildcard_column_expansion_with_function(completer, text):
position = len('SELECT *')
completions = completer.get_completions(
Document(text=text, cursor_position=pos), complete_event)
completions = get_result(completer, text, position)
col_list = 'x'
expected = [wildcard_expansion(col_list)]
@ -374,19 +288,21 @@ def test_wildcard_column_expansion_with_function(completer, complete_event, text
assert expected == completions
def test_wildcard_column_expansion_with_alias_qualifier(completer, complete_event):
sql = 'SELECT p.* FROM custom.products p'
pos = len('SELECT p.*')
@parametrize('completer', completers(filtr=True, casing=False))
def test_wildcard_column_expansion_with_alias_qualifier(completer):
text = 'SELECT p.* FROM custom.products p'
position = len('SELECT p.*')
completions = completer.get_completions(
Document(text=sql, cursor_position=pos), complete_event)
completions = get_result(completer, text, position)
col_list = 'id, p.product_name, p.price'
expected = [wildcard_expansion(col_list)]
assert expected == completions
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False))
@parametrize('text', [
'''
SELECT count(1) FROM users;
CREATE FUNCTION foo(custom.products _products) returns custom.shipments
@ -415,32 +331,33 @@ def test_wildcard_column_expansion_with_alias_qualifier(completer, complete_even
'INSERT INTO orders (*)',
'INSERT INTO Orders (*)'
])
def test_wildcard_column_expansion_with_insert(completer, complete_event, text):
pos = text.index('*') + 1
completions = completer.get_completions(
Document(text=text, cursor_position=pos), complete_event)
def test_wildcard_column_expansion_with_insert(completer, text):
position = text.index('*') + 1
completions = get_result(completer, text, position)
expected = [wildcard_expansion('id, ordered_date, status')]
assert expected == completions
def test_wildcard_column_expansion_with_table_qualifier(completer, complete_event):
sql = 'SELECT "select".* FROM public."select"'
pos = len('SELECT "select".*')
completions = completer.get_completions(
Document(text=sql, cursor_position=pos), complete_event)
@parametrize('completer', completers(filtr=True, casing=False))
def test_wildcard_column_expansion_with_table_qualifier(completer):
text = 'SELECT "select".* FROM public."select"'
position = len('SELECT "select".*')
completions = get_result(completer, text, position)
col_list = 'id, "select"."insert", "select"."ABC"'
expected = [wildcard_expansion(col_list)]
assert expected == completions
def test_wildcard_column_expansion_with_two_tables(completer, complete_event):
sql = 'SELECT * FROM public."select" JOIN custom.users ON true'
pos = len('SELECT *')
completions = completer.get_completions(
Document(text=sql, cursor_position=pos), complete_event)
@parametrize('completer',completers(filtr=True, casing=False, qualify=qual))
def test_wildcard_column_expansion_with_two_tables(completer):
text = 'SELECT * FROM public."select" JOIN custom.users ON true'
position = len('SELECT *')
completions = get_result(completer, text, position)
cols = ('"select".id, "select"."insert", "select"."ABC", '
'users.id, users.phone_number')
@ -448,19 +365,21 @@ def test_wildcard_column_expansion_with_two_tables(completer, complete_event):
assert completions == expected
def test_wildcard_column_expansion_with_two_tables_and_parent(completer, complete_event):
sql = 'SELECT "select".* FROM public."select" JOIN custom.users u ON true'
pos = len('SELECT "select".*')
@parametrize('completer', completers(filtr=True, casing=False))
def test_wildcard_column_expansion_with_two_tables_and_parent(completer):
text = 'SELECT "select".* FROM public."select" JOIN custom.users u ON true'
position = len('SELECT "select".*')
completions = completer.get_completions(
Document(text=sql, cursor_position=pos), complete_event)
completions = get_result(completer, text, position)
col_list = 'id, "select"."insert", "select"."ABC"'
expected = [wildcard_expansion(col_list)]
assert expected == completions
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False))
@parametrize('text', [
'SELECT U. FROM custom.Users U',
'SELECT U. FROM custom.USERS U',
'SELECT U. FROM custom.users U',
@ -468,144 +387,191 @@ def test_wildcard_column_expansion_with_two_tables_and_parent(completer, complet
'SELECT U. FROM "custom".USERS U',
'SELECT U. FROM "custom".users U'
])
def test_suggest_columns_from_unquoted_table(completer, complete_event, text):
pos = len('SELECT U.')
result = completer.get_completions(Document(text=text, cursor_position=pos),
complete_event)
assert set(result) == set(testdata.columns('users', 'custom'))
def test_suggest_columns_from_unquoted_table(completer, text):
position = len('SELECT U.')
result = result_set(completer, text, position)
assert result == set(testdata.columns('users', 'custom'))
@pytest.mark.parametrize('text', [
@parametrize('completer', completers(filtr=True, casing=False))
@parametrize('text', [
'SELECT U. FROM custom."Users" U',
'SELECT U. FROM "custom"."Users" U'
])
def test_suggest_columns_from_quoted_table(completer, complete_event, text):
pos = len('SELECT U.')
result = completer.get_completions(Document(text=text, cursor_position=pos),
complete_event)
assert set(result) == set(testdata.columns('Users', 'custom'))
def test_suggest_columns_from_quoted_table(completer, text):
position = len('SELECT U.')
result = result_set(completer, text, position)
assert result == set(testdata.columns('Users', 'custom'))
texts = ['SELECT * FROM ', 'SELECT * FROM public.Orders O CROSS JOIN ']
@pytest.mark.parametrize('text', texts)
def test_schema_or_visible_table_completion(completer, complete_event, text):
result = completer.get_completions(Document(text=text), complete_event)
assert set(result) == set(testdata.schemas()
+ testdata.views() + testdata.tables() + testdata.functions())
result = completer.get_completions(Document(text=text), complete_event)
@pytest.mark.parametrize('text', texts)
def test_table_aliases(completer_with_aliases, complete_event, text):
result = completer_with_aliases.get_completions(
Document(text=text), complete_event)
assert set(result) == set(testdata.schemas() + [
@parametrize('completer', completers(filtr=True, casing=False, aliasing=False))
@parametrize('text', texts)
def test_schema_or_visible_table_completion(completer, text):
result = result_set(completer, text)
assert result == set(testdata.schemas_and_from_clause_items())
@parametrize('completer', completers(aliasing=True, casing=False, filtr=True))
@parametrize('text', texts)
def test_table_aliases(completer, text):
result = result_set(completer, text)
assert result == set(testdata.schemas() + [
table('users u'),
table('orders o' if text == 'SELECT * FROM ' else 'orders o2'),
table('"select" s'),
function('func1() f'),
function('func2() f')])
@pytest.mark.parametrize('text', texts)
def test_aliases_with_casing(completer_aliases_casing, complete_event, text):
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
assert set(result) == set(cased_schemas + [
@parametrize('completer', completers(aliasing=True, casing=True, filtr=True))
@parametrize('text', texts)
def test_aliases_with_casing(completer, text):
result = result_set(completer, text)
assert result == set(cased_schemas + [
table('users u'),
table('Orders O' if text == 'SELECT * FROM ' else 'Orders O2'),
table('"select" s'),
function('Func1() F'),
function('func2() f')])
@pytest.mark.parametrize('text', texts)
def test_table_casing(completer_with_casing, complete_event, text):
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
assert set(result) == set(cased_schemas + [
@parametrize('completer', completers(aliasing=False, casing=True, filtr=True))
@parametrize('text', texts)
def test_table_casing(completer, text):
result = result_set(completer, text)
assert result == set(cased_schemas + [
table('users'),
table('Orders'),
table('"select"'),
function('Func1()'),
function('func2()')])
def test_alias_search_without_aliases2(completer_with_casing, complete_event):
@parametrize('completer', completers(aliasing=False, casing=True))
def test_alias_search_without_aliases2(completer):
text = 'SELECT * FROM blog.et'
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == table('EntryTags', -2)
def test_alias_search_without_aliases1(completer_with_casing, complete_event):
@parametrize('completer', completers(aliasing=False, casing=True))
def test_alias_search_without_aliases1(completer):
text = 'SELECT * FROM blog.e'
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == table('Entries', -1)
def test_alias_search_with_aliases2(completer_aliases_casing, complete_event):
@parametrize('completer', completers(aliasing=True, casing=True))
def test_alias_search_with_aliases2(completer):
text = 'SELECT * FROM blog.et'
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == table('EntryTags ET', -2)
def test_alias_search_with_aliases1(completer_aliases_casing, complete_event):
@parametrize('completer', completers(aliasing=True, casing=True))
def test_alias_search_with_aliases1(completer):
text = 'SELECT * FROM blog.e'
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == table('Entries E', -1)
def test_join_alias_search_with_aliases1(completer_aliases_casing,
complete_event):
@parametrize('completer', completers(aliasing=True, casing=True))
def test_join_alias_search_with_aliases1(completer):
text = 'SELECT * FROM blog.Entries E JOIN blog.e'
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[:2] == [table('Entries E2', -1), join(
'EntAccLog EAL ON EAL.EntryID = E.EntryID', -1)]
def test_join_alias_search_without_aliases1(completer_with_casing,
complete_event):
@parametrize('completer', completers(aliasing=False, casing=True))
def test_join_alias_search_without_aliases1(completer):
text = 'SELECT * FROM blog.Entries JOIN blog.e'
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[:2] == [table('Entries', -1), join(
'EntAccLog ON EntAccLog.EntryID = Entries.EntryID', -1)]
def test_join_alias_search_with_aliases2(completer_aliases_casing,
complete_event):
@parametrize('completer', completers(aliasing=True, casing=True))
def test_join_alias_search_with_aliases2(completer):
text = 'SELECT * FROM blog.Entries E JOIN blog.et'
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == join('EntryTags ET ON ET.EntryID = E.EntryID', -2)
def test_join_alias_search_without_aliases2(completer_with_casing,
complete_event):
@parametrize('completer', completers(aliasing=False, casing=True))
def test_join_alias_search_without_aliases2(completer):
text = 'SELECT * FROM blog.Entries JOIN blog.et'
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == join(
'EntryTags ON EntryTags.EntryID = Entries.EntryID', -2)
def test_function_alias_search_without_aliases(completer_with_casing,
complete_event):
@parametrize('completer', completers())
def test_function_alias_search_without_aliases(completer):
text = 'SELECT blog.ees'
result = completer_with_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == function('extract_entry_symbols()', -3)
def test_function_alias_search_with_aliases(completer_aliases_casing,
complete_event):
@parametrize('completer', completers())
def test_function_alias_search_with_aliases(completer):
text = 'SELECT blog.ee'
result = completer_aliases_casing.get_completions(
Document(text=text), complete_event)
result = get_result(completer, text)
assert result[0] == function('enter_entry()', -2)
def test_column_alias_search(completer_aliases_casing, complete_event):
text = 'SELECT et FROM blog.Entries E'
result = completer_aliases_casing.get_completions(
Document(text, cursor_position=len('SELECT et')), complete_event)
@parametrize('completer',completers(filtr=True, casing=True, qualify=no_qual))
def test_column_alias_search(completer):
result = get_result(
completer, 'SELECT et FROM blog.Entries E', len('SELECT et')
)
cols = ('EntryText', 'EntryTitle', 'EntryID')
assert result[:3] == [column(c, -2) for c in cols]
def test_column_alias_search_qualified(completer_aliases_casing,
complete_event):
text = 'SELECT E.ei FROM blog.Entries E'
result = completer_aliases_casing.get_completions(
Document(text, cursor_position=len('SELECT E.ei')), complete_event)
@parametrize('completer', completers(casing=True))
def test_column_alias_search_qualified(completer):
result = get_result(
completer, 'SELECT E.ei FROM blog.Entries E', len('SELECT E.ei')
)
cols = ('EntryID', 'EntryTitle')
assert result[:3] == [column(c, -2) for c in cols]
@parametrize('completer', completers(casing=False, filtr=False, aliasing=False))
def test_schema_object_order(completer):
result = get_result(completer, 'SELECT * FROM u')
assert result[:3] == [
table(t, pos=-1) for t in ('users', 'custom."Users"', 'custom.users')
]
@parametrize('completer', completers(casing=False, filtr=False, aliasing=False))
def test_all_schema_objects(completer):
text = ('SELECT * FROM ')
result = result_set(completer, text)
assert result >= set(
[table(x) for x in ('orders', '"select"', 'custom.shipments')]
+ [function(x+'()') for x in ('func2', 'custom.func3')]
)
@parametrize('completer', completers(filtr=False, aliasing=False, casing=True))
def test_all_schema_objects_with_casing(completer):
text = 'SELECT * FROM '
result = result_set(completer, text)
assert result >= set(
[table(x) for x in ('Orders', '"select"', 'CUSTOM.shipments')]
+ [function(x+'()') for x in ('func2', 'CUSTOM.func3')]
)
@parametrize('completer', completers(casing=False, filtr=False, aliasing=True))
def test_all_schema_objects_with_aliases(completer):
text = ('SELECT * FROM ')
result = result_set(completer, text)
assert result >= set(
[table(x) for x in ('orders o', '"select" s', 'custom.shipments s')]
+ [function(x) for x in ('func2() f', 'custom.func3() f')]
)

File diff suppressed because it is too large Load Diff

View File

@ -4,22 +4,26 @@ from pgcli.packages.sqlcompletion import (
from pgcli.packages.parseutils.tables import TableReference
import pytest
# Returns the expected select-clause suggestions for a single-table select
def cols_etc(table, schema=None, alias=None, is_function=False, parent=None):
def cols_etc(table, schema=None, alias=None, is_function=False, parent=None,
last_keyword=None):
"""Returns the expected select-clause suggestions for a single-table
select."""
return set([
Column(table_refs=(TableReference(schema, table, alias, is_function),),
qualifiable=True),
Function(schema=parent),
Keyword()])
Keyword(last_keyword)])
def test_select_suggests_cols_with_visible_table_scope():
suggestions = suggest_type('SELECT FROM tabl', 'SELECT ')
assert set(suggestions) == cols_etc('tabl')
assert set(suggestions) == cols_etc('tabl', last_keyword='SELECT')
def test_select_suggests_cols_with_qualified_table_scope():
suggestions = suggest_type('SELECT FROM sch.tabl', 'SELECT ')
assert set(suggestions) == cols_etc('tabl', 'sch')
assert set(suggestions) == cols_etc('tabl', 'sch', last_keyword='SELECT')
def test_cte_does_not_crash():
@ -32,8 +36,9 @@ def test_cte_does_not_crash():
'SELECT * FROM "tabl" WHERE ',
])
def test_where_suggests_columns_functions_quoted_table(expression):
expected = cols_etc('tabl', alias='"tabl"', last_keyword='WHERE')
suggestions = suggest_type(expression, expression)
assert set(suggestions) == cols_etc('tabl', alias='"tabl"')
assert expected == set(suggestions)
@pytest.mark.parametrize('expression', [
@ -52,7 +57,7 @@ def test_where_suggests_columns_functions_quoted_table(expression):
])
def test_where_suggests_columns_functions(expression):
suggestions = suggest_type(expression, expression)
assert set(suggestions) == cols_etc('tabl')
assert set(suggestions) == cols_etc('tabl', last_keyword='WHERE')
@pytest.mark.parametrize('expression', [
@ -61,7 +66,7 @@ def test_where_suggests_columns_functions(expression):
])
def test_where_in_suggests_columns(expression):
suggestions = suggest_type(expression, expression)
assert set(suggestions) == cols_etc('tabl')
assert set(suggestions) == cols_etc('tabl', last_keyword='WHERE')
@pytest.mark.parametrize('expression', [
'SELECT 1 AS ',
@ -75,7 +80,7 @@ def test_after_as(expression):
def test_where_equals_any_suggests_columns_or_keywords():
text = 'SELECT * FROM tabl WHERE foo = ANY('
suggestions = suggest_type(text, text)
assert set(suggestions) == cols_etc('tabl')
assert set(suggestions) == cols_etc('tabl', last_keyword='WHERE')
def test_lparen_suggests_cols():
@ -87,9 +92,9 @@ def test_lparen_suggests_cols():
def test_select_suggests_cols_and_funcs():
suggestions = suggest_type('SELECT ', 'SELECT ')
assert set(suggestions) == set([
Column(table_refs=(), qualifiable=True),
Function(schema=None),
Keyword(),
Column(table_refs=(), qualifiable=True),
Function(schema=None),
Keyword('SELECT'),
])
@ -213,15 +218,71 @@ def test_truncate_suggests_qualified_tables():
])
def test_distinct_suggests_cols(text):
suggestions = suggest_type(text, text)
assert suggestions ==(Column(table_refs=(), qualifiable=True),)
assert set(suggestions) == set([
Column(table_refs=(), local_tables=(), qualifiable=True),
Function(schema=None),
Keyword('DISTINCT')
])
@pytest.mark.parametrize('text, text_before, last_keyword', [
(
'SELECT DISTINCT FROM tbl x JOIN tbl1 y',
'SELECT DISTINCT',
'SELECT',
),
(
'SELECT * FROM tbl x JOIN tbl1 y ORDER BY ',
'SELECT * FROM tbl x JOIN tbl1 y ORDER BY ',
'BY',
)
])
def test_distinct_and_order_by_suggestions_with_aliases(text, text_before,
last_keyword):
suggestions = suggest_type(text, text_before)
assert set(suggestions) == set([
Column(
table_refs=(
TableReference(None, 'tbl', 'x', False),
TableReference(None, 'tbl1', 'y', False),
),
local_tables=(),
qualifiable=True
),
Function(schema=None),
Keyword(last_keyword)
])
@pytest.mark.parametrize('text, text_before', [
(
'SELECT DISTINCT x. FROM tbl x JOIN tbl1 y',
'SELECT DISTINCT x.'
),
(
'SELECT * FROM tbl x JOIN tbl1 y ORDER BY x.',
'SELECT * FROM tbl x JOIN tbl1 y ORDER BY x.'
)
])
def test_distinct_and_order_by_suggestions_with_alias_given(text, text_before):
suggestions = suggest_type(text, text_before)
assert set(suggestions) == set([
Column(
table_refs=(TableReference(None, 'tbl', 'x', False),),
local_tables=(),
qualifiable=False
),
Table(schema='x'),
View(schema='x'),
Function(schema='x'),
])
def test_col_comma_suggests_cols():
suggestions = suggest_type('SELECT a, b, FROM tbl', 'SELECT a, b,')
assert set(suggestions) == set([
Column(table_refs=((None, 'tbl', None, False),), qualifiable=True),
Function(schema=None),
Keyword(),
Keyword('SELECT'),
])
@ -264,7 +325,7 @@ def test_insert_into_lparen_comma_suggests_cols():
def test_partially_typed_col_name_suggests_col_names():
suggestions = suggest_type('SELECT * FROM tabl WHERE col_n',
'SELECT * FROM tabl WHERE col_n')
assert set(suggestions) == cols_etc('tabl')
assert set(suggestions) == cols_etc('tabl', last_keyword='WHERE')
def test_dot_suggests_cols_of_a_table_or_schema_qualified_table():
@ -381,7 +442,7 @@ def test_sub_select_col_name_completion():
assert set(suggestions) == set([
Column(table_refs=((None, 'abc', None, False),), qualifiable=True),
Function(schema=None),
Keyword(),
Keyword('SELECT'),
])
@ -545,7 +606,7 @@ def test_2_statements_2nd_current():
assert set(suggestions) == set([
Column(table_refs=((None, 'b', None, False),), qualifiable=True),
Function(schema=None),
Keyword()
Keyword('SELECT')
])
# Should work even if first statement is invalid
@ -567,7 +628,7 @@ def test_2_statements_1st_current():
suggestions = suggest_type('select from a; select * from b',
'select ')
assert set(suggestions) == cols_etc('a')
assert set(suggestions) == cols_etc('a', last_keyword='SELECT')
def test_3_statements_2nd_current():
@ -580,7 +641,7 @@ def test_3_statements_2nd_current():
suggestions = suggest_type('select * from a; select from b; select * from c',
'select * from a; select ')
assert set(suggestions) == cols_etc('b')
assert set(suggestions) == cols_etc('b', last_keyword='SELECT')
@pytest.mark.parametrize('text', [
'''
@ -631,7 +692,7 @@ def test_statements_in_function_body(text):
assert set(suggestions) == set([
Column(table_refs=((None, 'foo', None, False),), qualifiable=True),
Function(schema=None),
Keyword()
Keyword('SELECT'),
])
functions = [
@ -655,12 +716,13 @@ SELECT 1 FROM foo;
@pytest.mark.parametrize('text', functions)
def test_statements_with_cursor_after_function_body(text):
suggestions = suggest_type(text, text[:text.find('; ') + 1])
assert set(suggestions) == set([Keyword()])
assert set(suggestions) == set([Keyword(), Special()])
@pytest.mark.parametrize('text', functions)
def test_statements_with_cursor_before_function_body(text):
suggestions = suggest_type(text, '')
assert set(suggestions) == set([Keyword()])
assert set(suggestions) == set([Keyword(), Special()])
def test_create_db_with_template():
suggestions = suggest_type('create database foo with template ',
@ -774,16 +836,16 @@ def test_invalid_sql():
def test_suggest_where_keyword(text):
# https://github.com/dbcli/mycli/issues/135
suggestions = suggest_type(text, text)
assert set(suggestions) == cols_etc('foo')
assert set(suggestions) == cols_etc('foo', last_keyword='WHERE')
@pytest.mark.parametrize('text, before, expected', [
('\\ns abc SELECT ', 'SELECT ', [
Column(table_refs=(), qualifiable=True),
Function(schema=None),
Keyword()
Keyword('SELECT')
]),
('\\ns abc SELECT foo ', 'SELECT foo ',(Keyword(),)),
('\\ns abc SELECT foo ', 'SELECT foo ', (Keyword(),)),
('\\ns abc SELECT t1. FROM tabl1 t1', 'SELECT t1.', [
Table(schema='t1'),
View(schema='t1'),
@ -799,7 +861,7 @@ def test_named_query_completion(text, before, expected):
def test_select_suggests_fields_from_function():
suggestions = suggest_type('SELECT FROM func()', 'SELECT ')
assert set(suggestions) == cols_etc(
'func', is_function=True)
'func', is_function=True, last_keyword='SELECT')
@pytest.mark.parametrize('sql', [
@ -846,4 +908,4 @@ def test_handle_unrecognized_kw_generously():
'ALTER TABLE foo ALTER ',
])
def test_keyword_after_alter(sql):
assert Keyword() in set(suggest_type(sql, sql))
assert Keyword('ALTER') in set(suggest_type(sql, sql))

View File

@ -1,7 +1,7 @@
import pytest
import psycopg2
import psycopg2.extras
from pgcli.main import format_output
from pgcli.main import format_output, OutputSettings
from pgcli.pgexecute import register_json_typecasters
# TODO: should this be somehow be divined from environment?
@ -64,10 +64,10 @@ def run(executor, sql, join=False, expanded=False, pgspecial=None,
results = executor.run(sql, pgspecial, exception_formatter)
formatted = []
settings = OutputSettings(table_format='psql', dcmlfmt='d', floatfmt='g',
expanded=expanded)
for title, rows, headers, status, sql, success in results:
formatted.extend(format_output(title, rows, headers, status, 'psql', dcmlfmt='d', floatfmt='g',
expanded=expanded))
formatted.extend(format_output(title, rows, headers, status, settings))
if join:
formatted = '\n'.join(formatted)