Skip to content

Commit

Permalink
Merge pull request #111 from Mytherin/issue101
Browse files Browse the repository at this point in the history
Fix #101: more robustly handle sparse row ids
  • Loading branch information
Mytherin authored Sep 4, 2024
2 parents 1b7c244 + 97cd982 commit 3158619
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 33 deletions.
Binary file added data/db/giant_row_id.db
Binary file not shown.
2 changes: 1 addition & 1 deletion src/include/sqlite_db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SQLiteDB {
idx_t RunPragma(string pragma_name);
//! Gets the min/max row id info of a table, returns false if the table does not have a
//! usable rowid column
bool GetMaxRowId(const string &table_name, idx_t &row_id);
bool GetRowIdInfo(const string &table_name, RowIdInfo &info);
bool ColumnExists(const string &table_name, const string &column_name);
vector<IndexInfo> GetIndexInfo(const string &table_name);

Expand Down
5 changes: 3 additions & 2 deletions src/include/sqlite_scanner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma once

#include "duckdb.hpp"
#include "sqlite_utils.hpp"

namespace duckdb {
class SQLiteDB;
Expand All @@ -20,10 +21,10 @@ struct SqliteBindData : public TableFunctionData {
vector<string> names;
vector<LogicalType> types;

idx_t max_rowid = 0;
RowIdInfo row_id_info;
bool all_varchar = false;

idx_t rows_per_group = 122880;
optional_idx rows_per_group = 122880;
SQLiteDB *global_db;
};

Expand Down
5 changes: 5 additions & 0 deletions src/include/sqlite_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ class SQLiteUtils {
string ToSQLiteTypeAlias(const LogicalType &input);
};

//! Rowid range statistics for a SQLite table. Both fields are either set
//! together or both left invalid when no usable rowid range exists
//! (see SQLiteDB::GetRowIdInfo).
struct RowIdInfo {
	// smallest ROWID in the table (invalid if unknown)
	optional_idx min_rowid;
	// largest ROWID in the table (invalid if unknown)
	optional_idx max_rowid;
};

} // namespace duckdb
1 change: 0 additions & 1 deletion src/include/storage/sqlite_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ struct SQLiteOpenOptions {
string journal_mode;
};


} // namespace duckdb
19 changes: 13 additions & 6 deletions src/sqlite_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,21 +222,28 @@ bool SQLiteDB::ColumnExists(const string &table_name, const string &column_name)
return false;
}

