Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Apply row limit transform to query in backend #1461

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions querybook/server/datasources/query_transform.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
from app.datasource import register
from lib.query_analysis.transform import (
has_query_contains_unlimited_select,
transform_to_limited_query,
transform_to_sampled_query,
)


@register("/query/transform/limited/", methods=["POST"])
def query_limited(
    query: str,
    row_limit: int,
    language: str,
):
    """Apply the row limit to a query and report any select left unlimited.

    Args:
        query: The query text to transform
        row_limit: Limit passed to transform_to_limited_query
        language: Query language passed to both transform helpers
    Returns:
        dict: "query" holds the output of transform_to_limited_query and
            "unlimited_select" holds the output of
            has_query_contains_unlimited_select run on that transformed query.
    """
    response = {
        "query": transform_to_limited_query(
            query=query, limit=row_limit, language=language
        )
    }
    # The check runs on the already-limited query, not on the raw input.
    response["unlimited_select"] = has_query_contains_unlimited_select(
        query=response["query"], language=language
    )
    return response


@register("/query/transform/sampling/", methods=["POST"])
def query_sampling(
query: str,
Expand All @@ -13,3 +32,25 @@ def query_sampling(
return transform_to_sampled_query(
query=query, language=language, sampling_tables=sampling_tables
)


@register("/query/transform/", methods=["POST"])
def query_transform(
    query: str,
    language: str,
    row_limit: int,
    sampling_tables: dict[str, dict[str, str]],
):
    """Run the sampling transform, then the row-limit transform, on a query.

    Args:
        query: The query text to transform
        language: Query language passed to every transform helper
        row_limit: Limit passed to transform_to_limited_query
        sampling_tables: Passed through to transform_to_sampled_query
    Returns:
        dict: "query" holds the sampled-then-limited query and
            "unlimited_select" holds the output of
            has_query_contains_unlimited_select run on that final query.
    """
    # Order matters: sampling is applied first, limiting second.
    transformed = transform_to_sampled_query(
        query=query, language=language, sampling_tables=sampling_tables
    )
    transformed = transform_to_limited_query(
        query=transformed, limit=row_limit, language=language
    )
    return {
        "query": transformed,
        "unlimited_select": has_query_contains_unlimited_select(
            query=transformed, language=language
        ),
    }
23 changes: 18 additions & 5 deletions querybook/server/lib/query_analysis/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,34 @@ def get_limited_select_statement(statement_ast: exp.Expression, limit: int):
return statement_ast.limit(limit)


def has_query_contains_unlimited_select(query: str, language: str = None):
    """Check if a query contains a select statement without a limit.

    Args:
        query: The query to check
        language: Query language used to look up the sqlglot dialect.
            Optional; None is forwarded to _get_sqlglot_dialect as-is.
    Returns:
        str: The first select statement without a limit, pretty-printed in the
            resolved dialect. None if all select statements have a limit.
    """
    # _get_sqlglot_dialect is a function: it must be called, not subscripted.
    dialect = _get_sqlglot_dialect(language)
    statements = parse(query, dialect=dialect)
    # get_select_statement_limit returning -1 marks a select with no limit;
    # next() stops at the first such statement instead of scanning them all.
    return next(
        (
            s.sql(dialect=dialect, pretty=True)
            for s in statements
            if get_select_statement_limit(s) == -1
        ),
        None,
    )


def transform_to_limited_query(
query: str, limit: int = None, language: str = None
) -> str:
"""Apply a limit to all select statements in a query if they don't already have a limit.
It returns a new query with the limit applied and the original query is not modified.

If limit is None or negative, the query is returned as-is.
"""
if not limit:
if not limit or limit < 0:
return query

try:
Expand Down Expand Up @@ -153,6 +163,9 @@ def transform_to_sampled_query(
Returns:
str: The sampled query
"""
if not sampling_tables:
return query

try:
dialect = _get_sqlglot_dialect(language)
statements = parse(query, dialect=dialect)
Expand Down
12 changes: 12 additions & 0 deletions querybook/server/logic/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ def get_admin_announcements(session=None):
)


@with_session
def get_engine_feature_param(
    engine_id, feature_param_name, default_value=None, session=None
):
    """Read a single feature param from a query engine.

    Args:
        engine_id: Id passed to get_query_engine_by_id
        feature_param_name: Key looked up in the engine's feature params
        default_value: Returned when the engine lookup yields a falsy result,
            and also passed as the fallback to the feature params `.get` call
        session: Forwarded to get_query_engine_by_id
    Returns:
        The value from query_engine.get_feature_params().get(...), or
        default_value when no engine was found.
    """
    query_engine = get_query_engine_by_id(engine_id, session=session)
    if not query_engine:
        return default_value

    feature_params = query_engine.get_feature_params()
    return feature_params.get(feature_param_name, default_value)


"""
---------------------------------------------------------------------------------------------------------
QUERY METASTORE ?
Expand Down
11 changes: 11 additions & 0 deletions querybook/server/tasks/run_datadoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@

from lib.logger import get_logger
from lib.query_analysis.templating import render_templated_query
from lib.query_analysis.transform import transform_to_limited_query
from lib.scheduled_datadoc.export import export_datadoc
from lib.scheduled_datadoc.legacy import convert_if_legacy_datadoc_schedule
from lib.scheduled_datadoc.notification import notifiy_on_datadoc_complete

from logic import admin as admin_logic
from logic import datadoc as datadoc_logic
from logic import query_execution as qe_logic
from logic.schedule import (
Expand Down Expand Up @@ -73,6 +75,7 @@ def run_datadoc_with_config(
# Prepping chain jobs each unit is a [make_qe_task, run_query_task] combo
for index, query_cell in enumerate(query_cells):
engine_id = query_cell.meta["engine"]
limit = query_cell.meta.get("limit", -1)
raw_query = query_cell.context

# Skip empty cells
Expand All @@ -86,6 +89,14 @@ def run_datadoc_with_config(
engine_id,
session=session,
)

# If meta["limit"] is set and > 0, apply limit to the query
row_limit_enabled = admin_logic.get_engine_feature_param(
engine_id, "row_limit", False, session=session
)
if row_limit_enabled and limit >= 0:
query = transform_to_limited_query(query, limit)

except Exception as e:
on_datadoc_completion(
is_success=False,
Expand Down
45 changes: 45 additions & 0 deletions querybook/tests/test_lib/test_query_analysis/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from lib.query_analysis.transform import (
format_query,
get_select_statement_limit,
has_query_contains_unlimited_select,
transform_to_limited_query,
transform_to_sampled_query,
)
Expand Down Expand Up @@ -46,6 +47,50 @@ def test_select_with_limit(self):
self.assertEqual(get_select_statement_limit(query), expected)


class HasQueryContainsUnlimitedSelectTestCase(TestCase):
    """Tests for has_query_contains_unlimited_select.

    The function is called without a language here and is expected to return
    the first select statement lacking a limit, or None when there is none.
    """

    def test_select_limit(self):
        # Every select below already has a LIMIT, so nothing should be reported.
        tests = [
            "SELECT 1 LIMIT 10",
            "SELECT * FROM table_1 WHERE field = 1 LIMIT 10",
            "TRUNCATE TABLE table_1; SELECT * FROM table_1 WHERE field = 1 LIMIT 1000",
            "SELECT * FROM table_1 WHERE field = 1 LIMIT 10; SELECT * FROM table_2 WHERE field = 1 LIMIT 1000",
        ]
        for query in tests:
            with self.subTest(query=query):
                self.assertIsNone(has_query_contains_unlimited_select(query))

    def test_select_no_limit(self):
        # Each pair is (input query, expected first unlimited select).
        # NOTE(review): expected values are formatter output; confirm the
        # whitespace inside these literals against the real pretty-printer.
        tests = [
            ("SELECT 1", "SELECT\n 1"),
            (
                "SELECT * FROM table_1 WHERE field = 1",
                "SELECT\n *\nFROM table_1\nWHERE\n field = 1",
            ),
            ("SELECT 1; SELECT 2", "SELECT\n 1"),
            (
                "SELECT * FROM table_1 WHERE field = 1 LIMIT 10; SELECT * FROM table_1 WHERE field = 1",
                "SELECT\n *\nFROM table_1\nWHERE\n field = 1",
            ),
        ]
        for query, expected in tests:
            with self.subTest(query=query):
                # assertEqual, not the deprecated assertEquals alias
                # (removed in Python 3.12).
                self.assertEqual(has_query_contains_unlimited_select(query), expected)

    def test_not_select_statements(self):
        # Non-select statements are expected to yield None.
        tests = [
            "DELETE FROM table_1 WHERE field = 1",
            "CREATE DATABASE IF NOT EXISTS db_1",
            "CREATE TABLE table_1 (field1 INT)",
            "TRUNCATE TABLE table_1",
            "DROP TABLE IF EXISTS db.table1; CREATE TABLE db.table1",
            "INSERT INTO table_1 (field1) VALUES (1)",
            "UPDATE table_1 SET field1 = 1 WHERE field = 1",
        ]
        for query in tests:
            with self.subTest(query=query):
                self.assertIsNone(has_query_contains_unlimited_select(query))


class GetLimitedQueryTestCase(TestCase):
def test_limit_is_not_specified(self):
tests = [
Expand Down
128 changes: 1 addition & 127 deletions querybook/webapp/__tests__/lib/sql-helper/sql-limiter.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import {
getLimitedQuery,
getSelectStatementLimit,
} from 'lib/sql-helper/sql-limiter';
import { getSelectStatementLimit } from 'lib/sql-helper/sql-limiter';

describe('getSelectStatementLimit', () => {
describe('when it is not a SELECT statement', () => {
Expand Down Expand Up @@ -76,126 +73,3 @@ describe('getSelectStatementLimit', () => {
});
});
});

// Tests for getLimitedQuery(query, rowLimit?, language?).
// Expectations are compared with toBe, so the exact text of each query
// (including whitespace inside template literals) is significant.
describe('getLimitedQuery', () => {
// Cases where the returned query must be identical to the input.
describe('not limited', () => {
test('when rowLimit is not specified', () => {
const query = 'SELECT * FROM table_1 WHERE field = 1;';
expect(getLimitedQuery(query)).toBe(query);
});
test('when rowLimit is not specified for multiple queries', () => {
const query = `
SELECT * FROM table_1 WHERE field = 1;
SELECT * FROM table_1 WHERE field = 1;
`;
expect(getLimitedQuery(query)).toBe(query);
});
test('when running a select query with fetch', () => {
const query =
'SELECT * FROM table_1 ORDER BY id FETCH FIRST 10 ROWS ONLY;';
expect(getLimitedQuery(query, 100, 'trino')).toBe(query);
});
test('when running a select query with offset and fetch', () => {
const query =
'SELECT * FROM table_1 ORDER BY id OFFSET 10 FETCH NEXT 10 ROWS ONLY;';
expect(getLimitedQuery(query, 100, 'trino')).toBe(query);
});
test('when running a select query with nested query', () => {
const query = `select * from (select * from table limit 5) as x limit 10`;
expect(getLimitedQuery(query, 100, 'trino')).toBe(query);
});
test('when running a select query with a where clause and a limit', () => {
const query = 'SELECT * FROM table_1 WHERE field = 1 LIMIT 1000;';
expect(getLimitedQuery(query, 100, 'trino')).toBe(query);
});
});
// Cases where at least one statement is expected to gain a `limit N` clause.
describe('limited', () => {
test('when running a select query', () => {
const query = 'SELECT * FROM table_1';
expect(getLimitedQuery(query, 10)).toBe(`${query} limit 10;`);
});
test('when running a select query with a where clause and a group by and an order by', () => {
const query =
'SELECT field, count(*) FROM table_1 WHERE deleted = false GROUP BY field ORDER BY field';
expect(getLimitedQuery(query, 100)).toBe(`${query} limit 100;`);
});
test('when running a select query with trailing semicolon', () => {
const query = 'SELECT * FROM table_1;';
expect(getLimitedQuery(query, 10)).toBe(
'SELECT * FROM table_1 limit 10;'
);
});
// The trailing `-- limit here` comment is expected to be absent from the output.
test('when running a select query with comments', () => {
const query = 'SELECT * FROM table_1 -- limit here';
expect(getLimitedQuery(query, 10)).toBe(
'SELECT * FROM table_1 limit 10;'
);
});
// 'limit' appears only as a string literal and a table name here, not as a clause.
test('when running a select query with non-keyword limits', () => {
const query = `SELECT id, account, 'limit' FROM querybook2.limit ORDER by 'limit' ASC`;
expect(getLimitedQuery(query, 10)).toBe(`${query} limit 10;`);
});
test('when running a multiple select queries', () => {
const query = `SELECT * FROM table_1;
SELECT col1, col2, FROM table2;`;
expect(getLimitedQuery(query, 10)).toBe(
`SELECT * FROM table_1 limit 10;
SELECT col1, col2, FROM table2 limit 10;`
);
});
test('when running a select query with a where clause', () => {
const query = 'SELECT * FROM table_1 WHERE field = 1';
expect(getLimitedQuery(query, 100)).toBe(`${query} limit 100;`);
});
test('when running a select query with a where clause and an order by', () => {
const query =
'SELECT * FROM table_1 WHERE field = 1 ORDER BY field';
expect(getLimitedQuery(query, 100)).toBe(`${query} limit 100;`);
});
// NOTE(review): same name and body as an earlier test in this describe block.
test('when running a select query with a where clause and a group by and an order by', () => {
const query =
'SELECT field, count(*) FROM table_1 WHERE deleted = false GROUP BY field ORDER BY field';
expect(getLimitedQuery(query, 100)).toBe(`${query} limit 100;`);
});
// Statements that already carry a LIMIT are expected to keep it unchanged.
test('when running two select queries with mixed limits', () => {
const query = `SELECT * FROM table_1;
SELECT col1, col2, FROM table2 LIMIT 300;`;
expect(getLimitedQuery(query, 10))
.toBe(`SELECT * FROM table_1 limit 10;
SELECT col1, col2, FROM table2 LIMIT 300;`);
});
test('when running multiple select queries with mixed limits', () => {
const query = `SELECT * FROM table_1;
SELECT col1, col2, FROM table2 LIMIT 300;
SELECT field, count(1) FROM table3 GROUP BY field`;
expect(getLimitedQuery(query, 10))
.toBe(`SELECT * FROM table_1 limit 10;
SELECT col1, col2, FROM table2 LIMIT 300;
SELECT field, count(1) FROM table3 GROUP BY field limit 10;`);
});
// A limit inside a subquery does not count as a limit on the outer select.
test('when running a select query with nested query', () => {
const query = `select * from (select * from table limit 5) as x`;
expect(getLimitedQuery(query, 100)).toBe(`${query} limit 100;`);
});
test('when running a select query with wrapped where', () => {
const query = `select * from table where (field = 1 and field2 = 2)`;
expect(getLimitedQuery(query, 100)).toBe(`${query} limit 100;`);
});
test('when running a select query with two nested queries', () => {
const query = `select * from (select * from table limit 5) as x outer join (select * from table2 limit 5) as y on x.id = y.id`;
expect(getLimitedQuery(query, 100)).toBe(`${query} limit 100;`);
});
// NOTE(review): exact duplicate of the preceding test (same name and body).
test('when running a select query with two nested queries', () => {
const query = `select * from (select * from table limit 5) as x outer join (select * from table2 limit 5) as y on x.id = y.id`;
expect(getLimitedQuery(query, 100)).toBe(`${query} limit 100;`);
});
test('when running a select query with two union queries', () => {
const query = `select id, name from table_a union all select id, name from table_b where (deleted = false and active = true)`;
expect(getLimitedQuery(query, 100)).toBe(`${query} limit 100;`);
});
test('when running a select query with two nested union queries', () => {
const query = `(select id, name from table_a limit 10) union all (select id, name from table_b where (deleted = false and active = true))`;
expect(getLimitedQuery(query, 100)).toBe(`${query} limit 100;`);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import { useTrackView } from 'hooks/useTrackView';
import { trackClick } from 'lib/analytics';
import { createSQLLinter } from 'lib/codemirror/codemirror-lint';
import { replaceStringIndices, searchText } from 'lib/data-doc/search';
import { getSelectedQuery, IRange, TableToken } from 'lib/sql-helper/sql-lexer';
import { getSelectedQuery, IRange } from 'lib/sql-helper/sql-lexer';
import { DEFAULT_ROW_LIMIT } from 'lib/sql-helper/sql-limiter';
import { getPossibleTranspilers } from 'lib/templated-query/transpile';
import { enableResizable, getQueryEngineId, sleep } from 'lib/utils';
Expand Down
Loading
Loading