1
0
Fork 0

Move format_output out of Pgcli class.

This commit is contained in:
Thomas Roten 2017-06-26 10:10:41 -05:00
parent b7c1ba89f0
commit 7ef74ae3bd
3 changed files with 54 additions and 58 deletions

View File

@ -646,8 +646,7 @@ class PGCli(object):
else lambda x: x
)
)
formatted = self.format_output(title, cur, headers, status,
settings)
formatted = format_output(title, cur, headers, status, settings)
output.extend(formatted)
total = time() - start
@ -759,49 +758,6 @@ class PGCli(object):
"""Get the last query executed or None."""
return self.query_history[-1][0] if self.query_history else None
def format_output(self, title, cur, headers, status, settings):
output = []
expanded = (settings.expanded or settings.table_format == 'vertical')
table_format = ('vertical' if settings.expanded else
settings.table_format)
max_width = settings.max_width
case_function = settings.case_function
formatter = TabularOutputFormatter(format_name=table_format)
output_kwargs = {
'sep_title': 'RECORD {n}',
'sep_character': '-',
'sep_length': (1, 25),
'missing_value': settings.missingval,
'integer_format': settings.dcmlfmt,
'float_format': settings.floatfmt,
'preprocessors': (format_numbers, ),
'disable_numparse': True,
'preserve_whitespace': True
}
if not settings.dcmlfmt and not settings.floatfmt:
output_kwargs['preprocessors'] = (align_decimals, )
if title: # Only print the title if it's not None.
output.append(title)
if cur:
headers = [case_function(utf8tounicode(x)) for x in headers]
rows = list(cur)
formatted = formatter.format_output(rows, headers, **output_kwargs)
if (not expanded and max_width and
content_exceeds_width(rows[0], max_width) and headers):
formatted = formatter.format_output(
rows, headers, format_name='vertical', **output_kwargs)
output.append(formatted)
if status: # Only print the status if it's not None.
output.append(status)
return output
@click.command()
# Default host is '' so psycopg2 can default to either localhost or unix socket
@ -964,5 +920,49 @@ def exception_formatter(e):
return click.style(utf8tounicode(str(e)), fg='red')
def format_output(title, cur, headers, status, settings):
output = []
expanded = (settings.expanded or settings.table_format == 'vertical')
table_format = ('vertical' if settings.expanded else
settings.table_format)
max_width = settings.max_width
case_function = settings.case_function
formatter = TabularOutputFormatter(format_name=table_format)
output_kwargs = {
'sep_title': 'RECORD {n}',
'sep_character': '-',
'sep_length': (1, 25),
'missing_value': settings.missingval,
'integer_format': settings.dcmlfmt,
'float_format': settings.floatfmt,
'preprocessors': (format_numbers, ),
'disable_numparse': True,
'preserve_whitespace': True
}
if not settings.dcmlfmt and not settings.floatfmt:
output_kwargs['preprocessors'] = (align_decimals, )
if title: # Only print the title if it's not None.
output.append(title)
if cur:
headers = [case_function(utf8tounicode(x)) for x in headers]
rows = list(cur)
formatted = formatter.format_output(rows, headers, **output_kwargs)
if (not expanded and max_width and
content_exceeds_width(rows[0], max_width) and headers):
formatted = formatter.format_output(
rows, headers, format_name='vertical', **output_kwargs)
output.append(formatted)
if status: # Only print the status if it's not None.
output.append(status)
return output
if __name__ == "__main__":
cli()

View File

@ -9,7 +9,7 @@ except ImportError:
setproctitle = None
from pgcli.main import (
obfuscate_process_password, PGCli, OutputSettings
obfuscate_process_password, format_output, PGCli, OutputSettings
)
from utils import dbtest, run
@ -49,24 +49,22 @@ def test_obfuscate_process_password():
def test_format_output():
pgcli = PGCli()
settings = OutputSettings(table_format='psql', dcmlfmt='d', floatfmt='g')
results = pgcli.format_output('Title', [('abc', 'def')], ['head1', 'head2'],
'test status', settings)
results = format_output('Title', [('abc', 'def')], ['head1', 'head2'],
'test status', settings)
expected = ['Title', '+---------+---------+\n| head1 | head2 |\n|---------+---------|\n| abc | def |\n+---------+---------+', 'test status']
assert results == expected
def test_format_output_auto_expand():
pgcli = PGCli()
settings = OutputSettings(
table_format='psql', dcmlfmt='d', floatfmt='g', max_width=100)
table_results = pgcli.format_output('Title', [('abc', 'def')],
['head1', 'head2'], 'test status', settings)
table_results = format_output('Title', [('abc', 'def')],
['head1', 'head2'], 'test status', settings)
table = ['Title', '+---------+---------+\n| head1 | head2 |\n|---------+---------|\n| abc | def |\n+---------+---------+', 'test status']
assert table_results == table
expanded_results = pgcli.format_output('Title', [('abc', 'def')],
['head1', 'head2'], 'test status', settings._replace(max_width=1))
expanded_results = format_output('Title', [('abc', 'def')],
['head1', 'head2'], 'test status', settings._replace(max_width=1))
expanded = [
'Title', u'-[ RECORD 1 ]-------------------------\nhead1 | abc\nhead2 | def\n', 'test status']
assert expanded_results == expanded

View File

@ -1,7 +1,7 @@
import pytest
import psycopg2
import psycopg2.extras
from pgcli.main import PGCli, OutputSettings
from pgcli.main import format_output, OutputSettings
from pgcli.pgexecute import register_json_typecasters
# TODO: should this be somehow be divined from environment?
@ -62,14 +62,12 @@ def run(executor, sql, join=False, expanded=False, pgspecial=None,
exception_formatter=None):
" Return string output for the sql to be run "
pgcli = PGCli()
results = executor.run(sql, pgspecial, exception_formatter)
formatted = []
settings = OutputSettings(table_format='psql', dcmlfmt='d', floatfmt='g',
expanded=expanded)
for title, rows, headers, status, sql, success in results:
formatted.extend(pgcli.format_output(
title, rows, headers, status, settings))
formatted.extend(format_output(title, rows, headers, status, settings))
if join:
formatted = '\n'.join(formatted)