bool SQLiteDB::GetMaxRowId(const string &table_name, idx_t &max_row_id) {
bool SQLiteDB::GetRowIdInfo(const string &table_name, RowIdInfo &row_id_info) {
SQLiteStatement stmt;
if (!TryPrepare(StringUtil::Format("SELECT MAX(ROWID) FROM \"%s\"", SQLiteUtils::SanitizeIdentifier(table_name)),
if (!TryPrepare(StringUtil::Format("SELECT MIN(ROWID), MAX(ROWID) FROM \"%s\"",
SQLiteUtils::SanitizeIdentifier(table_name)),
stmt)) {
return false;
}
if (!stmt.Step()) {
return false;
}
int64_t val = stmt.GetValue<int64_t>(0);
;
if (val <= 0) {
int64_t min_val = stmt.GetValue<int64_t>(0);
int64_t max_val = stmt.GetValue<int64_t>(1);
if (min_val < 0 || max_val <= min_val) {
return false;
}
max_row_id = idx_t(val);
static constexpr int64_t MAX_ROWS = 20000000000000;
if (max_val - min_val >= MAX_ROWS) {
// too many rows - this cannot be dense enough to be accurate
return false;
}
row_id_info.min_rowid = NumericCast<idx_t>(min_val);
row_id_info.max_rowid = NumericCast<idx_t>(max_val);
return true;
}

Expand Down
57 changes: 48 additions & 9 deletions src/sqlite_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,21 @@ struct SqliteLocalState : public LocalTableFunctionState {
SQLiteStatement stmt;
bool done = false;
vector<column_t> column_ids;
//! The amount of rows we scanned as part of this row group
idx_t scan_count = 1;

~SqliteLocalState() {
}
};

struct SqliteGlobalState : public GlobalTableFunctionState {
SqliteGlobalState(idx_t max_threads) : max_threads(max_threads) {
explicit SqliteGlobalState(idx_t max_threads) : max_threads(max_threads) {
}

mutex lock;
idx_t position = 0;
idx_t max_threads;
idx_t rows_per_group = 0;

idx_t MaxThreads() const override {
return max_threads;
Expand Down Expand Up @@ -72,9 +75,8 @@ static unique_ptr<FunctionData> SqliteBind(ClientContext &context, TableFunction
throw std::runtime_error("no columns for table " + result->table_name);
}

if (!db.GetMaxRowId(result->table_name, result->max_rowid)) {
result->max_rowid = idx_t(-1);
result->rows_per_group = idx_t(-1);
if (!db.GetRowIdInfo(result->table_name, result->row_id_info)) {
result->rows_per_group = optional_idx();
}

result->names = names;
Expand Down Expand Up @@ -106,7 +108,7 @@ static void SqliteInitInternal(ClientContext &context, const SqliteBindData &bin

auto sql =
StringUtil::Format("SELECT %s FROM \"%s\"", col_names, SQLiteUtils::SanitizeIdentifier(bind_data.table_name));
if (bind_data.rows_per_group != idx_t(-1)) {
if (bind_data.rows_per_group.IsValid()) {
// we are scanning a subset of the rows - generate a WHERE clause based on
// the rowid
auto where_clause = StringUtil::Format(" WHERE ROWID BETWEEN %d AND %d", rowid_min, rowid_max);
Expand All @@ -121,7 +123,11 @@ static void SqliteInitInternal(ClientContext &context, const SqliteBindData &bin
//! Estimates the cardinality of a SQLite table scan from the rowid range.
//! Returns nullptr when no rowid statistics are available for the table.
static unique_ptr<NodeStatistics> SqliteCardinality(ClientContext &context, const FunctionData *bind_data_p) {
	D_ASSERT(bind_data_p);
	auto &bind_data = bind_data_p->Cast<SqliteBindData>();
	if (!bind_data.row_id_info.max_rowid.IsValid()) {
		// no rowid info - we cannot provide an estimate
		return nullptr;
	}
	// approximate the row count by the size of the rowid range
	auto row_count = bind_data.row_id_info.max_rowid.GetIndex() - bind_data.row_id_info.min_rowid.GetIndex();
	return make_uniq<NodeStatistics>(row_count);
}

static idx_t SqliteMaxThreads(ClientContext &context, const FunctionData *bind_data_p) {
Expand All @@ -130,17 +136,41 @@ static idx_t SqliteMaxThreads(ClientContext &context, const FunctionData *bind_d
if (bind_data.global_db) {
return 1;
}
return bind_data.max_rowid / bind_data.rows_per_group;
if (!bind_data.row_id_info.max_rowid.IsValid()) {
return 1;
}
auto row_count = bind_data.row_id_info.max_rowid.GetIndex() - bind_data.row_id_info.min_rowid.GetIndex();
return row_count / bind_data.rows_per_group.GetIndex();
}

static bool SqliteParallelStateNext(ClientContext &context, const SqliteBindData &bind_data, SqliteLocalState &lstate,
SqliteGlobalState &gstate) {
lock_guard<mutex> parallel_lock(gstate.lock);
if (gstate.position < bind_data.max_rowid) {
if (!bind_data.rows_per_group.IsValid()) {
// not doing a parallel scan - scan everything at once
if (gstate.position > 0) {
// already scanned
return false;
}
SqliteInitInternal(context, bind_data, lstate, 0, 0);
gstate.position = static_cast<idx_t>(-1);
lstate.scan_count = 0;
return true;
}
auto max_row_id = bind_data.row_id_info.max_rowid.GetIndex();
if (gstate.position < max_row_id) {
if (lstate.scan_count == 0 && gstate.rows_per_group < max_row_id) {
// we scanned no rows in our previous slice - double the rows per group
gstate.rows_per_group *= 2;
}
if (gstate.rows_per_group == 0) {
throw InternalException("SqliteParallelStateNext - gstate.rows_per_group not set");
}
auto start = gstate.position;
auto end = start + bind_data.rows_per_group - 1;
auto end = MinValue<idx_t>(max_row_id, start + gstate.rows_per_group - 1);
SqliteInitInternal(context, bind_data, lstate, start, end);
gstate.position = end + 1;
lstate.scan_count = 0;
return true;
}
return false;
Expand All @@ -161,8 +191,16 @@ SqliteInitLocalState(ExecutionContext &context, TableFunctionInitInput &input, G

//! Sets up the global scan state: derives the maximum thread count and, when a
//! rowid-partitioned parallel scan is possible, seeds the starting position and
//! the per-slice row count.
static unique_ptr<GlobalTableFunctionState> SqliteInitGlobalState(ClientContext &context,
                                                                  TableFunctionInitInput &input) {
	auto &bind_data = input.bind_data->Cast<SqliteBindData>();
	auto result = make_uniq<SqliteGlobalState>(SqliteMaxThreads(context, input.bind_data.get()));
	result->position = 0;
	if (bind_data.rows_per_group.IsValid()) {
		// rowid info is available - start just before the smallest rowid so sparse
		// tables do not waste slices scanning empty leading ranges
		auto min_row_id = bind_data.row_id_info.min_rowid.GetIndex();
		if (min_row_id > 0) {
			result->position = min_row_id - 1;
		}
		result->rows_per_group = bind_data.rows_per_group.GetIndex();
	}
	return std::move(result);
}

Expand Down Expand Up @@ -191,6 +229,7 @@ static void SqliteScan(ClientContext &context, TableFunctionInput &data, DataChu
output.SetCardinality(out_idx);
break;
}
state.scan_count++;
for (idx_t col_idx = 0; col_idx < output.ColumnCount(); col_idx++) {
auto &out_vec = output.data[col_idx];
auto sqlite_column_type = stmt.GetType(col_idx);
Expand Down
2 changes: 1 addition & 1 deletion src/sqlite_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ static unique_ptr<Catalog> SQLiteAttach(StorageExtensionInfo *storage_info, Clie
AccessMode access_mode) {
SQLiteOpenOptions options;
options.access_mode = access_mode;
for(auto &entry : info.options) {
for (auto &entry : info.options) {
if (StringUtil::CIEquals(entry.first, "busy_timeout")) {
options.busy_timeout = entry.second.GetValue<uint64_t>();
} else if (StringUtil::CIEquals(entry.first, "journal_mode")) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/sqlite_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ SQLiteCatalog::~SQLiteCatalog() {
}

//! Initializes the SQLite catalog by creating its single "main" schema entry.
//! load_builtin is unused - SQLite catalogs have no built-in entries to load.
void SQLiteCatalog::Initialize(bool load_builtin) {
	CreateSchemaInfo info;
	main_schema = make_uniq<SQLiteSchemaEntry>(*this, info);
}

Expand Down
18 changes: 9 additions & 9 deletions src/storage/sqlite_table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ unique_ptr<BaseStatistics> SQLiteTableEntry::GetStatistics(ClientContext &contex
}

//! Intentionally a no-op: SQLite tables require no additional bindings to
//! enforce constraints during an UPDATE.
void SQLiteTableEntry::BindUpdateConstraints(Binder &, LogicalGet &, LogicalProjection &, LogicalUpdate &,
                                             ClientContext &) {
}

TableFunction SQLiteTableEntry::GetScanFunction(ClientContext &context, unique_ptr<FunctionData> &bind_data) {
Expand All @@ -34,33 +34,33 @@ TableFunction SQLiteTableEntry::GetScanFunction(ClientContext &context, unique_p
auto &transaction = Transaction::Get(context, catalog).Cast<SQLiteTransaction>();
auto &db = transaction.GetDB();

if (!db.GetMaxRowId(name, result->max_rowid)) {
result->max_rowid = idx_t(-1);
result->rows_per_group = idx_t(-1);
if (!db.GetRowIdInfo(name, result->row_id_info)) {
result->rows_per_group = optional_idx();
}
if (!transaction.IsReadOnly() || sqlite_catalog.InMemory()) {
// for in-memory databases or if we have transaction-local changes we can
// only do a single-threaded scan set up the transaction's connection object
// as the global db
result->global_db = &db;
result->rows_per_group = idx_t(-1);
result->rows_per_group = optional_idx();
}

bind_data = std::move(result);
return SqliteScanFunction();
return static_cast<TableFunction>(SqliteScanFunction());
}

TableStorageInfo SQLiteTableEntry::GetStorageInfo(ClientContext &context) {
auto &transaction = Transaction::Get(context, catalog).Cast<SQLiteTransaction>();
auto &db = transaction.GetDB();
TableStorageInfo result;

idx_t cardinality;
if (!db.GetMaxRowId(name, cardinality)) {
RowIdInfo info;
if (!db.GetRowIdInfo(name, info)) {
// probably
result.cardinality = 10000;
} else {
result.cardinality = info.max_rowid.GetIndex() - info.min_rowid.GetIndex();
}
result.cardinality = cardinality;

result.index_info = db.GetIndexInfo(name);
return result;
Expand Down
1 change: 1 addition & 0 deletions src/storage/sqlite_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ optional_ptr<CatalogEntry> SQLiteTransaction::GetCatalogEntry(const string &entr
case CatalogType::INDEX_ENTRY: {
CreateIndexInfo info;
info.index_name = entry_name;
info.constraint_type = IndexConstraintType::NONE;

string table_name;
string sql;
Expand Down
25 changes: 25 additions & 0 deletions test/sql/storage/attach_giant_row_id.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# name: test/sql/storage/attach_giant_row_id.test
# description: Test attaching a SQLite database containing very large and sparse row ids
# group: [sqlite_storage]

require sqlite_scanner

statement ok
ATTACH 'data/db/giant_row_id.db' AS s1 (TYPE SQLITE)

query I
SELECT * FROM s1.big_row_id
----
1797657063271174144

query I
SELECT * FROM s1.sparse_row_id
----
0
1797657063271174144

query I
SELECT * FROM s1.sparse_row_id_smaller
----
0
10000000000000
6 changes: 3 additions & 3 deletions test/sql/storage/attach_schema_functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ index sqlite_autoindex_integers_1 integers 0
table integers integers 0
view v1 v1 0

query IIII
SELECT database_name, table_name, has_primary_key, estimated_size FROM duckdb_tables()
query III
SELECT database_name, table_name, has_primary_key FROM duckdb_tables()
----
s integers true 3
s integers true

statement ok
SELECT * FROM duckdb_schemas()
Expand Down

0 comments on commit 3158619

Please sign in to comment.