1
0
Fork 0

Changed destructive_warning to take a list of destructive commands (#1328)

* Changed destructive_warning to take a list of destructive commands and added the dsn_alias as part of the destructive command warning

* Updated parse_destructive_warning to handle None

* Reverted auto formatted change to AUTHORS

* Reverted auto formatted change to AUTHORS
This commit is contained in:
Rodrigo Neri (Rigo) 2022-10-13 16:42:22 -05:00 committed by GitHub
parent c280f8e398
commit 1726ff5397
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 129 additions and 51 deletions

View File

@ -123,6 +123,7 @@ Contributors:
* Daniel Kukula (dkuku)
* Kian-Meng Ang (kianmeng)
* Liu Zhao (astroshot)
* Rigo Neri (rigoneri)
Creator:
--------

View File

@ -165,8 +165,9 @@ in the ``tests`` directory. An example::
First, install the requirements for testing:
::
$ pip install -r requirements-dev.txt
$ pip install -U pip setuptools
$ pip install --no-cache-dir ".[sshtunnel]"
$ pip install -r requirements-dev.txt
Ensure that the database user has permissions to create and drop test databases
by checking your ``pg_hba.conf`` file. The default user should be ``postgres``

View File

@ -1,3 +1,13 @@
Upcoming
========
Features:
---------
* Changed the `destructive_warning` config to be a list of commands that are considered
destructive. This would allow you to be warned on `create`, `grant`, or `insert` queries.
* Destructive warnings will now include the alias dsn connection string name if provided (-D option).
3.5.0 (2022/09/15):
===================

View File

@ -64,6 +64,7 @@ from .config import (
from .key_bindings import pgcli_bindings
from .packages.formatter.sqlformatter import register_new_formatter
from .packages.prompt_utils import confirm_destructive_query
from .packages.parseutils import parse_destructive_warning
from .__init__ import __version__
click.disable_unicode_literals_warning = True
@ -224,11 +225,10 @@ class PGCli:
self.syntax_style = c["main"]["syntax_style"]
self.cli_style = c["colors"]
self.wider_completion_menu = c["main"].as_bool("wider_completion_menu")
self.destructive_warning = warn or c["main"]["destructive_warning"]
# also handle boolean format of destructive warning
self.destructive_warning = {"true": "all", "false": "off"}.get(
self.destructive_warning.lower(), self.destructive_warning
self.destructive_warning = parse_destructive_warning(
warn or c["main"].as_list("destructive_warning")
)
self.less_chatty = bool(less_chatty) or c["main"].as_bool("less_chatty")
self.null_string = c["main"].get("null_string", "<null>")
self.prompt_format = (
@ -424,8 +424,11 @@ class PGCli:
return [(None, None, None, str(e), "", False, True)]
if (
self.destructive_warning != "off"
and confirm_destructive_query(query, self.destructive_warning) is False
self.destructive_warning
and confirm_destructive_query(
query, self.destructive_warning, self.dsn_alias
)
is False
):
message = "Wise choice. Command execution stopped."
return [(None, None, None, message)]
@ -693,15 +696,16 @@ class PGCli:
query = MetaQuery(query=text, successful=False)
try:
if self.destructive_warning != "off":
if self.destructive_warning:
destroy = confirm = confirm_destructive_query(
text, self.destructive_warning
text, self.destructive_warning, self.dsn_alias
)
if destroy is False:
click.secho("Wise choice!")
raise KeyboardInterrupt
elif destroy:
click.secho("Your call!")
output, query = self._evaluate_command(text)
except KeyboardInterrupt:
# Restart connection to the database
@ -1266,7 +1270,6 @@ class PGCli:
@click.option(
"--warn",
default=None,
type=click.Choice(["all", "moderate", "off"]),
help="Warn before running a destructive query.",
)
@click.option(

View File

@ -1,6 +1,17 @@
import sqlparse
BASE_KEYWORDS = [
"drop",
"shutdown",
"delete",
"truncate",
"alter",
"unconditional_update",
]
ALL_KEYWORDS = BASE_KEYWORDS + ["update"]
def query_starts_with(formatted_sql, prefixes):
"""Check if the query starts with any item from *prefixes*."""
prefixes = [prefix.lower() for prefix in prefixes]
@ -13,22 +24,35 @@ def query_is_unconditional_update(formatted_sql):
return bool(tokens) and tokens[0] == "update" and "where" not in tokens
def query_is_simple_update(formatted_sql):
"""Check if the query starts with UPDATE."""
tokens = formatted_sql.split()
return bool(tokens) and tokens[0] == "update"
def is_destructive(queries, warning_level="all"):
def is_destructive(queries, keywords):
"""Returns if any of the queries in *queries* is destructive."""
keywords = ("drop", "shutdown", "delete", "truncate", "alter")
for query in sqlparse.split(queries):
if query:
formatted_sql = sqlparse.format(query.lower(), strip_comments=True).strip()
if "unconditional_update" in keywords and query_is_unconditional_update(
formatted_sql
):
return True
if query_starts_with(formatted_sql, keywords):
return True
if query_is_unconditional_update(formatted_sql):
return True
if warning_level == "all" and query_is_simple_update(formatted_sql):
return True
return False
def parse_destructive_warning(warning_level):
"""Converts a deprecated destructive warning option to a list of command keywords."""
if not warning_level:
return []
if not isinstance(warning_level, list):
if "," in warning_level:
return warning_level.split(",")
warning_level = [warning_level]
return {
"true": ALL_KEYWORDS,
"false": [],
"all": ALL_KEYWORDS,
"moderate": BASE_KEYWORDS,
"off": [],
"": [],
}.get(warning_level[0], warning_level)

View File

@ -3,7 +3,7 @@ import click
from .parseutils import is_destructive
def confirm_destructive_query(queries, warning_level):
def confirm_destructive_query(queries, keywords, alias):
"""Check if the query is destructive and prompts the user to confirm.
Returns:
@ -12,10 +12,12 @@ def confirm_destructive_query(queries, warning_level):
* False if the query is destructive and the user doesn't want to proceed.
"""
prompt_text = (
"You're about to run a destructive command.\n" "Do you want to proceed? (y/n)"
)
if is_destructive(queries, warning_level) and sys.stdin.isatty():
info = "You're about to run a destructive command"
if alias:
info += f" in {click.style(alias, fg='red')}"
prompt_text = f"{info}.\nDo you want to proceed? (y/n)"
if is_destructive(queries, keywords) and sys.stdin.isatty():
return prompt(prompt_text, type=bool)

View File

@ -22,14 +22,12 @@ multi_line = False
# a command.
multi_line_mode = psql
# Destructive warning mode will alert you before executing a sql statement
# Destructive warning will alert you before executing a sql statement
# that may cause harm to the database such as "drop table", "drop database",
# "shutdown", "delete", or "update".
# Possible values:
# "all" - warn on data definition statements, server actions such as SHUTDOWN, DELETE or UPDATE
# "moderate" - skip warning on UPDATE statements, except for unconditional updates
# "off" - skip all warnings
destructive_warning = all
# "shutdown", "delete", or "update".
# You can pass a list of destructive commands or leave it empty if you want to skip all warnings.
# "unconditional_update" will warn you of update statements that don't have a where clause
destructive_warning = drop, shutdown, delete, truncate, alter, update, unconditional_update
# Enables expand mode, which is similar to `\x` in psql.
expand = False
@ -140,7 +138,7 @@ less_chatty = False
# \i - Postgres PID
# \# - "@" sign if logged in as superuser, '>' in other case
# \n - Newline
# \dsn_alias - name of dsn alias if -D option is used (empty otherwise)
# \dsn_alias - name of dsn connection string alias if -D option is used (empty otherwise)
# \x1b[...m - insert ANSI escape sequence
# eg: prompt = '\x1b[35m\u@\x1b[32m\h:\x1b[36m\d>'
prompt = '\u@\h:\d> '
@ -198,7 +196,8 @@ output.null = "#808080"
# Named queries are queries you can execute by name.
[named queries]
# DSN to call by -D option
# Here's where you can provide a list of connection string aliases.
# You can use it by passing the -D option. `pgcli -D example_dsn`
[alias_dsn]
# example_dsn = postgresql://[user[:password]@][netloc][:port][/dbname]

View File

@ -1,5 +1,10 @@
import pytest
from pgcli.packages.parseutils import is_destructive
from pgcli.packages.parseutils import (
is_destructive,
parse_destructive_warning,
BASE_KEYWORDS,
ALL_KEYWORDS,
)
from pgcli.packages.parseutils.tables import extract_tables
from pgcli.packages.parseutils.utils import find_prev_keyword, is_open_quote
@ -263,18 +268,43 @@ def test_is_open_quote__open(sql):
@pytest.mark.parametrize(
("sql", "warning_level", "expected"),
("sql", "keywords", "expected"),
[
("update abc set x = 1", "all", True),
("update abc set x = 1 where y = 2", "all", True),
("update abc set x = 1", "moderate", True),
("update abc set x = 1 where y = 2", "moderate", False),
("select x, y, z from abc", "all", False),
("drop abc", "all", True),
("alter abc", "all", True),
("delete abc", "all", True),
("truncate abc", "all", True),
("update abc set x = 1", ALL_KEYWORDS, True),
("update abc set x = 1 where y = 2", ALL_KEYWORDS, True),
("update abc set x = 1", BASE_KEYWORDS, True),
("update abc set x = 1 where y = 2", BASE_KEYWORDS, False),
("select x, y, z from abc", ALL_KEYWORDS, False),
("drop abc", ALL_KEYWORDS, True),
("alter abc", ALL_KEYWORDS, True),
("delete abc", ALL_KEYWORDS, True),
("truncate abc", ALL_KEYWORDS, True),
("insert into abc values (1, 2, 3)", ALL_KEYWORDS, False),
("insert into abc values (1, 2, 3)", BASE_KEYWORDS, False),
("insert into abc values (1, 2, 3)", ["insert"], True),
("insert into abc values (1, 2, 3)", ["insert"], True),
],
)
def test_is_destructive(sql, warning_level, expected):
assert is_destructive(sql, warning_level=warning_level) == expected
def test_is_destructive(sql, keywords, expected):
assert is_destructive(sql, keywords) == expected
@pytest.mark.parametrize(
("warning_level", "expected"),
[
("true", ALL_KEYWORDS),
("false", []),
("all", ALL_KEYWORDS),
("moderate", BASE_KEYWORDS),
("off", []),
("", []),
(None, []),
(ALL_KEYWORDS, ALL_KEYWORDS),
(BASE_KEYWORDS, BASE_KEYWORDS),
("insert", ["insert"]),
("drop,alter,delete", ["drop", "alter", "delete"]),
(["drop", "alter", "delete"], ["drop", "alter", "delete"]),
],
)
def test_parse_destructive_warning(warning_level, expected):
assert parse_destructive_warning(warning_level) == expected

View File

@ -7,4 +7,11 @@ def test_confirm_destructive_query_notty():
stdin = click.get_text_stream("stdin")
if not stdin.isatty():
sql = "drop database foo;"
assert confirm_destructive_query(sql, "all") is None
assert confirm_destructive_query(sql, [], None) is None
def test_confirm_destructive_query_with_alias():
stdin = click.get_text_stream("stdin")
if not stdin.isatty():
sql = "drop database foo;"
assert confirm_destructive_query(sql, ["drop"], "test") is None

View File

@ -5,6 +5,7 @@ deps = pytest>=2.7.0,<=3.0.7
mock>=1.0.1
behave>=1.2.4
pexpect==3.3
sshtunnel>=0.4.0
commands = py.test
behave tests/features
passenv = PGHOST