mirror of https://github.com/dbcli/pgcli
Simplistic test
This commit is contained in:
parent
a87f3a1fcc
commit
8307a7e642
|
@ -145,6 +145,7 @@ def cli(database, user, host, port, prompt_passwd, never_prompt):
|
|||
try:
|
||||
_logger.debug('sql: %r', document.text)
|
||||
res = pgexecute.run(document.text)
|
||||
|
||||
output = []
|
||||
for rows, headers, status in res:
|
||||
_logger.debug("headers: %r", headers)
|
||||
|
@ -157,6 +158,7 @@ def cli(database, user, host, port, prompt_passwd, never_prompt):
|
|||
if status: # Only print the status if it's not None.
|
||||
output.append(status)
|
||||
_logger.debug("status: %r", status)
|
||||
output.extend(format_output(rows, headers, status))
|
||||
click.echo_via_pager('\n'.join(output))
|
||||
except Exception as e:
|
||||
_logger.error("sql: %r, error: %r", document.text, e)
|
||||
|
@ -173,6 +175,14 @@ def cli(database, user, host, port, prompt_passwd, never_prompt):
|
|||
_logger.debug('Restoring env var LESS to %r.', original_less_opts)
|
||||
os.environ['LESS'] = original_less_opts
|
||||
|
||||
def format_output(rows, headers, status):
|
||||
output = []
|
||||
if rows:
|
||||
output.append(tabulate(rows, headers, tablefmt='psql'))
|
||||
if status: # Only print the status if it's not None.
|
||||
output.append(status)
|
||||
return '\n'.join(output)
|
||||
|
||||
def need_completion_refresh(sql):
|
||||
try:
|
||||
first_token = sql.split()[0]
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
from pgcli.pgexecute import _parse_dsn
|
||||
from textwrap import dedent
|
||||
from utils import *
|
||||
|
||||
def test__parse_dsn():
|
||||
|
@ -40,6 +41,13 @@ def test__parse_dsn():
|
|||
assert _parse_dsn(dsn, 'fuser', 'fpasswd', 'fhost', '1234') == expected
|
||||
|
||||
@dbtest
|
||||
def test_conn(cursor, executor):
|
||||
data = executor.run('''create table test(id integer)''')
|
||||
assert not data
|
||||
def test_conn(executor):
|
||||
run(executor, '''create table test(a text)''')
|
||||
run(executor, '''insert into test values('abc')''')
|
||||
assert run(executor, '''select * from test''') == dedent("""\
|
||||
+-----+
|
||||
| a |
|
||||
|-----|
|
||||
| abc |
|
||||
+-----+
|
||||
SELECT 1""")
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import pytest
|
||||
import psycopg2
|
||||
from pgcli.main import format_output
|
||||
|
||||
# TODO: should this be somehow be divined from environment?
|
||||
POSTGRES_USER, POSTGRES_HOST = 'postgres', 'localhost'
|
||||
|
@ -32,3 +33,10 @@ def create_db(dbname):
|
|||
def drop_tables(conn):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute('''DROP SCHEMA public CASCADE; CREATE SCHEMA public''')
|
||||
|
||||
|
||||
def run(executor, sql):
|
||||
" Return string output for the sql to be run "
|
||||
data = executor.run(sql)
|
||||
assert len(data) == 1 # current code does that
|
||||
return format_output(*data[0])
|
||||
|
|
Loading…
Reference in New Issue