1
0
Fork 0

Cherry-picked prompt-toolkit 2.0 changes. (#930)

* Cherry-picked prompt-toolkit 2.0 changes.

* Increase help timeout.

* Missed one.

* Fixes editor command.

* Expect exact to fix named query error.

* Unicode is non-optional with ptk 2.0.

* Unicode literals all the things (almost).

* PEP8.

* Change how we swap completers.

* By default, bottom toolbar styles are reversed. We don't want that.

* Adapt styles to 2.0.

* The future is now. Switch to ptk 2.0 style names.

* PEP8.

* Flag for enable_open_in_editor.

* add class:prompt to prompt

* Removed workaround for #668. Some renaming.

* use pgcli.completer instead of app.current_buffer.completer

* enable_system_prompt=True like old prompt toolkit

* keep search_ignore_case enabled (was ignore_case)

* fix closing parenthese

* keep marking class:continuation token for continuation

* capture KeyboardInterrupt manually

AbortAction has been removed in Prompt_toolkit 2.0

* replace C-J with enter, add more comments

* reversed ([...]) to [(...)] (oops)

* pep8 fixes

* Does Vi mode have to be applied to session every time?

* (workaround) also enable vi_mode after edit command

* Fixed test errors after rebasing on master.
This commit is contained in:
Irina Truong 2018-09-28 14:18:40 -07:00 committed by GitHub
parent d3bdb891d3
commit 392491a74d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 331 additions and 301 deletions

View File

@ -81,7 +81,7 @@ class CompletionRefresher(object):
# Load history into pgcompleter so it can learn user preferences
n_recent = 100
if history:
for recent in history[-n_recent:]:
for recent in history.get_strings()[-n_recent:]:
completer.extend_query_history(recent, is_init=True)
for callback in callbacks:

View File

@ -1,13 +0,0 @@
from prompt_toolkit.filters import Filter
class HasSelectedCompletion(Filter):
"""Enable when the current buffer has a selected completion."""
def __call__(self, cli):
complete_state = cli.current_buffer.complete_state
return (complete_state is not None and
complete_state.current_completion is not None)
def __repr__(self):
return "HasSelectedCompletion()"

View File

@ -1,69 +1,47 @@
from __future__ import unicode_literals
import logging
from prompt_toolkit.enums import EditingMode
from prompt_toolkit.keys import Keys
from prompt_toolkit.key_binding.manager import KeyBindingManager
from prompt_toolkit.filters import Condition
from .filters import HasSelectedCompletion
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.filters import completion_is_selected
_logger = logging.getLogger(__name__)
def pgcli_bindings(get_vi_mode_enabled, set_vi_mode_enabled):
"""
Custom key bindings for pgcli.
"""
assert callable(get_vi_mode_enabled)
assert callable(set_vi_mode_enabled)
def pgcli_bindings(pgcli):
"""Custom key bindings for pgcli."""
kb = KeyBindings()
key_binding_manager = KeyBindingManager(
enable_open_in_editor=True,
enable_system_bindings=True,
enable_auto_suggest_bindings=True,
enable_search=True,
enable_abort_and_exit_bindings=True)
@key_binding_manager.registry.add_binding(Keys.F2)
@kb.add('f2')
def _(event):
"""
Enable/Disable SmartCompletion Mode.
"""
"""Enable/Disable SmartCompletion Mode."""
_logger.debug('Detected F2 key.')
buf = event.cli.current_buffer
buf.completer.smart_completion = not buf.completer.smart_completion
pgcli.completer.smart_completion = not pgcli.completer.smart_completion
@key_binding_manager.registry.add_binding(Keys.F3)
@kb.add('f3')
def _(event):
"""
Enable/Disable Multiline Mode.
"""
"""Enable/Disable Multiline Mode."""
_logger.debug('Detected F3 key.')
buf = event.cli.current_buffer
buf.always_multiline = not buf.always_multiline
pgcli.multi_line = not pgcli.multi_line
@key_binding_manager.registry.add_binding(Keys.F4)
@kb.add('f4')
def _(event):
"""
Toggle between Vi and Emacs mode.
"""
"""Toggle between Vi and Emacs mode."""
_logger.debug('Detected F4 key.')
vi_mode = not get_vi_mode_enabled()
set_vi_mode_enabled(vi_mode)
pgcli.vi_mode = not pgcli.vi_mode
event.app.editing_mode = EditingMode.VI if pgcli.vi_mode else EditingMode.EMACS
event.cli.editing_mode = EditingMode.VI if vi_mode else EditingMode.EMACS
@key_binding_manager.registry.add_binding(Keys.Tab)
@kb.add('tab')
def _(event):
"""
Force autocompletion at cursor.
"""
"""Force autocompletion at cursor."""
_logger.debug('Detected <Tab> key.')
b = event.cli.current_buffer
b = event.app.current_buffer
if b.complete_state:
b.complete_next()
else:
event.cli.start_completion(select_first=True)
b.start_completion(select_first=True)
@key_binding_manager.registry.add_binding(Keys.ControlSpace)
@kb.add('c-space')
def _(event):
"""
Initialize autocompletion at cursor.
@ -75,21 +53,25 @@ def pgcli_bindings(get_vi_mode_enabled, set_vi_mode_enabled):
"""
_logger.debug('Detected <C-Space> key.')
b = event.cli.current_buffer
b = event.app.current_buffer
if b.complete_state:
b.complete_next()
else:
event.cli.start_completion(select_first=False)
b.start_completion(select_first=False)
@key_binding_manager.registry.add_binding(Keys.ControlJ, filter=HasSelectedCompletion())
@kb.add('enter', filter=completion_is_selected)
def _(event):
"""Makes the enter key work as the tab key only when showing the menu.
In other words, don't execute query when enter is pressed in
the completion dropdown menu, instead close the dropdown menu
(accept current selection).
"""
Makes the enter key work as the tab key only when showing the menu.
"""
_logger.debug('Detected <C-J> key.')
_logger.debug('Detected enter key.')
event.current_buffer.complete_state = None
b = event.cli.current_buffer
b = event.app.current_buffer
b.complete_state = None
return key_binding_manager
return kb

View File

@ -27,20 +27,18 @@ try:
import setproctitle
except ImportError:
setproctitle = None
from prompt_toolkit import CommandLineInterface, Application, AbortAction
from prompt_toolkit.completion import DynamicCompleter
from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode
from prompt_toolkit.shortcuts import create_prompt_layout, create_eventloop
from prompt_toolkit.buffer import AcceptAction
from prompt_toolkit.shortcuts import PromptSession, CompleteStyle
from prompt_toolkit.document import Document
from prompt_toolkit.filters import Always, HasFocus, IsDone
from prompt_toolkit.layout.lexers import PygmentsLexer
from prompt_toolkit.filters import HasFocus, IsDone
from prompt_toolkit.lexers import PygmentsLexer
from prompt_toolkit.layout.processors import (ConditionalProcessor,
HighlightMatchingBracketProcessor,
TabsProcessor)
from prompt_toolkit.history import FileHistory
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from pygments.lexers.sql import PostgresLexer
from pygments.token import Token
from pgspecial.main import (PGSpecial, NO_QUERY, PAGER_OFF, PAGER_LONG_OUTPUT)
import pgspecial as special
@ -52,7 +50,7 @@ from .pgcompleter import PGCompleter
from .pgtoolbar import create_toolbar_tokens_func
from .pgstyle import style_factory, style_factory_output
from .pgexecute import PGExecute
from .pgbuffer import PGBuffer
from .pgbuffer import pg_is_multiline
from .completion_refresher import CompletionRefresher
from .config import (get_casing_file,
load_config, config_location, ensure_dir_exists, get_config)
@ -218,8 +216,7 @@ class PGCli(object):
self._completer_lock = threading.Lock()
self.register_special_commands()
self.eventloop = create_eventloop()
self.cli = None
self.prompt_app = None
def quit(self):
raise PgCliQuitError
@ -500,7 +497,7 @@ class PGCli(object):
self.pgexecute = pgexecute
def handle_editor_command(self, cli, document):
def handle_editor_command(self, text):
r"""
Editor command is any query that is prefixed or suffixed
by a '\e'. The reason for a while loop is because a user
@ -509,41 +506,39 @@ class PGCli(object):
"select * from \e"<enter> to edit it in vim, then come
back to the prompt with the edited query "select * from
blah where q = 'abc'\e" to edit it again.
:param cli: CommandLineInterface
:param document: Document
:param text: Document
:return: Document
"""
# FIXME: using application.pre_run_callables like this here is not the best solution.
# It's internal api of prompt_toolkit that may change. This was added to fix #668.
# We may find a better way to do it in the future.
saved_callables = cli.application.pre_run_callables
try:
editor_command = special.editor_command(document.text)
while editor_command:
if editor_command == '\\e':
filename = special.get_filename(document.text)
query = (special.get_editor_query(document.text) or
self.get_last_query())
else: # \ev or \ef
filename = None
spec = document.text.split()[1]
if editor_command == '\\ev':
query = self.pgexecute.view_definition(spec)
elif editor_command == '\\ef':
query = self.pgexecute.function_definition(spec)
sql, message = special.open_external_editor(
filename, sql=query)
if message:
# Something went wrong. Raise an exception and bail.
raise RuntimeError(message)
cli.current_buffer.document = Document(
sql, cursor_position=len(sql))
cli.application.pre_run_callables = []
document = cli.run()
editor_command = special.editor_command(document.text)
finally:
cli.application.pre_run_callables = saved_callables
return document
editor_command = special.editor_command(text)
while editor_command:
if editor_command == '\\e':
filename = special.get_filename(text)
query = special.get_editor_query(
text) or self.get_last_query()
else: # \ev or \ef
filename = None
spec = text.split()[1]
if editor_command == '\\ev':
query = self.pgexecute.view_definition(spec)
elif editor_command == '\\ef':
query = self.pgexecute.function_definition(spec)
sql, message = special.open_external_editor(
filename, sql=query)
if message:
# Something went wrong. Raise an exception and bail.
raise RuntimeError(message)
while True:
try:
text = self.prompt_app.prompt(
default=sql,
vi_mode=self.vi_mode
)
break
except KeyboardInterrupt:
sql = None
editor_command = special.editor_command(text)
return text
def execute_command(self, text):
logger = self.logger
@ -626,7 +621,7 @@ class PGCli(object):
self.refresh_completions(history=history,
persist_priorities='none')
self.cli = self._build_cli(history)
self.prompt_app = self._build_cli(history)
if not self.less_chatty:
print('Server: PostgreSQL', self.pgexecute.get_server_version())
@ -637,18 +632,21 @@ class PGCli(object):
try:
while True:
document = self.cli.run()
try:
text = self.prompt_app.prompt(vi_mode=self.vi_mode)
except KeyboardInterrupt:
continue
try:
document = self.handle_editor_command(self.cli, document)
text = self.handle_editor_command(text)
except RuntimeError as e:
logger.error("sql: %r, error: %r", document.text, e)
logger.error("sql: %r, error: %r", text, e)
logger.error("traceback: %r", traceback.format_exc())
click.secho(str(e), err=True, fg='red')
continue
self.watch_command, timing = special.get_watch_command(
document.text)
# Initialize default metaquery in case execution fails
self.watch_command, timing = special.get_watch_command(text)
if self.watch_command:
while self.watch_command:
try:
@ -660,13 +658,13 @@ class PGCli(object):
except KeyboardInterrupt:
self.watch_command = None
else:
query = self.execute_command(document.text)
query = self.execute_command(text)
self.now = dt.datetime.today()
# Allow PGCompleter to learn user's preferred keywords, etc.
with self._completer_lock:
self.completer.extend_query_history(document.text)
self.completer.extend_query_history(text)
self.query_history.append(query)
@ -675,15 +673,9 @@ class PGCli(object):
print ('Goodbye!')
def _build_cli(self, history):
key_bindings = pgcli_bindings(self)
def set_vi_mode(value):
self.vi_mode = value
key_binding_manager = pgcli_bindings(
get_vi_mode_enabled=lambda: self.vi_mode,
set_vi_mode_enabled=set_vi_mode)
def prompt_tokens(_):
def get_message():
if self.dsn_alias and self.prompt_dsn_format is not None:
prompt_format = self.prompt_dsn_format
else:
@ -695,68 +687,56 @@ class PGCli(object):
len(prompt) > self.max_len_prompt):
prompt = self.get_prompt('\\d> ')
return [(Token.Prompt, prompt)]
return [('class:prompt', prompt)]
def get_continuation_tokens(cli, width):
continuation=self.multiline_continuation_char * (width - 1) + ' '
return [(Token.Continuation, continuation)]
def get_continuation(width, line_number, is_soft_wrap):
continuation = self.multiline_continuation_char * (width - 1) + ' '
return [('class:continuation', continuation)]
get_toolbar_tokens = create_toolbar_tokens_func(
lambda: self.vi_mode, self.completion_refresher.is_refreshing,
self.pgexecute.failed_transaction,
self.pgexecute.valid_transaction)
get_toolbar_tokens = create_toolbar_tokens_func(self)
layout = create_prompt_layout(
lexer=PygmentsLexer(PostgresLexer),
reserve_space_for_menu=self.min_num_menu_lines,
get_prompt_tokens=prompt_tokens,
get_continuation_tokens=get_continuation_tokens,
get_bottom_toolbar_tokens=get_toolbar_tokens,
display_completions_in_columns=self.wider_completion_menu,
multiline=True,
extra_input_processors=[
# Highlight matching brackets while editing.
ConditionalProcessor(
processor=HighlightMatchingBracketProcessor(
chars='[](){}'),
filter=HasFocus(DEFAULT_BUFFER) & ~IsDone()),
# Render \t as 4 spaces instead of "^I"
TabsProcessor(get_char1=lambda _: ' ',
get_char2=lambda _: ' '),
])
if self.wider_completion_menu:
complete_style = CompleteStyle.MULTI_COLUMN
else:
complete_style = CompleteStyle.COLUMN
with self._completer_lock:
buf = PGBuffer(
prompt_app = PromptSession(
lexer=PygmentsLexer(PostgresLexer),
reserve_space_for_menu=self.min_num_menu_lines,
message=get_message,
prompt_continuation=get_continuation,
bottom_toolbar=get_toolbar_tokens,
complete_style=complete_style,
input_processors=[
# Highlight matching brackets while editing.
ConditionalProcessor(
processor=HighlightMatchingBracketProcessor(
chars='[](){}'),
filter=HasFocus(DEFAULT_BUFFER) & ~IsDone()),
# Render \t as 4 spaces instead of "^I"
TabsProcessor(char1=' ', char2=' ')],
auto_suggest=AutoSuggestFromHistory(),
always_multiline=self.multi_line,
multiline_mode=self.multiline_mode,
completer=self.completer,
tempfile_suffix='.sql',
multiline=pg_is_multiline(self),
history=history,
complete_while_typing=Always(),
accept_action=AcceptAction.RETURN_DOCUMENT)
editing_mode = EditingMode.VI if self.vi_mode else EditingMode.EMACS
application = Application(
completer=DynamicCompleter(lambda: self.completer),
complete_while_typing=True,
style=style_factory(self.syntax_style, self.cli_style),
layout=layout,
buffer=buf,
key_bindings_registry=key_binding_manager.registry,
on_exit=AbortAction.RAISE_EXCEPTION,
on_abort=AbortAction.RETRY,
ignore_case=True,
editing_mode=editing_mode)
include_default_pygments_style=False,
key_bindings=key_bindings,
enable_open_in_editor=True,
enable_system_prompt=True,
editing_mode=EditingMode.VI if self.vi_mode else EditingMode.EMACS,
search_ignore_case=True)
cli = CommandLineInterface(application=application,
eventloop=self.eventloop)
return cli
return prompt_app
def _should_show_limit_prompt(self, status, cur):
"""returns True if limit prompt should be shown, False otherwise."""
if not is_select(status):
return False
return self.row_limit > 0 and cur and cur.rowcount > self.row_limit
return self.row_limit > 0 and cur and (cur.rowcount > self.row_limit)
def _evaluate_command(self, text):
"""Used to run a command entered by the user during CLI operation
@ -794,7 +774,7 @@ class PGCli(object):
break
if self.pgspecial.auto_expand or self.auto_expand:
max_width = self.cli.output.get_size().columns
max_width = self.prompt_app.output.get_size().columns
else:
max_width = None
@ -863,24 +843,25 @@ class PGCli(object):
def _on_completions_refreshed(self, new_completer, persist_priorities):
self._swap_completer_objects(new_completer, persist_priorities)
if self.cli:
if self.prompt_app:
# After refreshing, redraw the CLI to clear the statusbar
# "Refreshing completions..." indicator
self.cli.request_redraw()
self.prompt_app.app.invalidate()
def _swap_completer_objects(self, new_completer, persist_priorities):
"""Swap the completer object in cli with the newly created completer.
"""Swap the completer object with the newly created completer.
persist_priorities is a string specifying how the old completer's
learned prioritizer should be transferred to the new completer.
persist_priorities is a string specifying how the old completer's
learned prioritizer should be transferred to the new completer.
'none' - The new prioritizer is left in a new/clean state
'none' - The new prioritizer is left in a new/clean state
'all' - The new prioritizer is updated to exactly reflect
the old one
'all' - The new prioritizer is updated to exactly reflect
the old one
'keywords' - The new prioritizer is updated with old keyword
priorities, but not any other.
'keywords' - The new prioritizer is updated with old keyword
priorities, but not any other.
"""
with self._completer_lock:
old_completer = self.completer
@ -897,12 +878,7 @@ class PGCli(object):
elif persist_priorities == 'none':
# Leave the new prioritizer as is
pass
# When pgcli is first launched we call refresh_completions before
# instantiating the cli object. So it is necessary to check if cli
# exists before trying the replace the completer object in cli.
if self.cli:
self.cli.current_buffer.completer = new_completer
self.completer = new_completer
def get_completions(self, text, cursor_positition):
with self._completer_lock:
@ -929,6 +905,10 @@ class PGCli(object):
"""Get the last query executed or None."""
return self.query_history[-1][0] if self.query_history else None
def is_wide_line(self, line):
"""Will this line be too wide to fit into terminal?"""
return len(COLOR_CODE_REGEX.sub('', line)) > self.prompt_app.output.get_size().columns
def echo_via_pager(self, text, color=None):
if self.pgspecial.pager_config == PAGER_OFF or self.watch_command:
click.echo(text, color=color)
@ -936,8 +916,7 @@ class PGCli(object):
lines = text.split('\n')
# The last 4 lines are reserved for the pgcli menu and padding
if len(lines) >= self.cli.output.get_size().rows - 4 \
or any(len(COLOR_CODE_REGEX.sub('', l)) > self.cli.output.get_size().columns for l in lines):
if len(lines) >= self.prompt_app.output.get_size().rows - 4 or any(self.is_wide_line(l) for l in lines):
click.echo_via_pager(text, color=color)
else:
click.echo(text, color=color)
@ -1206,7 +1185,7 @@ def format_output(title, cur, headers, status, settings):
first_line = next(formatted)
formatted = itertools.chain([first_line], formatted)
if (not expanded and max_width and len(first_line) > max_width and headers):
if not expanded and max_width and len(first_line) > max_width and headers:
formatted = formatter.format_output(
cur, headers, format_name='vertical', column_types=None, **output_kwargs)
if isinstance(formatted, (text_type)):

View File

@ -1,3 +1,5 @@
from __future__ import unicode_literals
from sqlparse import parse
from sqlparse.tokens import Keyword, CTE, DML
from sqlparse.sql import Identifier, IdentifierList, Parenthesis

View File

@ -1,3 +1,4 @@
from __future__ import unicode_literals
from collections import namedtuple
_ColumnMetadata = namedtuple(

View File

@ -1,4 +1,5 @@
from __future__ import print_function
from __future__ import print_function, unicode_literals
import sqlparse
from collections import namedtuple
from sqlparse.sql import IdentifierList, Identifier, Function

View File

@ -1,3 +1,5 @@
from __future__ import unicode_literals
import re
import sqlparse
from sqlparse.tokens import Name

View File

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import sys
import click

View File

@ -1,4 +1,5 @@
from __future__ import print_function
from __future__ import print_function, unicode_literals
import sys
import re
import sqlparse

View File

@ -1,25 +1,23 @@
from prompt_toolkit.buffer import Buffer
from __future__ import unicode_literals
from prompt_toolkit.enums import DEFAULT_BUFFER
from prompt_toolkit.filters import Condition
from prompt_toolkit.application import get_app
from .packages.parseutils.utils import is_open_quote
class PGBuffer(Buffer):
def __init__(self, always_multiline, multiline_mode, *args, **kwargs):
self.always_multiline = always_multiline
self.multiline_mode = multiline_mode
def pg_is_multiline(pgcli):
@Condition
def cond():
doc = get_app().layout.get_buffer_by_name(DEFAULT_BUFFER).document
@Condition
def is_multiline():
doc = self.document
if not self.always_multiline:
return False
if self.multiline_mode == 'safe':
return True
else:
return not _multiline_exception(doc.text)
super(self.__class__, self).__init__(*args, is_multiline=is_multiline,
tempfile_suffix='.sql', **kwargs)
if not pgcli.multi_line:
return False
if pgcli.multiline_mode == 'safe':
return True
else:
return not _multiline_exception(doc.text)
return cond
def _is_complete(sql):

View File

@ -145,32 +145,31 @@ keyring = True
# Custom colors for the completion menu, toolbar, etc.
[colors]
Token.Menu.Completions.Completion.Current = 'bg:#ffffff #000000'
Token.Menu.Completions.Completion = 'bg:#008888 #ffffff'
Token.Menu.Completions.Meta.Current = 'bg:#44aaaa #000000'
Token.Menu.Completions.Meta = 'bg:#448888 #ffffff'
Token.Menu.Completions.MultiColumnMeta = 'bg:#aaffff #000000'
Token.Menu.Completions.ProgressButton = 'bg:#003333'
Token.Menu.Completions.ProgressBar = 'bg:#00aaaa'
Token.SelectedText = '#ffffff bg:#6666aa'
Token.SearchMatch = '#ffffff bg:#4444aa'
Token.SearchMatch.Current = '#ffffff bg:#44aa44'
Token.Toolbar = 'bg:#222222 #aaaaaa'
Token.Toolbar.Off = 'bg:#222222 #888888'
Token.Toolbar.On = 'bg:#222222 #ffffff'
Token.Toolbar.Search = 'noinherit bold'
Token.Toolbar.Search.Text = 'nobold'
Token.Toolbar.System = 'noinherit bold'
Token.Toolbar.Arg = 'noinherit bold'
Token.Toolbar.Arg.Text = 'nobold'
Token.Toolbar.Transaction.Valid = 'bg:#222222 #00ff5f bold'
Token.Toolbar.Transaction.Failed = 'bg:#222222 #ff005f bold'
completion-menu.completion.current = 'bg:#ffffff #000000'
completion-menu.completion = 'bg:#008888 #ffffff'
completion-menu.meta.completion.current = 'bg:#44aaaa #000000'
completion-menu.meta.completion = 'bg:#448888 #ffffff'
completion-menu.multi-column-meta = 'bg:#aaffff #000000'
scrollbar.arrow = 'bg:#003333'
scrollbar = 'bg:#00aaaa'
selected = '#ffffff bg:#6666aa'
search = '#ffffff bg:#4444aa'
search.current = '#ffffff bg:#44aa44'
bottom-toolbar = 'bg:#222222 #aaaaaa'
bottom-toolbar.off = 'bg:#222222 #888888'
bottom-toolbar.on = 'bg:#222222 #ffffff'
search-toolbar = 'noinherit bold'
search-toolbar.text = 'nobold'
system-toolbar = 'noinherit bold'
arg-toolbar = 'noinherit bold'
arg-toolbar.text = 'nobold'
bottom-toolbar.transaction.valid = 'bg:#222222 #00ff5f bold'
bottom-toolbar.transaction.failed = 'bg:#222222 #ff005f bold'
# color of table
# you can use token or custom colors
Token.Output.Header = "#00ff5f bold"
Token.Output.OddRow = ""
Token.Output.EvenRow = ""
# style classes for colored table output
output.header = "#00ff5f bold"
output.odd-row = ""
output.even-row = ""
# Named queries are queries you can execute by name.
[named queries]

View File

@ -6,8 +6,7 @@ import operator
from collections import namedtuple, defaultdict, OrderedDict
from cli_helpers.tabular_output import TabularOutputFormatter
from pgspecial.namedqueries import NamedQueries
from prompt_toolkit.completion import Completer, Completion
from prompt_toolkit.contrib.completers import PathCompleter
from prompt_toolkit.completion import Completer, Completion, PathCompleter
from prompt_toolkit.document import Document
from .packages.sqlcompletion import (
FromClauseItem, suggest_type, Special, Database, Schema, Table,

View File

@ -1,9 +1,64 @@
from pygments.token import string_to_tokentype
from pygments.util import ClassNotFound
from prompt_toolkit.styles import PygmentsStyle
import pygments.styles
from __future__ import unicode_literals
import logging
import pygments.styles
from pygments.token import string_to_tokentype, Token
from pygments.style import Style as PygmentsStyle
from pygments.util import ClassNotFound
from prompt_toolkit.styles.pygments import style_from_pygments_cls
from prompt_toolkit.styles import merge_styles, Style
logger = logging.getLogger(__name__)
# map Pygments tokens (ptk 1.0) to class names (ptk 2.0).
TOKEN_TO_PROMPT_STYLE = {
Token.Menu.Completions.Completion.Current: 'completion-menu.completion.current',
Token.Menu.Completions.Completion: 'completion-menu.completion',
Token.Menu.Completions.Meta.Current: 'completion-menu.meta.completion.current',
Token.Menu.Completions.Meta: 'completion-menu.meta.completion',
Token.Menu.Completions.MultiColumnMeta: 'completion-menu.multi-column-meta',
Token.Menu.Completions.ProgressButton: 'scrollbar.arrow', # best guess
Token.Menu.Completions.ProgressBar: 'scrollbar', # best guess
Token.SelectedText: 'selected',
Token.SearchMatch: 'search',
Token.SearchMatch.Current: 'search.current',
Token.Toolbar: 'bottom-toolbar',
Token.Toolbar.Off: 'bottom-toolbar.off',
Token.Toolbar.On: 'bottom-toolbar.on',
Token.Toolbar.Search: 'search-toolbar',
Token.Toolbar.Search.Text: 'search-toolbar.text',
Token.Toolbar.System: 'system-toolbar',
Token.Toolbar.Arg: 'arg-toolbar',
Token.Toolbar.Arg.Text: 'arg-toolbar.text',
Token.Toolbar.Transaction.Valid: 'bottom-toolbar.transaction.valid',
Token.Toolbar.Transaction.Failed: 'bottom-toolbar.transaction.failed',
Token.Output.Header: 'output.header',
Token.Output.OddRow: 'output.odd-row',
Token.Output.EvenRow: 'output.even-row',
}
# reverse dict for cli_helpers, because they still expect Pygments tokens.
PROMPT_STYLE_TO_TOKEN = {
v: k for k, v in TOKEN_TO_PROMPT_STYLE.items()
}
def parse_pygments_style(token_name, style_object, style_dict):
"""Parse token type and style string.
:param token_name: str name of Pygments token. Example: "Token.String"
:param style_object: pygments.style.Style instance to use as base
:param style_dict: dict of token names and their styles, customized to this cli
"""
token_type = string_to_tokentype(token_name)
try:
other_token_type = string_to_tokentype(style_dict[token_name])
return token_type, style_object.styles[other_token_type]
except AttributeError as err:
return token_type, style_dict[token_name]
from pygments.style import Style
def style_factory(name, cli_style):
try:
@ -11,16 +66,31 @@ def style_factory(name, cli_style):
except ClassNotFound:
style = pygments.styles.get_style_by_name('native')
custom_styles = {}
prompt_styles = []
# prompt-toolkit used pygments tokens for styling before, switched to style
# names in 2.0. Convert old token types to new style names, for backwards compatibility.
for token in cli_style:
try:
custom_styles[string_to_tokentype(
token)] = style.styles[string_to_tokentype(cli_style[token])]
except AttributeError as err:
custom_styles[string_to_tokentype(token)] = cli_style[token]
if token.startswith('Token.'):
# treat as pygments token (1.0)
token_type, style_value = parse_pygments_style(
token, style, cli_style)
if token_type in TOKEN_TO_PROMPT_STYLE:
prompt_style = TOKEN_TO_PROMPT_STYLE[token_type]
prompt_styles.append((prompt_style, style_value))
else:
# we don't want to support tokens anymore
logger.error('Unhandled style / class name: %s', token)
else:
# treat as prompt style name (2.0). See default style names here:
# https://github.com/jonathanslenders/python-prompt-toolkit/blob/master/prompt_toolkit/styles/defaults.py
prompt_styles.append((token, cli_style[token]))
return PygmentsStyle.from_defaults(style_dict=custom_styles,
pygments_style_cls=style)
override_style = Style([('bottom-toolbar', 'noreverse')])
return merge_styles([
style_from_pygments_cls(style),
override_style,
Style(prompt_styles)
])
def style_factory_output(name, cli_style):
@ -30,14 +100,18 @@ def style_factory_output(name, cli_style):
style = pygments.styles.get_style_by_name('native').styles
for token in cli_style:
try:
style.update({string_to_tokentype(
token): style[string_to_tokentype(cli_style[token])], })
except AttributeError as err:
style.update(
{string_to_tokentype(token): cli_style[token], })
if token.startswith('Token.'):
token_type, style_value = parse_pygments_style(
token, style, cli_style)
style.update({token_type: style_value})
elif token in PROMPT_STYLE_TO_TOKEN:
token_type = PROMPT_STYLE_TO_TOKEN[token]
style.update({token_type: cli_style[token]})
else:
# TODO: cli helpers will have to switch to ptk.Style
logger.error('Unhandled style / class name: %s', token)
class OutputStyle(pygments.style.Style):
class OutputStyle(PygmentsStyle):
default_style = ""
styles = style

View File

@ -1,59 +1,62 @@
from pygments.token import Token
from prompt_toolkit.enums import DEFAULT_BUFFER
from __future__ import unicode_literals
from prompt_toolkit.key_binding.vi_state import InputMode
from prompt_toolkit.application import get_app
def _get_vi_mode(cli):
def _get_vi_mode():
return {
InputMode.INSERT: 'I',
InputMode.NAVIGATION: 'N',
InputMode.REPLACE: 'R',
InputMode.INSERT_MULTIPLE: 'M',
}[cli.vi_state.input_mode]
}[get_app().vi_state.input_mode]
def create_toolbar_tokens_func(get_vi_mode_enabled, get_is_refreshing,
failed_transaction, valid_transaction):
"""
Return a function that generates the toolbar tokens.
"""
assert callable(get_vi_mode_enabled)
token = Token.Toolbar
def get_toolbar_tokens(cli):
def create_toolbar_tokens_func(pgcli):
"""Return a function that generates the toolbar tokens."""
def get_toolbar_tokens():
result = []
result.append((token, ' '))
result.append(('class:bottom-toolbar', ' '))
if cli.buffers[DEFAULT_BUFFER].completer.smart_completion:
result.append((token.On, '[F2] Smart Completion: ON '))
if pgcli.completer.smart_completion:
result.append(('class:bottom-toolbar.on',
'[F2] Smart Completion: ON '))
else:
result.append((token.Off, '[F2] Smart Completion: OFF '))
result.append(('class:bottom-toolbar.off',
'[F2] Smart Completion: OFF '))
if cli.buffers[DEFAULT_BUFFER].always_multiline:
result.append((token.On, '[F3] Multiline: ON '))
if pgcli.multi_line:
result.append(('class:bottom-toolbar.on', '[F3] Multiline: ON '))
else:
result.append((token.Off, '[F3] Multiline: OFF '))
result.append(('class:bottom-toolbar.off',
'[F3] Multiline: OFF '))
if cli.buffers[DEFAULT_BUFFER].always_multiline:
if cli.buffers[DEFAULT_BUFFER].multiline_mode == 'safe':
result.append((token,' ([Esc] [Enter] to execute]) '))
if pgcli.multi_line:
if pgcli.multiline_mode == 'safe':
result.append(
('class:bottom-toolbar', ' ([Esc] [Enter] to execute]) '))
else:
result.append((token,' (Semi-colon [;] will end the line) '))
result.append(
('class:bottom-toolbar', ' (Semi-colon [;] will end the line) '))
if get_vi_mode_enabled():
result.append((token.On, '[F4] Vi-mode (' + _get_vi_mode(cli) + ')'))
if pgcli.vi_mode:
result.append(
('class:bottom-toolbar', '[F4] Vi-mode (' + _get_vi_mode() + ')'))
else:
result.append((token.On, '[F4] Emacs-mode'))
result.append(('class:bottom-toolbar', '[F4] Emacs-mode'))
if failed_transaction():
result.append((token.Transaction.Failed, ' Failed transaction'))
if pgcli.pgexecute.failed_transaction():
result.append(('class:bottom-toolbar.transaction.failed',
' Failed transaction'))
if valid_transaction():
result.append((token.Transaction.Valid, ' Transaction'))
if pgcli.pgexecute.valid_transaction():
result.append(
('class:bottom-toolbar.transaction.valid', ' Transaction'))
if get_is_refreshing():
result.append((token, ' Refreshing completions...'))
if pgcli.completion_refresher.is_refreshing():
result.append(
('class:bottom-toolbar', ' Refreshing completions...'))
return result
return get_toolbar_tokens

View File

@ -15,7 +15,7 @@ install_requirements = [
'pgspecial>=1.11.2',
'click >= 4.1',
'Pygments >= 2.0', # Pygments has to be Capitalcased. WTF?
'prompt_toolkit>=1.0.10,<1.1.0',
'prompt_toolkit>=2.0.0,<2.1.0',
'psycopg2 >= 2.7.4,<2.8',
'sqlparse >=0.2.2,<0.3.0',
'configobj >= 5.0.6',

View File

@ -169,7 +169,7 @@ def after_scenario(context, _):
context.tmpfile_sql_help = None
# TODO: uncomment to debug a failure
# # TODO: uncomment to debug a failure
# def after_step(context, step):
# if step.status == "failed":
# import ipdb; ipdb.set_trace()
# import pdb; pdb.set_trace()

View File

@ -72,7 +72,7 @@ def step_see_prompt(context):
@then('we see help output')
def step_see_help(context):
for expected_line in context.fixture_data['help_commands.txt']:
wrappers.expect_exact(context, expected_line, timeout=1)
wrappers.expect_exact(context, expected_line, timeout=2)
@then('we see database created')

View File

@ -39,7 +39,7 @@ def step_see_named_query_saved(context):
"""
Wait to see query saved.
"""
wrappers.expect_pager(context, 'Saved.\r\n', timeout=1)
wrappers.expect_exact(context, 'Saved.', timeout=2)
@then('we see the named query executed')

View File

@ -1,3 +1,5 @@
from __future__ import unicode_literals
from functools import partial
from itertools import product
from pgcli.packages.parseutils.meta import FunctionMetadata, ForeignKey

View File

@ -182,9 +182,9 @@ def pset_pager_mocks():
cli.watch_command = None
with mock.patch('pgcli.main.click.echo') as mock_echo, \
mock.patch('pgcli.main.click.echo_via_pager') as mock_echo_via_pager, \
mock.patch.object(cli, 'cli') as mock_cli:
mock.patch.object(cli, 'prompt_app') as mock_app:
yield cli, mock_echo, mock_echo_via_pager, mock_cli
yield cli, mock_echo, mock_echo_via_pager, mock_app
@pytest.mark.parametrize('term_height,term_width,text', test_data, ids=test_ids)