1
0
Fork 0

Add --log-file & \log-file option to always capture output

Currently outputting to a file via \o disables the console output
This patch adds a `--log-file` cli arg with similar behavior to `psql`* and a \log-file
special command to enable/disable it from the console

*https://www.postgresql.org/docs/current/app-psql.html#APP-PSQL-OPTION-LOG-FILE
This commit is contained in:
saucoide 2024-04-07 12:26:11 +02:00
parent c6c5f041c0
commit a61cb56bdd
2 changed files with 80 additions and 1 deletions

View File

@ -13,6 +13,7 @@ import shutil
import functools
import datetime as dt
import itertools
import pathlib
import platform
from time import time, sleep
from typing import Optional
@ -182,6 +183,7 @@ class PGCli:
auto_vertical_output=False,
warn=None,
ssh_tunnel_url: Optional[str] = None,
log_file: Optional[str] = None,
):
self.force_passwd_prompt = force_passwd_prompt
self.never_passwd_prompt = never_passwd_prompt
@ -310,6 +312,10 @@ class PGCli:
self.ssh_tunnel_url = ssh_tunnel_url
self.ssh_tunnel = None
if log_file:
open(log_file, "a+").close() # ensure writeable
self.log_file = log_file
# formatter setup
self.formatter = TabularOutputFormatter(format_name=c["main"]["table_format"])
register_new_formatter(self.formatter)
@ -369,6 +375,12 @@ class PGCli:
"\\o [filename]",
"Send all query results to file.",
)
self.pgspecial.register(
self.write_to_logfile,
"\\log-file",
"\\log-file [filename]",
"Log all query results to a logfile, in addition to the normal output destination.",
)
self.pgspecial.register(
self.info_connection, "\\conninfo", "\\conninfo", "Get connection details"
)
@ -508,6 +520,25 @@ class PGCli:
explain_mode=self.explain_mode,
)
def write_to_logfile(self, pattern, **_):
if not pattern:
self.log_file = None
message = "Logfile capture disabled"
return [(None, None, None, message, "", True, True)]
log_file = pathlib.Path(pattern).expanduser().absolute()
try:
open(log_file, "a+").close() # ensure writeable
except OSError as e:
self.log_file = None
message = str(e) + "\nLogfile capture disabled"
return [(None, None, None, message, "", False, True)]
self.log_file = str(log_file)
message = 'Writing to file "%s"' % self.log_file
return [(None, None, None, message, "", True, True)]
def write_to_file(self, pattern, **_):
if not pattern:
self.output_file = None
@ -826,7 +857,7 @@ class PGCli:
else:
try:
if self.output_file and not text.startswith(
("\\o ", "\\? ", "\\echo ")
("\\o ", "\\log-file", "\\? ", "\\echo ")
):
try:
with open(self.output_file, "a", encoding="utf-8") as f:
@ -838,6 +869,21 @@ class PGCli:
else:
if output:
self.echo_via_pager("\n".join(output))
# Log to file in addition to normal output
if (
self.log_file
and not text.startswith(("\\o ", "\\log-file", "\\? ", "\\echo "))
and not text.strip() == ""
):
try:
with open(self.log_file, "a", encoding="utf-8") as f:
click.echo(dt.datetime.now(), file=f) # timestamp log
click.echo(text, file=f)
click.echo("\n".join(output), file=f)
click.echo("", file=f) # extra newline
except OSError as e:
click.secho(str(e), err=True, fg="red")
except KeyboardInterrupt:
pass
@ -1428,6 +1474,11 @@ class PGCli:
default=None,
help="Open an SSH tunnel to the given address and connect to the database from it.",
)
@click.option(
"--log-file",
default=None,
help="Write all queries & output into a file, in addition to the normal output destination.",
)
@click.argument("dbname", default=lambda: None, envvar="PGDATABASE", nargs=1)
@click.argument("username", default=lambda: None, envvar="PGUSER", nargs=1)
def cli(
@ -1453,6 +1504,7 @@ def cli(
list_dsn,
warn,
ssh_tunnel: str,
log_file: str,
):
if version:
print("Version:", __version__)
@ -1511,6 +1563,7 @@ def cli(
auto_vertical_output=auto_vertical_output,
warn=warn,
ssh_tunnel_url=ssh_tunnel,
log_file=log_file,
)
# Choose which ever one has a valid value.

View File

@ -1,6 +1,8 @@
import os
import platform
import re
import tempfile
import datetime
from unittest import mock
import pytest
@ -333,6 +335,30 @@ def test_qecho_works(executor):
assert result == ["asdf"]
@dbtest
def test_logfile_works(executor):
with tempfile.TemporaryDirectory() as tmpdir:
log_file = f"{tmpdir}/tempfile.log"
cli = PGCli(pgexecute=executor, log_file=log_file)
statement = r"\qecho hello!"
cli.execute_command(statement)
with open(log_file, "r") as f:
log_contents = f.readlines()
assert datetime.datetime.fromisoformat(log_contents[0].strip())
assert log_contents[1].strip() == r"\qecho hello!"
assert log_contents[2].strip() == "hello!"
@dbtest
def test_logfile_unwriteable_file(executor):
cli = PGCli(pgexecute=executor)
statement = r"\log-file /etc/forbidden.log"
result = run(executor, statement, pgspecial=cli.pgspecial)
assert result == [
"[Errno 13] Permission denied: '/etc/forbidden.log'\nLogfile capture disabled"
]
@dbtest
def test_watch_works(executor):
cli = PGCli(pgexecute=executor)