1
0
Fork 0

skip initial comment in pg_session file (#1245)

* skip initial comment in pg_session file

* add test
This commit is contained in:
Georgy Frolov 2021-02-23 01:55:55 +03:00 committed by GitHub
parent 30212c6fc6
commit c9fd72449e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 58 additions and 5 deletions

View File

@ -6,6 +6,7 @@ Features:
* Consider `update` queries destructive and issue a warning. Change
`destructive_warning` setting to `all|moderate|off`, vs `true|false`. (#1239)
* Skip initial comment in .pg_session even if it doesn't start with '#'
Bug fixes:
----------

View File

@ -3,6 +3,8 @@ import shutil
import os
import platform
from os.path import expanduser, exists, dirname
import re
from typing import TextIO
from configobj import ConfigObj
@ -62,3 +64,28 @@ def get_casing_file(config):
if casing_file == "default":
casing_file = config_location() + "casing"
return casing_file
def skip_initial_comment(f_stream: TextIO) -> int:
"""
Initial comment in ~/.pg_service.conf is not always marked with '#'
which crashes the parser. This function takes a file object and
"rewinds" it to the beginning of the first section,
from where on it can be parsed safely
:return: number of skipped lines
"""
section_regex = r"\s*\["
pos = f_stream.tell()
lines_skipped = 0
while True:
line = f_stream.readline()
if line == "":
break
if re.match(section_regex, line) is not None:
f_stream.seek(pos)
break
else:
pos += len(line)
lines_skipped += 1
return lines_skipped

View File

@ -2,8 +2,9 @@ import platform
import warnings
from os.path import expanduser
from configobj import ConfigObj
from configobj import ConfigObj, ParseError
from pgspecial.namedqueries import NamedQueries
from .config import skip_initial_comment
warnings.filterwarnings("ignore", category=UserWarning, module="psycopg2")
@ -1508,10 +1509,16 @@ def parse_service_info(service):
service_file = os.path.join(os.getenv("PGSYSCONFDIR"), ".pg_service.conf")
else:
service_file = expanduser("~/.pg_service.conf")
if not service:
if not service or not os.path.exists(service_file):
# nothing to do
return None, service_file
service_file_config = ConfigObj(service_file)
with open(service_file) as f:
skipped_lines = skip_initial_comment(f)
try:
service_file_config = ConfigObj(f)
except ParseError as err:
err.line_number += skipped_lines
raise err
if service not in service_file_config:
return None, service_file
service_conf = service_file_config.get(service)

View File

@ -1,9 +1,10 @@
import io
import os
import stat
import pytest
from pgcli.config import ensure_dir_exists
from pgcli.config import ensure_dir_exists, skip_initial_comment
def test_ensure_file_parent(tmpdir):
@ -28,3 +29,15 @@ def test_ensure_other_create_error(tmpdir):
with pytest.raises(OSError):
ensure_dir_exists(str(rcfile))
@pytest.mark.parametrize(
"text, skipped_lines",
(
("abc\n", 1),
("#[section]\ndef\n[section]", 2),
("[section]", 0),
),
)
def test_skip_initial_comment(text, skipped_lines):
assert skip_initial_comment(io.StringIO(text)) == skipped_lines

View File

@ -288,7 +288,12 @@ def test_pg_service_file(tmpdir):
cli = PGCli(pgclirc_file=str(tmpdir.join("rcfile")))
with open(tmpdir.join(".pg_service.conf").strpath, "w") as service_conf:
service_conf.write(
"""[myservice]
"""File begins with a comment
that is not a comment
# or maybe a comment after all
because psql is crazy
[myservice]
host=a_host
user=a_user
port=5433