mirror of https://github.com/dbcli/pgcli
Re-run last query with bare `\watch` (#1283)
* Re-run last query with bare `\watch` * add test * clean up post test refactor * lint * rerun tests
This commit is contained in:
parent
c65495716d
commit
123e00a086
|
@ -5,6 +5,7 @@ Features:
|
|||
---------
|
||||
|
||||
* Add `max_field_width` setting to config, to enable more control over field truncation ([related issue](https://github.com/dbcli/pgcli/issues/1250)).
|
||||
* Re-run last query via bare `\watch`. (Thanks: `Saif Hakim`_)
|
||||
|
||||
Bug fixes:
|
||||
----------
|
||||
|
|
|
@ -772,18 +772,7 @@ class PGCli:
|
|||
click.secho(str(e), err=True, fg="red")
|
||||
continue
|
||||
|
||||
# Initialize default metaquery in case execution fails
|
||||
self.watch_command, timing = special.get_watch_command(text)
|
||||
if self.watch_command:
|
||||
while self.watch_command:
|
||||
try:
|
||||
query = self.execute_command(self.watch_command)
|
||||
click.echo(f"Waiting for {timing} seconds before repeating")
|
||||
sleep(timing)
|
||||
except KeyboardInterrupt:
|
||||
self.watch_command = None
|
||||
else:
|
||||
query = self.execute_command(text)
|
||||
self.handle_watch_command(text)
|
||||
|
||||
self.now = dt.datetime.today()
|
||||
|
||||
|
@ -791,12 +780,40 @@ class PGCli:
|
|||
with self._completer_lock:
|
||||
self.completer.extend_query_history(text)
|
||||
|
||||
self.query_history.append(query)
|
||||
|
||||
except (PgCliQuitError, EOFError):
|
||||
if not self.less_chatty:
|
||||
print("Goodbye!")
|
||||
|
||||
def handle_watch_command(self, text):
|
||||
# Initialize default metaquery in case execution fails
|
||||
self.watch_command, timing = special.get_watch_command(text)
|
||||
|
||||
# If we run \watch without a command, apply it to the last query run.
|
||||
if self.watch_command is not None and not self.watch_command.strip():
|
||||
try:
|
||||
self.watch_command = self.query_history[-1].query
|
||||
except IndexError:
|
||||
click.secho(
|
||||
"\\watch cannot be used with an empty query", err=True, fg="red"
|
||||
)
|
||||
self.watch_command = None
|
||||
|
||||
# If there's a command to \watch, run it in a loop.
|
||||
if self.watch_command:
|
||||
while self.watch_command:
|
||||
try:
|
||||
query = self.execute_command(self.watch_command)
|
||||
click.echo(f"Waiting for {timing} seconds before repeating")
|
||||
sleep(timing)
|
||||
except KeyboardInterrupt:
|
||||
self.watch_command = None
|
||||
|
||||
# Otherwise, execute it as a regular command.
|
||||
else:
|
||||
query = self.execute_command(text)
|
||||
|
||||
self.query_history.append(query)
|
||||
|
||||
def _build_cli(self, history):
|
||||
key_bindings = pgcli_bindings(self)
|
||||
|
||||
|
|
|
@ -297,6 +297,63 @@ def test_i_works(tmpdir, executor):
|
|||
run(executor, statement, pgspecial=cli.pgspecial)
|
||||
|
||||
|
||||
@dbtest
|
||||
def test_watch_works(executor):
|
||||
cli = PGCli(pgexecute=executor)
|
||||
|
||||
def run_with_watch(
|
||||
query, target_call_count=1, expected_output="", expected_timing=None
|
||||
):
|
||||
"""
|
||||
:param query: Input to the CLI
|
||||
:param target_call_count: Number of times the user lets the command run before Ctrl-C
|
||||
:param expected_output: Substring expected to be found for each executed query
|
||||
:param expected_timing: value `time.sleep` expected to be called with on every invocation
|
||||
"""
|
||||
with mock.patch.object(cli, "echo_via_pager") as mock_echo, mock.patch(
|
||||
"pgcli.main.sleep"
|
||||
) as mock_sleep:
|
||||
mock_sleep.side_effect = [None] * (target_call_count - 1) + [
|
||||
KeyboardInterrupt
|
||||
]
|
||||
cli.handle_watch_command(query)
|
||||
# Validate that sleep was called with the right timing
|
||||
for i in range(target_call_count - 1):
|
||||
assert mock_sleep.call_args_list[i][0][0] == expected_timing
|
||||
# Validate that the output of the query was expected
|
||||
assert mock_echo.call_count == target_call_count
|
||||
for i in range(target_call_count):
|
||||
assert expected_output in mock_echo.call_args_list[i][0][0]
|
||||
|
||||
# With no history, it errors.
|
||||
with mock.patch("pgcli.main.click.secho") as mock_secho:
|
||||
cli.handle_watch_command(r"\watch 2")
|
||||
mock_secho.assert_called()
|
||||
assert (
|
||||
r"\watch cannot be used with an empty query"
|
||||
in mock_secho.call_args_list[0][0][0]
|
||||
)
|
||||
|
||||
# Usage 1: Run a query and then re-run it with \watch across two prompts.
|
||||
run_with_watch("SELECT 111", expected_output="111")
|
||||
run_with_watch(
|
||||
"\\watch 10", target_call_count=2, expected_output="111", expected_timing=10
|
||||
)
|
||||
|
||||
# Usage 2: Run a query and \watch via the same prompt.
|
||||
run_with_watch(
|
||||
"SELECT 222; \\watch 4",
|
||||
target_call_count=3,
|
||||
expected_output="222",
|
||||
expected_timing=4,
|
||||
)
|
||||
|
||||
# Usage 3: Re-run the last watched command with a new timing
|
||||
run_with_watch(
|
||||
"\\watch 5", target_call_count=4, expected_output="222", expected_timing=5
|
||||
)
|
||||
|
||||
|
||||
def test_missing_rc_dir(tmpdir):
|
||||
rcfile = str(tmpdir.join("subdir").join("rcfile"))
|
||||
|
||||
|
|
Loading…
Reference in New Issue