1
0
Fork 0
pgcli/tests/test_smart_completion_multi...

758 lines
24 KiB
Python

import itertools
from metadata import (
MetaData,
alias,
name_join,
fk_join,
join,
schema,
table,
function,
wildcard_expansion,
column,
get_result,
result_set,
qual,
no_qual,
parametrize,
)
from utils import completions_to_set
metadata = {
"tables": {
"public": {
"users": ["id", "email", "first_name", "last_name"],
"orders": ["id", "ordered_date", "status", "datestamp"],
"select": ["id", "localtime", "ABC"],
},
"custom": {
"users": ["id", "phone_number"],
"Users": ["userid", "username"],
"products": ["id", "product_name", "price"],
"shipments": ["id", "address", "user_id"],
},
"Custom": {"projects": ["projectid", "name"]},
"blog": {
"entries": ["entryid", "entrytitle", "entrytext"],
"tags": ["tagid", "name"],
"entrytags": ["entryid", "tagid"],
"entacclog": ["entryid", "username", "datestamp"],
},
},
"functions": {
"public": [
["func1", [], [], [], "", False, False, False, False],
["func2", [], [], [], "", False, False, False, False],
],
"custom": [
["func3", [], [], [], "", False, False, False, False],
[
"set_returning_func",
["x"],
["integer"],
["o"],
"integer",
False,
False,
True,
False,
],
],
"Custom": [["func4", [], [], [], "", False, False, False, False]],
"blog": [
[
"extract_entry_symbols",
["_entryid", "symbol"],
["integer", "text"],
["i", "o"],
"",
False,
False,
True,
False,
],
[
"enter_entry",
["_title", "_text", "entryid"],
["text", "text", "integer"],
["i", "i", "o"],
"",
False,
False,
False,
False,
],
],
},
"datatypes": {"public": ["typ1", "typ2"], "custom": ["typ3", "typ4"]},
"foreignkeys": {
"custom": [("public", "users", "id", "custom", "shipments", "user_id")],
"blog": [
("blog", "entries", "entryid", "blog", "entacclog", "entryid"),
("blog", "entries", "entryid", "blog", "entrytags", "entryid"),
("blog", "tags", "tagid", "blog", "entrytags", "tagid"),
],
},
"defaults": {
"public": {
("orders", "id"): "nextval('orders_id_seq'::regclass)",
("orders", "datestamp"): "now()",
("orders", "status"): "'PENDING'::text",
}
},
}
testdata = MetaData(metadata)
cased_schemas = [schema(x) for x in ("public", "blog", "CUSTOM", '"Custom"')]
casing = (
"SELECT",
"Orders",
"User_Emails",
"CUSTOM",
"Func1",
"Entries",
"Tags",
"EntryTags",
"EntAccLog",
"EntryID",
"EntryTitle",
"EntryText",
)
completers = testdata.get_completers(casing)
@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual))
@parametrize("table", ["users", '"users"'])
def test_suggested_column_names_from_shadowed_visible_table(completer, table):
result = get_result(completer, "SELECT FROM " + table, len("SELECT "))
assert completions_to_set(result) == completions_to_set(
testdata.columns_functions_and_keywords("users")
)
@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual))
@parametrize(
"text",
[
"SELECT from custom.users",
"WITH users as (SELECT 1 AS foo) SELECT from custom.users",
],
)
def test_suggested_column_names_from_qualified_shadowed_table(completer, text):
result = get_result(completer, text, position=text.find(" ") + 1)
assert completions_to_set(result) == completions_to_set(
testdata.columns_functions_and_keywords("users", "custom")
)
@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual))
@parametrize("text", ["WITH users as (SELECT 1 AS foo) SELECT from users"])
def test_suggested_column_names_from_cte(completer, text):
result = completions_to_set(get_result(completer, text, text.find(" ") + 1))
assert result == completions_to_set(
[column("foo")] + testdata.functions_and_keywords()
)
@parametrize("completer", completers(casing=False))
@parametrize(
"text",
[
"SELECT * FROM users JOIN custom.shipments ON ",
"""SELECT *
FROM public.users
JOIN custom.shipments ON """,
],
)
def test_suggested_join_conditions(completer, text):
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
[
alias("users"),
alias("shipments"),
name_join("shipments.id = users.id"),
fk_join("shipments.user_id = users.id"),
]
)
@parametrize("completer", completers(filtr=True, casing=False, aliasing=False))
@parametrize(
("query", "tbl"),
itertools.product(
(
"SELECT * FROM public.{0} RIGHT OUTER JOIN ",
"""SELECT *
FROM {0}
JOIN """,
),
("users", '"users"', "Users"),
),
)
def test_suggested_joins(completer, query, tbl):
result = get_result(completer, query.format(tbl))
assert completions_to_set(result) == completions_to_set(
testdata.schemas_and_from_clause_items()
+ [join(f"custom.shipments ON shipments.user_id = {tbl}.id")]
)
@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual))
def test_suggested_column_names_from_schema_qualifed_table(completer):
result = get_result(completer, "SELECT from custom.products", len("SELECT "))
assert completions_to_set(result) == completions_to_set(
testdata.columns_functions_and_keywords("products", "custom")
)
@parametrize(
"text",
[
"INSERT INTO orders(",
"INSERT INTO orders (",
"INSERT INTO public.orders(",
"INSERT INTO public.orders (",
],
)
@parametrize("completer", completers(filtr=True, casing=False))
def test_suggested_columns_with_insert(completer, text):
assert completions_to_set(get_result(completer, text)) == completions_to_set(
testdata.columns("orders")
)
@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual))
def test_suggested_column_names_in_function(completer):
result = get_result(
completer, "SELECT MAX( from custom.products", len("SELECT MAX(")
)
assert completions_to_set(result) == completions_to_set(
testdata.columns_functions_and_keywords("products", "custom")
)
@parametrize("completer", completers(casing=False, aliasing=False))
@parametrize(
"text",
["SELECT * FROM Custom.", "SELECT * FROM custom.", 'SELECT * FROM "custom".'],
)
@parametrize("use_leading_double_quote", [False, True])
def test_suggested_table_names_with_schema_dot(
completer, text, use_leading_double_quote
):
if use_leading_double_quote:
text += '"'
start_position = -1
else:
start_position = 0
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
testdata.from_clause_items("custom", start_position)
)
@parametrize("completer", completers(casing=False, aliasing=False))
@parametrize("text", ['SELECT * FROM "Custom".'])
@parametrize("use_leading_double_quote", [False, True])
def test_suggested_table_names_with_schema_dot2(
completer, text, use_leading_double_quote
):
if use_leading_double_quote:
text += '"'
start_position = -1
else:
start_position = 0
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
testdata.from_clause_items("Custom", start_position)
)
@parametrize("completer", completers(filtr=True, casing=False))
def test_suggested_column_names_with_qualified_alias(completer):
result = get_result(completer, "SELECT p. from custom.products p", len("SELECT p."))
assert completions_to_set(result) == completions_to_set(
testdata.columns("products", "custom")
)
@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual))
def test_suggested_multiple_column_names(completer):
result = get_result(
completer, "SELECT id, from custom.products", len("SELECT id, ")
)
assert completions_to_set(result) == completions_to_set(
testdata.columns_functions_and_keywords("products", "custom")
)
@parametrize("completer", completers(filtr=True, casing=False))
def test_suggested_multiple_column_names_with_alias(completer):
result = get_result(
completer, "SELECT p.id, p. from custom.products p", len("SELECT u.id, u.")
)
assert completions_to_set(result) == completions_to_set(
testdata.columns("products", "custom")
)
@parametrize("completer", completers(filtr=True, casing=False))
@parametrize(
"text",
[
"SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON ",
"SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON JOIN public.orders z ON z.id > y.id",
],
)
def test_suggestions_after_on(completer, text):
position = len(
"SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON "
)
result = get_result(completer, text, position)
assert completions_to_set(result) == completions_to_set(
[
alias("x"),
alias("y"),
name_join("y.price = x.price"),
name_join("y.product_name = x.product_name"),
name_join("y.id = x.id"),
]
)
@parametrize("completer", completers())
def test_suggested_aliases_after_on_right_side(completer):
text = "SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON x.id = "
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set([alias("x"), alias("y")])
@parametrize("completer", completers(filtr=True, casing=False, aliasing=False))
def test_table_names_after_from(completer):
text = "SELECT * FROM "
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
testdata.schemas_and_from_clause_items()
)
@parametrize("completer", completers(filtr=True, casing=False))
def test_schema_qualified_function_name(completer):
text = "SELECT custom.func"
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
[
function("func3()", -len("func")),
function("set_returning_func()", -len("func")),
]
)
@parametrize("completer", completers(filtr=True, casing=False, aliasing=False))
def test_schema_qualified_function_name_after_from(completer):
text = "SELECT * FROM custom.set_r"
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
[
function("set_returning_func()", -len("func")),
]
)
@parametrize("completer", completers(filtr=True, casing=False, aliasing=False))
def test_unqualified_function_name_not_returned(completer):
text = "SELECT * FROM set_r"
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set([])
@parametrize("completer", completers(filtr=True, casing=False, aliasing=False))
def test_unqualified_function_name_in_search_path(completer):
completer.search_path = ["public", "custom"]
text = "SELECT * FROM set_r"
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
[
function("set_returning_func()", -len("func")),
]
)
@parametrize("completer", completers(filtr=True, casing=False))
@parametrize(
"text",
[
"SELECT 1::custom.",
"CREATE TABLE foo (bar custom.",
"CREATE FUNCTION foo (bar INT, baz custom.",
"ALTER TABLE foo ALTER COLUMN bar TYPE custom.",
],
)
def test_schema_qualified_type_name(completer, text):
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(testdata.types("custom"))
@parametrize("completer", completers(filtr=True, casing=False))
def test_suggest_columns_from_aliased_set_returning_function(completer):
result = get_result(
completer, "select f. from custom.set_returning_func() f", len("select f.")
)
assert completions_to_set(result) == completions_to_set(
testdata.columns("set_returning_func", "custom", "functions")
)
@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual))
@parametrize(
"text",
[
"SELECT * FROM custom.set_returning_func()",
"SELECT * FROM Custom.set_returning_func()",
"SELECT * FROM Custom.Set_Returning_Func()",
],
)
def test_wildcard_column_expansion_with_function(completer, text):
position = len("SELECT *")
completions = get_result(completer, text, position)
col_list = "x"
expected = [wildcard_expansion(col_list)]
assert expected == completions
@parametrize("completer", completers(filtr=True, casing=False))
def test_wildcard_column_expansion_with_alias_qualifier(completer):
text = "SELECT p.* FROM custom.products p"
position = len("SELECT p.*")
completions = get_result(completer, text, position)
col_list = "id, p.product_name, p.price"
expected = [wildcard_expansion(col_list)]
assert expected == completions
@parametrize("completer", completers(filtr=True, casing=False))
@parametrize(
"text",
[
"""
SELECT count(1) FROM users;
CREATE FUNCTION foo(custom.products _products) returns custom.shipments
LANGUAGE SQL
AS $foo$
SELECT 1 FROM custom.shipments;
INSERT INTO public.orders(*) values(-1, now(), 'preliminary');
SELECT 2 FROM custom.users;
$foo$;
SELECT count(1) FROM custom.shipments;
""",
"INSERT INTO public.orders(*",
"INSERT INTO public.Orders(*",
"INSERT INTO public.orders (*",
"INSERT INTO public.Orders (*",
"INSERT INTO orders(*",
"INSERT INTO Orders(*",
"INSERT INTO orders (*",
"INSERT INTO Orders (*",
"INSERT INTO public.orders(*)",
"INSERT INTO public.Orders(*)",
"INSERT INTO public.orders (*)",
"INSERT INTO public.Orders (*)",
"INSERT INTO orders(*)",
"INSERT INTO Orders(*)",
"INSERT INTO orders (*)",
"INSERT INTO Orders (*)",
],
)
def test_wildcard_column_expansion_with_insert(completer, text):
position = text.index("*") + 1
completions = get_result(completer, text, position)
expected = [wildcard_expansion("ordered_date, status")]
assert expected == completions
@parametrize("completer", completers(filtr=True, casing=False))
def test_wildcard_column_expansion_with_table_qualifier(completer):
text = 'SELECT "select".* FROM public."select"'
position = len('SELECT "select".*')
completions = get_result(completer, text, position)
col_list = 'id, "select"."localtime", "select"."ABC"'
expected = [wildcard_expansion(col_list)]
assert expected == completions
@parametrize("completer", completers(filtr=True, casing=False, qualify=qual))
def test_wildcard_column_expansion_with_two_tables(completer):
text = 'SELECT * FROM public."select" JOIN custom.users ON true'
position = len("SELECT *")
completions = get_result(completer, text, position)
cols = (
'"select".id, "select"."localtime", "select"."ABC", '
"users.id, users.phone_number"
)
expected = [wildcard_expansion(cols)]
assert completions == expected
@parametrize("completer", completers(filtr=True, casing=False))
def test_wildcard_column_expansion_with_two_tables_and_parent(completer):
text = 'SELECT "select".* FROM public."select" JOIN custom.users u ON true'
position = len('SELECT "select".*')
completions = get_result(completer, text, position)
col_list = 'id, "select"."localtime", "select"."ABC"'
expected = [wildcard_expansion(col_list)]
assert expected == completions
@parametrize("completer", completers(filtr=True, casing=False))
@parametrize(
"text",
[
"SELECT U. FROM custom.Users U",
"SELECT U. FROM custom.USERS U",
"SELECT U. FROM custom.users U",
'SELECT U. FROM "custom".Users U',
'SELECT U. FROM "custom".USERS U',
'SELECT U. FROM "custom".users U',
],
)
def test_suggest_columns_from_unquoted_table(completer, text):
position = len("SELECT U.")
result = get_result(completer, text, position)
assert completions_to_set(result) == completions_to_set(
testdata.columns("users", "custom")
)
@parametrize("completer", completers(filtr=True, casing=False))
@parametrize(
"text", ['SELECT U. FROM custom."Users" U', 'SELECT U. FROM "custom"."Users" U']
)
def test_suggest_columns_from_quoted_table(completer, text):
position = len("SELECT U.")
result = get_result(completer, text, position)
assert completions_to_set(result) == completions_to_set(
testdata.columns("Users", "custom")
)
texts = ["SELECT * FROM ", "SELECT * FROM public.Orders O CROSS JOIN "]
@parametrize("completer", completers(filtr=True, casing=False, aliasing=False))
@parametrize("text", texts)
def test_schema_or_visible_table_completion(completer, text):
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
testdata.schemas_and_from_clause_items()
)
@parametrize("completer", completers(aliasing=True, casing=False, filtr=True))
@parametrize("text", texts)
def test_table_aliases(completer, text):
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
testdata.schemas()
+ [
table("users u"),
table("orders o" if text == "SELECT * FROM " else "orders o2"),
table('"select" s'),
function("func1() f"),
function("func2() f"),
]
)
@parametrize("completer", completers(aliasing=True, casing=True, filtr=True))
@parametrize("text", texts)
def test_aliases_with_casing(completer, text):
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
cased_schemas
+ [
table("users u"),
table("Orders O" if text == "SELECT * FROM " else "Orders O2"),
table('"select" s'),
function("Func1() F"),
function("func2() f"),
]
)
@parametrize("completer", completers(aliasing=False, casing=True, filtr=True))
@parametrize("text", texts)
def test_table_casing(completer, text):
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
cased_schemas
+ [
table("users"),
table("Orders"),
table('"select"'),
function("Func1()"),
function("func2()"),
]
)
@parametrize("completer", completers(aliasing=False, casing=True))
def test_alias_search_without_aliases2(completer):
text = "SELECT * FROM blog.et"
result = get_result(completer, text)
assert result[0] == table("EntryTags", -2)
@parametrize("completer", completers(aliasing=False, casing=True))
def test_alias_search_without_aliases1(completer):
text = "SELECT * FROM blog.e"
result = get_result(completer, text)
assert result[0] == table("Entries", -1)
@parametrize("completer", completers(aliasing=True, casing=True))
def test_alias_search_with_aliases2(completer):
text = "SELECT * FROM blog.et"
result = get_result(completer, text)
assert result[0] == table("EntryTags ET", -2)
@parametrize("completer", completers(aliasing=True, casing=True))
def test_alias_search_with_aliases1(completer):
text = "SELECT * FROM blog.e"
result = get_result(completer, text)
assert result[0] == table("Entries E", -1)
@parametrize("completer", completers(aliasing=True, casing=True))
def test_join_alias_search_with_aliases1(completer):
text = "SELECT * FROM blog.Entries E JOIN blog.e"
result = get_result(completer, text)
assert result[:2] == [
table("Entries E2", -1),
join("EntAccLog EAL ON EAL.EntryID = E.EntryID", -1),
]
@parametrize("completer", completers(aliasing=False, casing=True))
def test_join_alias_search_without_aliases1(completer):
text = "SELECT * FROM blog.Entries JOIN blog.e"
result = get_result(completer, text)
assert result[:2] == [
table("Entries", -1),
join("EntAccLog ON EntAccLog.EntryID = Entries.EntryID", -1),
]
@parametrize("completer", completers(aliasing=True, casing=True))
def test_join_alias_search_with_aliases2(completer):
text = "SELECT * FROM blog.Entries E JOIN blog.et"
result = get_result(completer, text)
assert result[0] == join("EntryTags ET ON ET.EntryID = E.EntryID", -2)
@parametrize("completer", completers(aliasing=False, casing=True))
def test_join_alias_search_without_aliases2(completer):
text = "SELECT * FROM blog.Entries JOIN blog.et"
result = get_result(completer, text)
assert result[0] == join("EntryTags ON EntryTags.EntryID = Entries.EntryID", -2)
@parametrize("completer", completers())
def test_function_alias_search_without_aliases(completer):
text = "SELECT blog.ees"
result = get_result(completer, text)
first = result[0]
assert first.start_position == -3
assert first.text == "extract_entry_symbols()"
assert first.display_text == "extract_entry_symbols(_entryid)"
@parametrize("completer", completers())
def test_function_alias_search_with_aliases(completer):
text = "SELECT blog.ee"
result = get_result(completer, text)
first = result[0]
assert first.start_position == -2
assert first.text == "enter_entry(_title := , _text := )"
assert first.display_text == "enter_entry(_title, _text)"
@parametrize("completer", completers(filtr=True, casing=True, qualify=no_qual))
def test_column_alias_search(completer):
result = get_result(completer, "SELECT et FROM blog.Entries E", len("SELECT et"))
cols = ("EntryText", "EntryTitle", "EntryID")
assert result[:3] == [column(c, -2) for c in cols]
@parametrize("completer", completers(casing=True))
def test_column_alias_search_qualified(completer):
result = get_result(
completer, "SELECT E.ei FROM blog.Entries E", len("SELECT E.ei")
)
cols = ("EntryID", "EntryTitle")
assert result[:3] == [column(c, -2) for c in cols]
@parametrize("completer", completers(casing=False, filtr=False, aliasing=False))
def test_schema_object_order(completer):
result = get_result(completer, "SELECT * FROM u")
assert result[:3] == [
table(t, pos=-1) for t in ("users", 'custom."Users"', "custom.users")
]
@parametrize("completer", completers(casing=False, filtr=False, aliasing=False))
def test_all_schema_objects(completer):
text = "SELECT * FROM "
result = get_result(completer, text)
assert completions_to_set(result) >= completions_to_set(
[table(x) for x in ("orders", '"select"', "custom.shipments")]
+ [function(x + "()") for x in ("func2",)]
)
@parametrize("completer", completers(filtr=False, aliasing=False, casing=True))
def test_all_schema_objects_with_casing(completer):
text = "SELECT * FROM "
result = get_result(completer, text)
assert completions_to_set(result) >= completions_to_set(
[table(x) for x in ("Orders", '"select"', "CUSTOM.shipments")]
+ [function(x + "()") for x in ("func2",)]
)
@parametrize("completer", completers(casing=False, filtr=False, aliasing=True))
def test_all_schema_objects_with_aliases(completer):
text = "SELECT * FROM "
result = get_result(completer, text)
assert completions_to_set(result) >= completions_to_set(
[table(x) for x in ("orders o", '"select" s', "custom.shipments s")]
+ [function(x) for x in ("func2() f",)]
)
@parametrize("completer", completers(casing=False, filtr=False, aliasing=True))
def test_set_schema(completer):
text = "SET SCHEMA "
result = get_result(completer, text)
assert completions_to_set(result) == completions_to_set(
[schema("'blog'"), schema("'Custom'"), schema("'custom'"), schema("'public'")]
)