1
0
mirror of https://github.com/dbcli/pgcli synced 2024-05-31 01:17:54 +00:00

Merge branch 'master' into tab-on-line-start

This commit is contained in:
Artur Balabanov 2018-10-01 10:25:13 +01:00
commit f897ea466e
40 changed files with 547 additions and 352 deletions

3
.gitignore vendored
View File

@ -66,3 +66,6 @@ target/
# Generated Packages
*.deb
*.rpm
.vscode/
venv/

View File

@ -19,7 +19,7 @@ script:
- set -e
- coverage run --source pgcli -m py.test
- cd tests
- behave
- behave --no-capture
- cd ..
# check for changelog ReST compliance
- rst2html.py --halt=warning changelog.rst >/dev/null

View File

@ -84,6 +84,7 @@ Contributors:
* Saif Hakim
* Artur Balabanov
* Kenny Do
* Max Rothman
Creator:

View File

@ -1,15 +1,35 @@
Upcoming
========
Upcoming:
=========
Bug fixes:
----------
* Fix for error retrieving version in Redshift (#922). (Thanks: `Irina Truong`_)
* Adapt the query used to get functions metadata to PG11 (#919). (Thanks: `Lele Gaifax`_).
* Fix for keyring not disabled properly (#920). (Thanks: `Irina Truong`_)
* Tab press on an empty line increases the indentation instead of triggering
the auto-complete pop-up. (Thanks: `Artur Balabanov`_)
2.0.0:
======
* Update to ``prompt-toolkit`` 2.0. (Thanks: `Jonathan Slenders`_, `Dick Marinus`_, `Irina Truong`_)
1.11.0
======
Features:
---------
* Respect `\pset pager on` and use pager when output is longer than terminal height (Thanks: `Max Rothman`_)
1.10.3
======
Bug fixes:
----------
* Adapt the query used to get functions metadata to PG11 (#919). (Thanks: `Lele Gaifax`_).
* Fix for error retrieving version in Redshift (#922). (Thanks: `Irina Truong`_)
* Fix for keyring not disabled properly (#920). (Thanks: `Irina Truong`_)
1.10.2
======
@ -870,3 +890,4 @@ Improvements:
.. _`Saif Hakim`: https://github.com/saifelse
.. _`Artur Balabanov`: https://github.com/arturbalabanov
.. _`Kenny Do`: https://github.com/kennydo
.. _`Max Rothman`: https://github.com/maxrothman

View File

@ -1 +1 @@
__version__ = '1.10.2'
__version__ = '2.0.0'

View File

@ -81,7 +81,7 @@ class CompletionRefresher(object):
# Load history into pgcompleter so it can learn user preferences
n_recent = 100
if history:
for recent in history[-n_recent:]:
for recent in history.get_strings()[-n_recent:]:
completer.extend_query_history(recent, is_init=True)
for callback in callbacks:

View File

@ -1,13 +0,0 @@
from prompt_toolkit.filters import Filter
class HasSelectedCompletion(Filter):
"""Enable when the current buffer has a selected completion."""
def __call__(self, cli):
complete_state = cli.current_buffer.complete_state
return (complete_state is not None and
complete_state.current_completion is not None)
def __repr__(self):
return "HasSelectedCompletion()"

View File

@ -1,74 +1,57 @@
from __future__ import unicode_literals
import logging
from prompt_toolkit.enums import EditingMode
from prompt_toolkit.keys import Keys
from prompt_toolkit.key_binding.manager import KeyBindingManager
from prompt_toolkit.filters import Condition
from .filters import HasSelectedCompletion
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.filters import completion_is_selected
_logger = logging.getLogger(__name__)
def pgcli_bindings(get_vi_mode_enabled, set_vi_mode_enabled, expand_tab):
def pgcli_bindings(pgcli):
"""Custom key bindings for pgcli."""
kb = KeyBindings()
assert callable(get_vi_mode_enabled)
assert callable(set_vi_mode_enabled)
expand_tab = pgcli.config['main'].as_bool('expand_tab')
tab_insert_text = ' ' * 4 if expand_tab else '\t'
key_binding_manager = KeyBindingManager(
enable_open_in_editor=True,
enable_system_bindings=True,
enable_auto_suggest_bindings=True,
enable_search=True,
enable_abort_and_exit_bindings=True)
@key_binding_manager.registry.add_binding(Keys.F2)
@kb.add('f2')
def _(event):
"""
Enable/Disable SmartCompletion Mode.
"""
"""Enable/Disable SmartCompletion Mode."""
_logger.debug('Detected F2 key.')
buf = event.cli.current_buffer
buf.completer.smart_completion = not buf.completer.smart_completion
pgcli.completer.smart_completion = not pgcli.completer.smart_completion
@key_binding_manager.registry.add_binding(Keys.F3)
@kb.add('f3')
def _(event):
"""
Enable/Disable Multiline Mode.
"""
"""Enable/Disable Multiline Mode."""
_logger.debug('Detected F3 key.')
buf = event.cli.current_buffer
buf.always_multiline = not buf.always_multiline
pgcli.multi_line = not pgcli.multi_line
@key_binding_manager.registry.add_binding(Keys.F4)
@kb.add('f4')
def _(event):
"""
Toggle between Vi and Emacs mode.
"""
"""Toggle between Vi and Emacs mode."""
_logger.debug('Detected F4 key.')
vi_mode = not get_vi_mode_enabled()
set_vi_mode_enabled(vi_mode)
pgcli.vi_mode = not pgcli.vi_mode
event.app.editing_mode = EditingMode.VI if pgcli.vi_mode else EditingMode.EMACS
event.cli.editing_mode = EditingMode.VI if vi_mode else EditingMode.EMACS
@key_binding_manager.registry.add_binding(Keys.Tab)
@kb.add('tab')
def _(event):
"""Force autocompletion at cursor on non-empty lines."""
_logger.debug('Detected <Tab> key.')
buff = event.cli.current_buffer
buff = event.app.current_buffer
doc = buff.document
if doc.on_first_line or doc.current_line.strip():
if buff.complete_state:
buff.complete_next()
else:
event.cli.start_completion(select_first=True)
buff.start_completion(select_first=True)
else:
buff.insert_text(tab_insert_text, fire_event=False)
@key_binding_manager.registry.add_binding(Keys.ControlSpace)
@kb.add('c-space')
def _(event):
"""
Initialize autocompletion at cursor.
@ -80,21 +63,25 @@ def pgcli_bindings(get_vi_mode_enabled, set_vi_mode_enabled, expand_tab):
"""
_logger.debug('Detected <C-Space> key.')
b = event.cli.current_buffer
b = event.app.current_buffer
if b.complete_state:
b.complete_next()
else:
event.cli.start_completion(select_first=False)
b.start_completion(select_first=False)
@key_binding_manager.registry.add_binding(Keys.ControlJ, filter=HasSelectedCompletion())
@kb.add('enter', filter=completion_is_selected)
def _(event):
"""Makes the enter key work as the tab key only when showing the menu.
In other words, don't execute query when enter is pressed in
the completion dropdown menu, instead close the dropdown menu
(accept current selection).
"""
Makes the enter key work as the tab key only when showing the menu.
"""
_logger.debug('Detected <C-J> key.')
_logger.debug('Detected enter key.')
event.current_buffer.complete_state = None
b = event.cli.current_buffer
b = event.app.current_buffer
b.complete_state = None
return key_binding_manager
return kb

View File

@ -27,22 +27,20 @@ try:
import setproctitle
except ImportError:
setproctitle = None
from prompt_toolkit import CommandLineInterface, Application, AbortAction
from prompt_toolkit.completion import DynamicCompleter
from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode
from prompt_toolkit.shortcuts import create_prompt_layout, create_eventloop
from prompt_toolkit.buffer import AcceptAction
from prompt_toolkit.shortcuts import PromptSession, CompleteStyle
from prompt_toolkit.document import Document
from prompt_toolkit.filters import Always, HasFocus, IsDone
from prompt_toolkit.layout.lexers import PygmentsLexer
from prompt_toolkit.filters import HasFocus, IsDone
from prompt_toolkit.lexers import PygmentsLexer
from prompt_toolkit.layout.processors import (ConditionalProcessor,
HighlightMatchingBracketProcessor,
TabsProcessor)
from prompt_toolkit.history import FileHistory
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from pygments.lexers.sql import PostgresLexer
from pygments.token import Token
from pgspecial.main import (PGSpecial, NO_QUERY, PAGER_OFF)
from pgspecial.main import (PGSpecial, NO_QUERY, PAGER_OFF, PAGER_LONG_OUTPUT)
import pgspecial as special
try:
import keyring
@ -52,7 +50,7 @@ from .pgcompleter import PGCompleter
from .pgtoolbar import create_toolbar_tokens_func
from .pgstyle import style_factory, style_factory_output
from .pgexecute import PGExecute
from .pgbuffer import PGBuffer
from .pgbuffer import pg_is_multiline
from .completion_refresher import CompletionRefresher
from .config import (get_casing_file,
load_config, config_location, ensure_dir_exists, get_config)
@ -77,6 +75,9 @@ from collections import namedtuple
from textwrap import dedent
# Ref: https://stackoverflow.com/questions/30425105/filter-special-chars-such-as-color-codes-from-shell-output
COLOR_CODE_REGEX = re.compile(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))')
# Query tuples are used for maintaining history
MetaQuery = namedtuple(
'Query',
@ -215,8 +216,7 @@ class PGCli(object):
self._completer_lock = threading.Lock()
self.register_special_commands()
self.eventloop = create_eventloop()
self.cli = None
self.prompt_app = None
def quit(self):
raise PgCliQuitError
@ -497,7 +497,7 @@ class PGCli(object):
self.pgexecute = pgexecute
def handle_editor_command(self, cli, document):
def handle_editor_command(self, text):
r"""
Editor command is any query that is prefixed or suffixed
by a '\e'. The reason for a while loop is because a user
@ -506,41 +506,39 @@ class PGCli(object):
"select * from \e"<enter> to edit it in vim, then come
back to the prompt with the edited query "select * from
blah where q = 'abc'\e" to edit it again.
:param cli: CommandLineInterface
:param document: Document
:param text: Document
:return: Document
"""
# FIXME: using application.pre_run_callables like this here is not the best solution.
# It's internal api of prompt_toolkit that may change. This was added to fix #668.
# We may find a better way to do it in the future.
saved_callables = cli.application.pre_run_callables
try:
editor_command = special.editor_command(document.text)
while editor_command:
if editor_command == '\\e':
filename = special.get_filename(document.text)
query = (special.get_editor_query(document.text) or
self.get_last_query())
else: # \ev or \ef
filename = None
spec = document.text.split()[1]
if editor_command == '\\ev':
query = self.pgexecute.view_definition(spec)
elif editor_command == '\\ef':
query = self.pgexecute.function_definition(spec)
sql, message = special.open_external_editor(
filename, sql=query)
if message:
# Something went wrong. Raise an exception and bail.
raise RuntimeError(message)
cli.current_buffer.document = Document(
sql, cursor_position=len(sql))
cli.application.pre_run_callables = []
document = cli.run()
editor_command = special.editor_command(document.text)
finally:
cli.application.pre_run_callables = saved_callables
return document
editor_command = special.editor_command(text)
while editor_command:
if editor_command == '\\e':
filename = special.get_filename(text)
query = special.get_editor_query(
text) or self.get_last_query()
else: # \ev or \ef
filename = None
spec = text.split()[1]
if editor_command == '\\ev':
query = self.pgexecute.view_definition(spec)
elif editor_command == '\\ef':
query = self.pgexecute.function_definition(spec)
sql, message = special.open_external_editor(
filename, sql=query)
if message:
# Something went wrong. Raise an exception and bail.
raise RuntimeError(message)
while True:
try:
text = self.prompt_app.prompt(
default=sql,
vi_mode=self.vi_mode
)
break
except KeyboardInterrupt:
sql = None
editor_command = special.editor_command(text)
return text
def execute_command(self, text):
logger = self.logger
@ -623,7 +621,7 @@ class PGCli(object):
self.refresh_completions(history=history,
persist_priorities='none')
self.cli = self._build_cli(history)
self.prompt_app = self._build_cli(history)
if not self.less_chatty:
print('Server: PostgreSQL', self.pgexecute.get_server_version())
@ -634,18 +632,21 @@ class PGCli(object):
try:
while True:
document = self.cli.run()
try:
text = self.prompt_app.prompt(vi_mode=self.vi_mode)
except KeyboardInterrupt:
continue
try:
document = self.handle_editor_command(self.cli, document)
text = self.handle_editor_command(text)
except RuntimeError as e:
logger.error("sql: %r, error: %r", document.text, e)
logger.error("sql: %r, error: %r", text, e)
logger.error("traceback: %r", traceback.format_exc())
click.secho(str(e), err=True, fg='red')
continue
self.watch_command, timing = special.get_watch_command(
document.text)
# Initialize default metaquery in case execution fails
self.watch_command, timing = special.get_watch_command(text)
if self.watch_command:
while self.watch_command:
try:
@ -657,13 +658,13 @@ class PGCli(object):
except KeyboardInterrupt:
self.watch_command = None
else:
query = self.execute_command(document.text)
query = self.execute_command(text)
self.now = dt.datetime.today()
# Allow PGCompleter to learn user's preferred keywords, etc.
with self._completer_lock:
self.completer.extend_query_history(document.text)
self.completer.extend_query_history(text)
self.query_history.append(query)
@ -672,16 +673,9 @@ class PGCli(object):
print ('Goodbye!')
def _build_cli(self, history):
key_bindings = pgcli_bindings(self)
def set_vi_mode(value):
self.vi_mode = value
key_binding_manager = pgcli_bindings(
get_vi_mode_enabled=lambda: self.vi_mode,
set_vi_mode_enabled=set_vi_mode,
expand_tab=self.config['main'].as_bool('expand_tab'))
def prompt_tokens(_):
def get_message():
if self.dsn_alias and self.prompt_dsn_format is not None:
prompt_format = self.prompt_dsn_format
else:
@ -693,68 +687,56 @@ class PGCli(object):
len(prompt) > self.max_len_prompt):
prompt = self.get_prompt('\\d> ')
return [(Token.Prompt, prompt)]
return [('class:prompt', prompt)]
def get_continuation_tokens(cli, width):
continuation=self.multiline_continuation_char * (width - 1) + ' '
return [(Token.Continuation, continuation)]
def get_continuation(width, line_number, is_soft_wrap):
continuation = self.multiline_continuation_char * (width - 1) + ' '
return [('class:continuation', continuation)]
get_toolbar_tokens = create_toolbar_tokens_func(
lambda: self.vi_mode, self.completion_refresher.is_refreshing,
self.pgexecute.failed_transaction,
self.pgexecute.valid_transaction)
get_toolbar_tokens = create_toolbar_tokens_func(self)
layout = create_prompt_layout(
lexer=PygmentsLexer(PostgresLexer),
reserve_space_for_menu=self.min_num_menu_lines,
get_prompt_tokens=prompt_tokens,
get_continuation_tokens=get_continuation_tokens,
get_bottom_toolbar_tokens=get_toolbar_tokens,
display_completions_in_columns=self.wider_completion_menu,
multiline=True,
extra_input_processors=[
# Highlight matching brackets while editing.
ConditionalProcessor(
processor=HighlightMatchingBracketProcessor(
chars='[](){}'),
filter=HasFocus(DEFAULT_BUFFER) & ~IsDone()),
# Render \t as 4 spaces instead of "^I"
TabsProcessor(get_char1=lambda _: ' ',
get_char2=lambda _: ' '),
])
if self.wider_completion_menu:
complete_style = CompleteStyle.MULTI_COLUMN
else:
complete_style = CompleteStyle.COLUMN
with self._completer_lock:
buf = PGBuffer(
prompt_app = PromptSession(
lexer=PygmentsLexer(PostgresLexer),
reserve_space_for_menu=self.min_num_menu_lines,
message=get_message,
prompt_continuation=get_continuation,
bottom_toolbar=get_toolbar_tokens,
complete_style=complete_style,
input_processors=[
# Highlight matching brackets while editing.
ConditionalProcessor(
processor=HighlightMatchingBracketProcessor(
chars='[](){}'),
filter=HasFocus(DEFAULT_BUFFER) & ~IsDone()),
# Render \t as 4 spaces instead of "^I"
TabsProcessor(char1=' ', char2=' ')],
auto_suggest=AutoSuggestFromHistory(),
always_multiline=self.multi_line,
multiline_mode=self.multiline_mode,
completer=self.completer,
tempfile_suffix='.sql',
multiline=pg_is_multiline(self),
history=history,
complete_while_typing=Always(),
accept_action=AcceptAction.RETURN_DOCUMENT)
editing_mode = EditingMode.VI if self.vi_mode else EditingMode.EMACS
application = Application(
completer=DynamicCompleter(lambda: self.completer),
complete_while_typing=True,
style=style_factory(self.syntax_style, self.cli_style),
layout=layout,
buffer=buf,
key_bindings_registry=key_binding_manager.registry,
on_exit=AbortAction.RAISE_EXCEPTION,
on_abort=AbortAction.RETRY,
ignore_case=True,
editing_mode=editing_mode)
include_default_pygments_style=False,
key_bindings=key_bindings,
enable_open_in_editor=True,
enable_system_prompt=True,
editing_mode=EditingMode.VI if self.vi_mode else EditingMode.EMACS,
search_ignore_case=True)
cli = CommandLineInterface(application=application,
eventloop=self.eventloop)
return cli
return prompt_app
def _should_show_limit_prompt(self, status, cur):
"""returns True if limit prompt should be shown, False otherwise."""
if not is_select(status):
return False
return self.row_limit > 0 and cur and cur.rowcount > self.row_limit
return self.row_limit > 0 and cur and (cur.rowcount > self.row_limit)
def _evaluate_command(self, text):
"""Used to run a command entered by the user during CLI operation
@ -792,7 +774,7 @@ class PGCli(object):
break
if self.pgspecial.auto_expand or self.auto_expand:
max_width = self.cli.output.get_size().columns
max_width = self.prompt_app.output.get_size().columns
else:
max_width = None
@ -861,24 +843,25 @@ class PGCli(object):
def _on_completions_refreshed(self, new_completer, persist_priorities):
self._swap_completer_objects(new_completer, persist_priorities)
if self.cli:
if self.prompt_app:
# After refreshing, redraw the CLI to clear the statusbar
# "Refreshing completions..." indicator
self.cli.request_redraw()
self.prompt_app.app.invalidate()
def _swap_completer_objects(self, new_completer, persist_priorities):
"""Swap the completer object in cli with the newly created completer.
"""Swap the completer object with the newly created completer.
persist_priorities is a string specifying how the old completer's
learned prioritizer should be transferred to the new completer.
persist_priorities is a string specifying how the old completer's
learned prioritizer should be transferred to the new completer.
'none' - The new prioritizer is left in a new/clean state
'none' - The new prioritizer is left in a new/clean state
'all' - The new prioritizer is updated to exactly reflect
the old one
'all' - The new prioritizer is updated to exactly reflect
the old one
'keywords' - The new prioritizer is updated with old keyword
priorities, but not any other.
'keywords' - The new prioritizer is updated with old keyword
priorities, but not any other.
"""
with self._completer_lock:
old_completer = self.completer
@ -895,12 +878,7 @@ class PGCli(object):
elif persist_priorities == 'none':
# Leave the new prioritizer as is
pass
# When pgcli is first launched we call refresh_completions before
# instantiating the cli object. So it is necessary to check if cli
# exists before trying the replace the completer object in cli.
if self.cli:
self.cli.current_buffer.completer = new_completer
self.completer = new_completer
def get_completions(self, text, cursor_positition):
with self._completer_lock:
@ -927,9 +905,21 @@ class PGCli(object):
"""Get the last query executed or None."""
return self.query_history[-1][0] if self.query_history else None
def is_wide_line(self, line):
"""Will this line be too wide to fit into terminal?"""
return len(COLOR_CODE_REGEX.sub('', line)) > self.prompt_app.output.get_size().columns
def echo_via_pager(self, text, color=None):
if self.pgspecial.pager_config == PAGER_OFF or self.watch_command:
click.echo(text, color=color)
elif self.pgspecial.pager_config == PAGER_LONG_OUTPUT:
lines = text.split('\n')
# The last 4 lines are reserved for the pgcli menu and padding
if len(lines) >= self.prompt_app.output.get_size().rows - 4 or any(self.is_wide_line(l) for l in lines):
click.echo_via_pager(text, color=color)
else:
click.echo(text, color=color)
else:
click.echo_via_pager(text, color)
@ -1195,7 +1185,7 @@ def format_output(title, cur, headers, status, settings):
first_line = next(formatted)
formatted = itertools.chain([first_line], formatted)
if (not expanded and max_width and len(first_line) > max_width and headers):
if not expanded and max_width and len(first_line) > max_width and headers:
formatted = formatter.format_output(
cur, headers, format_name='vertical', column_types=None, **output_kwargs)
if isinstance(formatted, (text_type)):

View File

@ -1,3 +1,5 @@
from __future__ import unicode_literals
from sqlparse import parse
from sqlparse.tokens import Keyword, CTE, DML
from sqlparse.sql import Identifier, IdentifierList, Parenthesis

View File

@ -1,3 +1,4 @@
from __future__ import unicode_literals
from collections import namedtuple
_ColumnMetadata = namedtuple(

View File

@ -1,4 +1,5 @@
from __future__ import print_function
from __future__ import print_function, unicode_literals
import sqlparse
from collections import namedtuple
from sqlparse.sql import IdentifierList, Identifier, Function

View File

@ -1,3 +1,5 @@
from __future__ import unicode_literals
import re
import sqlparse
from sqlparse.tokens import Name

View File

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import sys
import click

View File

@ -1,4 +1,5 @@
from __future__ import print_function
from __future__ import print_function, unicode_literals
import sys
import re
import sqlparse

View File

@ -1,25 +1,23 @@
from prompt_toolkit.buffer import Buffer
from __future__ import unicode_literals
from prompt_toolkit.enums import DEFAULT_BUFFER
from prompt_toolkit.filters import Condition
from prompt_toolkit.application import get_app
from .packages.parseutils.utils import is_open_quote
class PGBuffer(Buffer):
def __init__(self, always_multiline, multiline_mode, *args, **kwargs):
self.always_multiline = always_multiline
self.multiline_mode = multiline_mode
def pg_is_multiline(pgcli):
@Condition
def cond():
doc = get_app().layout.get_buffer_by_name(DEFAULT_BUFFER).document
@Condition
def is_multiline():
doc = self.document
if not self.always_multiline:
return False
if self.multiline_mode == 'safe':
return True
else:
return not _multiline_exception(doc.text)
super(self.__class__, self).__init__(*args, is_multiline=is_multiline,
tempfile_suffix='.sql', **kwargs)
if not pgcli.multi_line:
return False
if pgcli.multiline_mode == 'safe':
return True
else:
return not _multiline_exception(doc.text)
return cond
def _is_complete(sql):

View File

@ -148,32 +148,31 @@ expand_tab = False
# Custom colors for the completion menu, toolbar, etc.
[colors]
Token.Menu.Completions.Completion.Current = 'bg:#ffffff #000000'
Token.Menu.Completions.Completion = 'bg:#008888 #ffffff'
Token.Menu.Completions.Meta.Current = 'bg:#44aaaa #000000'
Token.Menu.Completions.Meta = 'bg:#448888 #ffffff'
Token.Menu.Completions.MultiColumnMeta = 'bg:#aaffff #000000'
Token.Menu.Completions.ProgressButton = 'bg:#003333'
Token.Menu.Completions.ProgressBar = 'bg:#00aaaa'
Token.SelectedText = '#ffffff bg:#6666aa'
Token.SearchMatch = '#ffffff bg:#4444aa'
Token.SearchMatch.Current = '#ffffff bg:#44aa44'
Token.Toolbar = 'bg:#222222 #aaaaaa'
Token.Toolbar.Off = 'bg:#222222 #888888'
Token.Toolbar.On = 'bg:#222222 #ffffff'
Token.Toolbar.Search = 'noinherit bold'
Token.Toolbar.Search.Text = 'nobold'
Token.Toolbar.System = 'noinherit bold'
Token.Toolbar.Arg = 'noinherit bold'
Token.Toolbar.Arg.Text = 'nobold'
Token.Toolbar.Transaction.Valid = 'bg:#222222 #00ff5f bold'
Token.Toolbar.Transaction.Failed = 'bg:#222222 #ff005f bold'
completion-menu.completion.current = 'bg:#ffffff #000000'
completion-menu.completion = 'bg:#008888 #ffffff'
completion-menu.meta.completion.current = 'bg:#44aaaa #000000'
completion-menu.meta.completion = 'bg:#448888 #ffffff'
completion-menu.multi-column-meta = 'bg:#aaffff #000000'
scrollbar.arrow = 'bg:#003333'
scrollbar = 'bg:#00aaaa'
selected = '#ffffff bg:#6666aa'
search = '#ffffff bg:#4444aa'
search.current = '#ffffff bg:#44aa44'
bottom-toolbar = 'bg:#222222 #aaaaaa'
bottom-toolbar.off = 'bg:#222222 #888888'
bottom-toolbar.on = 'bg:#222222 #ffffff'
search-toolbar = 'noinherit bold'
search-toolbar.text = 'nobold'
system-toolbar = 'noinherit bold'
arg-toolbar = 'noinherit bold'
arg-toolbar.text = 'nobold'
bottom-toolbar.transaction.valid = 'bg:#222222 #00ff5f bold'
bottom-toolbar.transaction.failed = 'bg:#222222 #ff005f bold'
# color of table
# you can use token or custom colors
Token.Output.Header = "#00ff5f bold"
Token.Output.OddRow = ""
Token.Output.EvenRow = ""
# style classes for colored table output
output.header = "#00ff5f bold"
output.odd-row = ""
output.even-row = ""
# Named queries are queries you can execute by name.
[named queries]

View File

@ -6,8 +6,7 @@ import operator
from collections import namedtuple, defaultdict, OrderedDict
from cli_helpers.tabular_output import TabularOutputFormatter
from pgspecial.namedqueries import NamedQueries
from prompt_toolkit.completion import Completer, Completion
from prompt_toolkit.contrib.completers import PathCompleter
from prompt_toolkit.completion import Completer, Completion, PathCompleter
from prompt_toolkit.document import Document
from .packages.sqlcompletion import (
FromClauseItem, suggest_type, Special, Database, Schema, Table,

View File

@ -1,9 +1,64 @@
from pygments.token import string_to_tokentype
from pygments.util import ClassNotFound
from prompt_toolkit.styles import PygmentsStyle
import pygments.styles
from __future__ import unicode_literals
import logging
import pygments.styles
from pygments.token import string_to_tokentype, Token
from pygments.style import Style as PygmentsStyle
from pygments.util import ClassNotFound
from prompt_toolkit.styles.pygments import style_from_pygments_cls
from prompt_toolkit.styles import merge_styles, Style
logger = logging.getLogger(__name__)
# map Pygments tokens (ptk 1.0) to class names (ptk 2.0).
TOKEN_TO_PROMPT_STYLE = {
Token.Menu.Completions.Completion.Current: 'completion-menu.completion.current',
Token.Menu.Completions.Completion: 'completion-menu.completion',
Token.Menu.Completions.Meta.Current: 'completion-menu.meta.completion.current',
Token.Menu.Completions.Meta: 'completion-menu.meta.completion',
Token.Menu.Completions.MultiColumnMeta: 'completion-menu.multi-column-meta',
Token.Menu.Completions.ProgressButton: 'scrollbar.arrow', # best guess
Token.Menu.Completions.ProgressBar: 'scrollbar', # best guess
Token.SelectedText: 'selected',
Token.SearchMatch: 'search',
Token.SearchMatch.Current: 'search.current',
Token.Toolbar: 'bottom-toolbar',
Token.Toolbar.Off: 'bottom-toolbar.off',
Token.Toolbar.On: 'bottom-toolbar.on',
Token.Toolbar.Search: 'search-toolbar',
Token.Toolbar.Search.Text: 'search-toolbar.text',
Token.Toolbar.System: 'system-toolbar',
Token.Toolbar.Arg: 'arg-toolbar',
Token.Toolbar.Arg.Text: 'arg-toolbar.text',
Token.Toolbar.Transaction.Valid: 'bottom-toolbar.transaction.valid',
Token.Toolbar.Transaction.Failed: 'bottom-toolbar.transaction.failed',
Token.Output.Header: 'output.header',
Token.Output.OddRow: 'output.odd-row',
Token.Output.EvenRow: 'output.even-row',
}
# reverse dict for cli_helpers, because they still expect Pygments tokens.
PROMPT_STYLE_TO_TOKEN = {
v: k for k, v in TOKEN_TO_PROMPT_STYLE.items()
}
def parse_pygments_style(token_name, style_object, style_dict):
"""Parse token type and style string.
:param token_name: str name of Pygments token. Example: "Token.String"
:param style_object: pygments.style.Style instance to use as base
:param style_dict: dict of token names and their styles, customized to this cli
"""
token_type = string_to_tokentype(token_name)
try:
other_token_type = string_to_tokentype(style_dict[token_name])
return token_type, style_object.styles[other_token_type]
except AttributeError as err:
return token_type, style_dict[token_name]
from pygments.style import Style
def style_factory(name, cli_style):
try:
@ -11,16 +66,31 @@ def style_factory(name, cli_style):
except ClassNotFound:
style = pygments.styles.get_style_by_name('native')
custom_styles = {}
prompt_styles = []
# prompt-toolkit used pygments tokens for styling before, switched to style
# names in 2.0. Convert old token types to new style names, for backwards compatibility.
for token in cli_style:
try:
custom_styles[string_to_tokentype(
token)] = style.styles[string_to_tokentype(cli_style[token])]
except AttributeError as err:
custom_styles[string_to_tokentype(token)] = cli_style[token]
if token.startswith('Token.'):
# treat as pygments token (1.0)
token_type, style_value = parse_pygments_style(
token, style, cli_style)
if token_type in TOKEN_TO_PROMPT_STYLE:
prompt_style = TOKEN_TO_PROMPT_STYLE[token_type]
prompt_styles.append((prompt_style, style_value))
else:
# we don't want to support tokens anymore
logger.error('Unhandled style / class name: %s', token)
else:
# treat as prompt style name (2.0). See default style names here:
# https://github.com/jonathanslenders/python-prompt-toolkit/blob/master/prompt_toolkit/styles/defaults.py
prompt_styles.append((token, cli_style[token]))
return PygmentsStyle.from_defaults(style_dict=custom_styles,
pygments_style_cls=style)
override_style = Style([('bottom-toolbar', 'noreverse')])
return merge_styles([
style_from_pygments_cls(style),
override_style,
Style(prompt_styles)
])
def style_factory_output(name, cli_style):
@ -30,14 +100,18 @@ def style_factory_output(name, cli_style):
style = pygments.styles.get_style_by_name('native').styles
for token in cli_style:
try:
style.update({string_to_tokentype(
token): style[string_to_tokentype(cli_style[token])], })
except AttributeError as err:
style.update(
{string_to_tokentype(token): cli_style[token], })
if token.startswith('Token.'):
token_type, style_value = parse_pygments_style(
token, style, cli_style)
style.update({token_type: style_value})
elif token in PROMPT_STYLE_TO_TOKEN:
token_type = PROMPT_STYLE_TO_TOKEN[token]
style.update({token_type: cli_style[token]})
else:
# TODO: cli helpers will have to switch to ptk.Style
logger.error('Unhandled style / class name: %s', token)
class OutputStyle(pygments.style.Style):
class OutputStyle(PygmentsStyle):
default_style = ""
styles = style

View File

@ -1,59 +1,62 @@
from pygments.token import Token
from prompt_toolkit.enums import DEFAULT_BUFFER
from __future__ import unicode_literals
from prompt_toolkit.key_binding.vi_state import InputMode
from prompt_toolkit.application import get_app
def _get_vi_mode(cli):
def _get_vi_mode():
return {
InputMode.INSERT: 'I',
InputMode.NAVIGATION: 'N',
InputMode.REPLACE: 'R',
InputMode.INSERT_MULTIPLE: 'M',
}[cli.vi_state.input_mode]
}[get_app().vi_state.input_mode]
def create_toolbar_tokens_func(get_vi_mode_enabled, get_is_refreshing,
failed_transaction, valid_transaction):
"""
Return a function that generates the toolbar tokens.
"""
assert callable(get_vi_mode_enabled)
token = Token.Toolbar
def get_toolbar_tokens(cli):
def create_toolbar_tokens_func(pgcli):
"""Return a function that generates the toolbar tokens."""
def get_toolbar_tokens():
result = []
result.append((token, ' '))
result.append(('class:bottom-toolbar', ' '))
if cli.buffers[DEFAULT_BUFFER].completer.smart_completion:
result.append((token.On, '[F2] Smart Completion: ON '))
if pgcli.completer.smart_completion:
result.append(('class:bottom-toolbar.on',
'[F2] Smart Completion: ON '))
else:
result.append((token.Off, '[F2] Smart Completion: OFF '))
result.append(('class:bottom-toolbar.off',
'[F2] Smart Completion: OFF '))
if cli.buffers[DEFAULT_BUFFER].always_multiline:
result.append((token.On, '[F3] Multiline: ON '))
if pgcli.multi_line:
result.append(('class:bottom-toolbar.on', '[F3] Multiline: ON '))
else:
result.append((token.Off, '[F3] Multiline: OFF '))
result.append(('class:bottom-toolbar.off',
'[F3] Multiline: OFF '))
if cli.buffers[DEFAULT_BUFFER].always_multiline:
if cli.buffers[DEFAULT_BUFFER].multiline_mode == 'safe':
result.append((token,' ([Esc] [Enter] to execute]) '))
if pgcli.multi_line:
if pgcli.multiline_mode == 'safe':
result.append(
('class:bottom-toolbar', ' ([Esc] [Enter] to execute]) '))
else:
result.append((token,' (Semi-colon [;] will end the line) '))
result.append(
('class:bottom-toolbar', ' (Semi-colon [;] will end the line) '))
if get_vi_mode_enabled():
result.append((token.On, '[F4] Vi-mode (' + _get_vi_mode(cli) + ')'))
if pgcli.vi_mode:
result.append(
('class:bottom-toolbar', '[F4] Vi-mode (' + _get_vi_mode() + ')'))
else:
result.append((token.On, '[F4] Emacs-mode'))
result.append(('class:bottom-toolbar', '[F4] Emacs-mode'))
if failed_transaction():
result.append((token.Transaction.Failed, ' Failed transaction'))
if pgcli.pgexecute.failed_transaction():
result.append(('class:bottom-toolbar.transaction.failed',
' Failed transaction'))
if valid_transaction():
result.append((token.Transaction.Valid, ' Transaction'))
if pgcli.pgexecute.valid_transaction():
result.append(
('class:bottom-toolbar.transaction.valid', ' Transaction'))
if get_is_refreshing():
result.append((token, ' Refreshing completions...'))
if pgcli.completion_refresher.is_refreshing():
result.append(
('class:bottom-toolbar', ' Refreshing completions...'))
return result
return get_toolbar_tokens

View File

@ -1,4 +1,5 @@
pytest>=2.7.0
# The maximum version requirement can be removed once Python 3.4 goes EOL
pytest>=2.7.0,<=3.0.7
mock>=1.0.1
tox>=1.9.2
behave>=1.2.4

View File

@ -12,10 +12,10 @@ with open('pgcli/__init__.py', 'rb') as f:
description = 'CLI for Postgres Database. With auto-completion and syntax highlighting.'
install_requirements = [
'pgspecial>=1.11.0',
'pgspecial>=1.11.2',
'click >= 4.1',
'Pygments >= 2.0', # Pygments has to be Capitalcased. WTF?
'prompt_toolkit>=1.0.10,<1.1.0',
'prompt_toolkit>=2.0.0,<2.1.0',
'psycopg2 >= 2.7.4,<2.8',
'sqlparse >=0.2.2,<0.3.0',
'configobj >= 5.0.6',

View File

@ -1,3 +1,5 @@
from __future__ import print_function
import os
import pytest
from utils import (POSTGRES_HOST, POSTGRES_USER, POSTGRES_PASSWORD, create_db, db_connection,

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import unicode_literals, print_function
import copy
import os
import sys
import db_utils as dbutils
@ -10,13 +10,13 @@ import pexpect
import tempfile
import shutil
from steps.wrappers import run_cli, wait_prompt
from steps import wrappers
def before_all(context):
"""
Set env parameters.
"""
"""Set env parameters."""
env_old = copy.deepcopy(dict(os.environ))
os.environ['LINES'] = "100"
os.environ['COLUMNS'] = "100"
os.environ['PAGER'] = 'cat'
@ -24,6 +24,11 @@ def before_all(context):
context.package_root = os.path.abspath(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
fixture_dir = os.path.join(
context.package_root, 'tests/features/fixture_data')
print('package root:', context.package_root)
print('fixture dir:', fixture_dir)
os.environ["COVERAGE_PROCESS_START"] = os.path.join(context.package_root,
'.coveragerc')
@ -102,6 +107,19 @@ def before_all(context):
# use temporary directory as config home
context.env_config_home = tempfile.mkdtemp(prefix='pgcli_home_')
os.environ['XDG_CONFIG_HOME'] = context.env_config_home
show_env_changes(env_old, dict(os.environ))
def show_env_changes(env_old, env_new):
"""Print out all test-specific env values."""
print('--- os.environ changed values: ---')
all_keys = set(list(env_old.keys()) + list(env_new.keys()))
for k in sorted(all_keys):
old_value = env_old.get(k, '')
new_value = env_new.get(k, '')
if new_value and old_value != new_value:
print('{}="{}"'.format(k, new_value))
print('-' * 20)
def after_all(context):
@ -129,13 +147,12 @@ def before_step(context, _):
def before_scenario(context, _):
run_cli(context)
wait_prompt(context)
wrappers.run_cli(context)
wrappers.wait_prompt(context)
def after_scenario(context, _):
"""Cleans up after each test complete."""
"""Cleans up after each scenario completes."""
if hasattr(context, 'cli') and not context.exit_sent:
# Quit nicely.
if not context.atprompt:
@ -147,8 +164,12 @@ def after_scenario(context, _):
context.cli.sendcontrol('c')
context.cli.sendcontrol('d')
context.cli.expect_exact(pexpect.EOF, timeout=10)
if hasattr(context, 'tmpfile_sql_help') and context.tmpfile_sql_help:
context.tmpfile_sql_help.close()
context.tmpfile_sql_help = None
# TODO: uncomment to debug a failure
# # TODO: uncomment to debug a failure
# def after_step(context, step):
# if step.status == "failed":
# import ipdb; ipdb.set_trace()
# import pdb; pdb.set_trace()

View File

@ -19,13 +19,11 @@ def read_fixture_lines(filename):
def read_fixture_files():
"""
Read all files inside fixture_data directory.
"""
fixture_dict = {}
"""Read all files inside fixture_data directory."""
current_dir = os.path.dirname(__file__)
fixture_dir = os.path.join(current_dir, 'fixture_data/')
print('reading fixture data: {}'.format(fixture_dir))
fixture_dict = {}
for filename in os.listdir(fixture_dir):
if filename not in ['.', '..']:
fullname = os.path.join(fixture_dir, filename)

View File

@ -1,9 +1,8 @@
# -*- coding: utf-8
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
from textwrap import dedent
from behave import then, when
import wrappers

View File

@ -4,13 +4,13 @@ Steps for behavioral style tests are defined in this module.
Each step is defined by the string decorating it.
This string is used to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
import tempfile
from behave import when
import wrappers
from behave import when, then
from textwrap import dedent
import wrappers
@when('we run dbcli')
@ -28,6 +28,9 @@ def step_ctrl_d(context):
"""
Send Ctrl + D to hopefully exit.
"""
# turn off pager before exiting
context.cli.sendline('\pset pager off')
wrappers.wait_prompt(context)
context.cli.sendcontrol('d')
context.exit_sent = True
@ -42,12 +45,12 @@ def step_send_help(context):
@when(u'we send source command')
def step_send_source_command(context):
with tempfile.NamedTemporaryFile() as f:
f.write(b'\?')
f.flush()
context.cli.sendline('\i {0}'.format(f.name))
wrappers.expect_exact(
context, context.conf['pager_boundary'] + '\r\n', timeout=5)
context.tmpfile_sql_help = tempfile.NamedTemporaryFile(prefix='pgcli_')
context.tmpfile_sql_help.write(b'\?')
context.tmpfile_sql_help.flush()
context.cli.sendline('\i {0}'.format(context.tmpfile_sql_help.name))
wrappers.expect_exact(
context, context.conf['pager_boundary'] + '\r\n', timeout=5)
@when(u'we run query to check application_name')

View File

@ -4,12 +4,12 @@ Steps for behavioral style tests are defined in this module.
Each step is defined by the string decorating it.
This string is used to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
import pexpect
import wrappers
from behave import when, then
import wrappers
@when('we create database')
@ -72,7 +72,7 @@ def step_see_prompt(context):
@then('we see help output')
def step_see_help(context):
for expected_line in context.fixture_data['help_commands.txt']:
wrappers.expect_exact(context, expected_line, timeout=1)
wrappers.expect_exact(context, expected_line, timeout=2)
@then('we see database created')

View File

@ -4,11 +4,11 @@ Steps for behavioral style tests are defined in this module.
Each step is defined by the string decorating it.
This string is used to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
import wrappers
from behave import when, then
from textwrap import dedent
import wrappers
@when('we create table')

View File

@ -5,11 +5,11 @@ Each step is defined by the string decorating it. This string is used
to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
import wrappers
from behave import when, then
from textwrap import dedent
import wrappers
@when('we prepare the test data')

View File

@ -1,10 +1,10 @@
# -*- coding: utf-8
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
import os
import os.path
import wrappers
from behave import when, then
import wrappers
@when('we start external editor providing a file name')

View File

@ -4,10 +4,10 @@ Steps for behavioral style tests are defined in this module.
Each step is defined by the string decorating it.
This string is used to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
import wrappers
from behave import when, then
import wrappers
@when('we save a named query')
@ -39,7 +39,7 @@ def step_see_named_query_saved(context):
"""
Wait to see query saved.
"""
wrappers.expect_pager(context, 'Saved.\r\n', timeout=1)
wrappers.expect_exact(context, 'Saved.', timeout=2)
@then('we see the named query executed')

View File

@ -4,10 +4,10 @@ Steps for behavioral style tests are defined in this module.
Each step is defined by the string decorating it.
This string is used to call the step in "*.feature" file.
"""
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
import wrappers
from behave import when, then
import wrappers
@when('we refresh completions')

View File

@ -3,6 +3,7 @@ from __future__ import unicode_literals
import re
import pexpect
from pgcli.main import COLOR_CODE_REGEX
def expect_exact(context, expected, timeout):
@ -10,7 +11,7 @@ def expect_exact(context, expected, timeout):
context.cli.expect_exact(expected, timeout=timeout)
except:
# Strip color codes out of the output.
actual = re.sub(r'\x1b\[([0-9A-Za-z;?])+[m|K]?', '', context.cli.before)
actual = COLOR_CODE_REGEX.sub('', context.cli.before)
raise Exception('Expected:\n---\n{0!r}\n---\n\nActual:\n---\n{1!r}\n---'.format(
expected,
actual))
@ -30,6 +31,8 @@ def run_cli(context, run_args=None):
context.cli = pexpect.spawnu(cmd, cwd=context.package_root)
context.exit_sent = False
context.currentdb = context.conf['dbname']
context.cli.sendline('\pset pager always')
wait_prompt(context)
def wait_prompt(context):

View File

@ -1,3 +1,5 @@
from __future__ import unicode_literals
from functools import partial
from itertools import product
from pgcli.packages.parseutils.meta import FunctionMetadata, ForeignKey

View File

@ -1,9 +1,8 @@
# coding=utf-8
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
import os
import platform
import mock
from decimal import Decimal
import pytest
try:
@ -12,9 +11,11 @@ except ImportError:
setproctitle = None
from pgcli.main import (
obfuscate_process_password, format_output, PGCli, OutputSettings
obfuscate_process_password, format_output, PGCli, OutputSettings, COLOR_CODE_REGEX
)
from pgspecial.main import (PAGER_OFF, PAGER_LONG_OUTPUT, PAGER_ALWAYS)
from utils import dbtest, run
from collections import namedtuple
@pytest.mark.skipif(platform.system() == 'Windows',
@ -147,6 +148,100 @@ def test_format_output_auto_expand():
assert '\n'.join(expanded_results) == '\n'.join(expanded)
termsize = namedtuple('termsize', ['rows', 'columns'])
test_line = '-' * 10
test_data = [
(10, 10, '\n'.join([test_line] * 7)),
(10, 10, '\n'.join([test_line] * 6)),
(10, 10, '\n'.join([test_line] * 5)),
(10, 10, '-' * 11),
(10, 10, '-' * 10),
(10, 10, '-' * 9),
]
# 4 lines are reserved at the bottom of the terminal for pgcli's prompt
use_pager_when_on = [True,
True,
False,
True,
False,
False]
# Can be replaced with pytest.param once we can upgrade pytest after Python 3.4 goes EOL
test_ids = ["Output longer than terminal height",
"Output equal to terminal height",
"Output shorter than terminal height",
"Output longer than terminal width",
"Output equal to terminal width",
"Output shorter than terminal width"]
@pytest.fixture
def pset_pager_mocks():
cli = PGCli()
cli.watch_command = None
with mock.patch('pgcli.main.click.echo') as mock_echo, \
mock.patch('pgcli.main.click.echo_via_pager') as mock_echo_via_pager, \
mock.patch.object(cli, 'prompt_app') as mock_app:
yield cli, mock_echo, mock_echo_via_pager, mock_app
@pytest.mark.parametrize('term_height,term_width,text', test_data, ids=test_ids)
def test_pset_pager_off(term_height, term_width, text, pset_pager_mocks):
cli, mock_echo, mock_echo_via_pager, mock_cli = pset_pager_mocks
mock_cli.output.get_size.return_value = termsize(
rows=term_height, columns=term_width)
with mock.patch.object(cli.pgspecial, 'pager_config', PAGER_OFF):
cli.echo_via_pager(text)
mock_echo.assert_called()
mock_echo_via_pager.assert_not_called()
@pytest.mark.parametrize('term_height,term_width,text', test_data, ids=test_ids)
def test_pset_pager_always(term_height, term_width, text, pset_pager_mocks):
cli, mock_echo, mock_echo_via_pager, mock_cli = pset_pager_mocks
mock_cli.output.get_size.return_value = termsize(
rows=term_height, columns=term_width)
with mock.patch.object(cli.pgspecial, 'pager_config', PAGER_ALWAYS):
cli.echo_via_pager(text)
mock_echo.assert_not_called()
mock_echo_via_pager.assert_called()
pager_on_test_data = [l + (r,) for l, r in zip(test_data, use_pager_when_on)]
@pytest.mark.parametrize('term_height,term_width,text,use_pager', pager_on_test_data, ids=test_ids)
def test_pset_pager_on(term_height, term_width, text, use_pager, pset_pager_mocks):
cli, mock_echo, mock_echo_via_pager, mock_cli = pset_pager_mocks
mock_cli.output.get_size.return_value = termsize(
rows=term_height, columns=term_width)
with mock.patch.object(cli.pgspecial, 'pager_config', PAGER_LONG_OUTPUT):
cli.echo_via_pager(text)
if use_pager:
mock_echo.assert_not_called()
mock_echo_via_pager.assert_called()
else:
mock_echo_via_pager.assert_not_called()
mock_echo.assert_called()
@pytest.mark.parametrize('text,expected_length', [
(u"22200K .......\u001b[0m\u001b[91m... .......... ...\u001b[0m\u001b[91m.\u001b[0m\u001b[91m...... .........\u001b[0m\u001b[91m.\u001b[0m\u001b[91m \u001b[0m\u001b[91m.\u001b[0m\u001b[91m.\u001b[0m\u001b[91m.\u001b[0m\u001b[91m.\u001b[0m\u001b[91m...... 50% 28.6K 12m55s", 78),
(u"=\u001b[m=", 2),
(u"-\u001b]23\u0007-", 2),
])
def test_color_pattern(text, expected_length, pset_pager_mocks):
cli = pset_pager_mocks[0]
assert len(COLOR_CODE_REGEX.sub('', text)) == expected_length
@dbtest
def test_i_works(tmpdir, executor):
sqlfile = tmpdir.join("test.sql")

View File

@ -1,4 +1,5 @@
# coding=UTF-8
from __future__ import print_function
import pytest
import psycopg2

View File

@ -8,7 +8,6 @@ from pgcli.packages.prompt_utils import confirm_destructive_query
def test_confirm_destructive_query_notty():
stdin = click.get_text_stream('stdin')
assert stdin.isatty() is False
sql = 'drop database foo;'
assert confirm_destructive_query(sql) is None
if not stdin.isatty():
sql = 'drop database foo;'
assert confirm_destructive_query(sql) is None

View File

@ -1,4 +1,5 @@
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
import itertools
from metadata import (MetaData, alias, name_join, fk_join, join,
schema, table, function, wildcard_expansion, column,

View File

@ -1,4 +1,5 @@
from __future__ import unicode_literals
from __future__ import unicode_literals, print_function
from metadata import (MetaData, alias, name_join, fk_join, join, keyword,
schema, table, view, function, column, wildcard_expansion,
get_result, result_set, qual, no_qual, parametrize)