1
0
Fork 0

Add verbose_errors config and special command (#1455)

* Add verbose_errors config

* Update changelog

* Add special command

* Blackify
This commit is contained in:
Andrew 2024-03-27 05:31:38 +07:00 committed by GitHub
parent a7a70fdc92
commit 8cc22b9d3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 144 additions and 6 deletions

View File

@ -4,7 +4,9 @@ Upcoming
Features:
---------
* Support `PGAPPNAME` as an environment variable and `--application-name` as a command line argument.
* Show Postgres notifications
* Add `verbose_errors` config and `\v` special command which enable the
displaying of all Postgres error fields received.
* Show Postgres notifications.
Bug fixes:
----------

View File

@ -75,6 +75,7 @@ from getpass import getuser
from psycopg import OperationalError, InterfaceError, Notify
from psycopg.conninfo import make_conninfo, conninfo_to_dict
from psycopg.errors import Diagnostic
from collections import namedtuple
@ -248,6 +249,9 @@ class PGCli:
)
self.less_chatty = bool(less_chatty) or c["main"].as_bool("less_chatty")
self.verbose_errors = "verbose_errors" in c["main"] and c["main"].as_bool(
"verbose_errors"
)
self.null_string = c["main"].get("null_string", "<null>")
self.prompt_format = (
prompt
@ -389,6 +393,26 @@ class PGCli:
"Echo a string to the query output channel.",
)
self.pgspecial.register(
self.toggle_verbose_errors,
"\\v",
"\\v [on|off]",
"Toggle verbose errors.",
)
def toggle_verbose_errors(self, pattern, **_):
flag = pattern.strip()
if flag == "on":
self.verbose_errors = True
elif flag == "off":
self.verbose_errors = False
else:
self.verbose_errors = not self.verbose_errors
message = "Verbose errors " + "on." if self.verbose_errors else "off."
return [(None, None, None, message)]
def echo(self, pattern, **_):
return [(None, None, None, pattern)]
@ -1080,7 +1104,7 @@ class PGCli:
res = self.pgexecute.run(
text,
self.pgspecial,
exception_formatter,
lambda x: exception_formatter(x, self.verbose_errors),
on_error_resume,
explain_mode=self.explain_mode,
)
@ -1618,8 +1642,71 @@ def is_select(status):
return status.split(None, 1)[0].lower() == "select"
def exception_formatter(e):
return click.style(str(e), fg="red")
def diagnostic_output(diagnostic: Diagnostic) -> str:
fields = []
if diagnostic.severity is not None:
fields.append("Severity: " + diagnostic.severity)
if diagnostic.severity_nonlocalized is not None:
fields.append("Severity (non-localized): " + diagnostic.severity_nonlocalized)
if diagnostic.sqlstate is not None:
fields.append("SQLSTATE code: " + diagnostic.sqlstate)
if diagnostic.message_primary is not None:
fields.append("Message: " + diagnostic.message_primary)
if diagnostic.message_detail is not None:
fields.append("Detail: " + diagnostic.message_detail)
if diagnostic.message_hint is not None:
fields.append("Hint: " + diagnostic.message_hint)
if diagnostic.statement_position is not None:
fields.append("Position: " + diagnostic.statement_position)
if diagnostic.internal_position is not None:
fields.append("Internal position: " + diagnostic.internal_position)
if diagnostic.internal_query is not None:
fields.append("Internal query: " + diagnostic.internal_query)
if diagnostic.context is not None:
fields.append("Where: " + diagnostic.context)
if diagnostic.schema_name is not None:
fields.append("Schema name: " + diagnostic.schema_name)
if diagnostic.table_name is not None:
fields.append("Table name: " + diagnostic.table_name)
if diagnostic.column_name is not None:
fields.append("Column name: " + diagnostic.column_name)
if diagnostic.datatype_name is not None:
fields.append("Data type name: " + diagnostic.datatype_name)
if diagnostic.constraint_name is not None:
fields.append("Constraint name: " + diagnostic.constraint_name)
if diagnostic.source_file is not None:
fields.append("File: " + diagnostic.source_file)
if diagnostic.source_line is not None:
fields.append("Line: " + diagnostic.source_line)
if diagnostic.source_function is not None:
fields.append("Routine: " + diagnostic.source_function)
return "\n".join(fields)
def exception_formatter(e, verbose_errors: bool = False):
s = str(e)
if verbose_errors:
s += "\n" + diagnostic_output(e.diag)
return click.style(s, fg="red")
def format_output(title, cur, headers, status, settings, explain_mode=False):

View File

@ -156,6 +156,11 @@ max_field_width = 500
# Skip intro on startup and goodbye on exit
less_chatty = False
# Show all Postgres error fields (as listed in
# https://www.postgresql.org/docs/current/protocol-error-fields.html).
# Can be toggled with \v.
verbose_errors = False
# Postgres prompt
# \t - Current date and time
# \u - Username

View File

@ -299,6 +299,24 @@ def test_i_works(tmpdir, executor):
run(executor, statement, pgspecial=cli.pgspecial)
@dbtest
def test_toggle_verbose_errors(executor):
cli = PGCli(pgexecute=executor)
cli._evaluate_command("\\v on")
assert cli.verbose_errors
output, _ = cli._evaluate_command("SELECT 1/0")
assert "SQLSTATE" in output[0]
cli._evaluate_command("\\v off")
assert not cli.verbose_errors
output, _ = cli._evaluate_command("SELECT 1/0")
assert "SQLSTATE" not in output[0]
cli._evaluate_command("\\v")
assert cli.verbose_errors
@dbtest
def test_echo_works(executor):
cli = PGCli(pgexecute=executor)

View File

@ -1,3 +1,4 @@
import re
from textwrap import dedent
import psycopg
@ -6,7 +7,7 @@ from unittest.mock import patch, MagicMock
from pgspecial.main import PGSpecial, NO_QUERY
from utils import run, dbtest, requires_json, requires_jsonb
from pgcli.main import PGCli
from pgcli.main import PGCli, exception_formatter as main_exception_formatter
from pgcli.packages.parseutils.meta import FunctionMetadata
@ -219,8 +220,33 @@ def test_database_list(executor):
@dbtest
def test_invalid_syntax(executor, exception_formatter):
result = run(executor, "invalid syntax!", exception_formatter=exception_formatter)
result = run(
executor,
"invalid syntax!",
exception_formatter=lambda x: main_exception_formatter(x, verbose_errors=False),
)
assert 'syntax error at or near "invalid"' in result[0]
assert "SQLSTATE" not in result[0]
@dbtest
def test_invalid_syntax_verbose(executor):
result = run(
executor,
"invalid syntax!",
exception_formatter=lambda x: main_exception_formatter(x, verbose_errors=True),
)
fields = r"""
Severity: ERROR
Severity \(non-localized\): ERROR
SQLSTATE code: 42601
Message: syntax error at or near "invalid"
Position: 1
File: scan\.l
Line: \d+
Routine: scanner_yyerror
""".strip()
assert re.search(fields, result[0])
@dbtest