Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse schema from table metadata #30

Merged
merged 17 commits
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/Linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
- name: Setup vcpkg
uses: lukka/[email protected]
with:
vcpkgGitCommitId: 501db0f17ef6df184fcdbfbe0f87cde2313b6ab1
vcpkgGitCommitId: a42af01b72c28a8e1d7b48107b33e4f286a55ef6

# Build extension
- name: Build extension
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/MacOS.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
- name: Setup vcpkg
uses: lukka/[email protected]
with:
vcpkgGitCommitId: 501db0f17ef6df184fcdbfbe0f87cde2313b6ab1
vcpkgGitCommitId: a42af01b72c28a8e1d7b48107b33e4f286a55ef6

- name: Build extension
shell: bash
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/Rest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
- name: Setup vcpkg
uses: lukka/[email protected]
with:
vcpkgGitCommitId: 501db0f17ef6df184fcdbfbe0f87cde2313b6ab1
vcpkgGitCommitId: a42af01b72c28a8e1d7b48107b33e4f286a55ef6

- name: Build extension
env:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/Windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Setup vcpkg
uses: lukka/[email protected]
with:
vcpkgGitCommitId: 501db0f17ef6df184fcdbfbe0f87cde2313b6ab1
vcpkgGitCommitId: a42af01b72c28a8e1d7b48107b33e4f286a55ef6

- uses: actions/setup-python@v2
with:
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
src/iceberg_extension.cpp
src/iceberg_functions.cpp
src/common/utils.cpp
src/common/schema.cpp
src/common/iceberg.cpp
src/iceberg_functions/iceberg_snapshots.cpp
src/iceberg_functions/iceberg_scan.cpp
Expand Down
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ all: release

MKFILE_PATH := $(abspath $(lastword $(MAKEFILE_LIST)))
PROJ_DIR := $(dir $(MKFILE_PATH))
DISABLE_SANITIZER_FLAG ?=

OSX_BUILD_UNIVERSAL_FLAG=
ifneq (${OSX_BUILD_ARCH}, "")
Expand All @@ -13,6 +14,10 @@ ifeq (${STATIC_LIBCPP}, 1)
STATIC_LIBCPP=-DSTATIC_LIBCPP=TRUE
endif

ifeq (${DISABLE_SANITIZER}, 1)
DISABLE_SANITIZER_FLAG=-DENABLE_SANITIZER=FALSE -DENABLE_UBSAN=0
endif

VCPKG_TOOLCHAIN_PATH?=
ifneq ("${VCPKG_TOOLCHAIN_PATH}", "")
TOOLCHAIN_FLAGS:=${TOOLCHAIN_FLAGS} -DVCPKG_MANIFEST_DIR='${PROJ_DIR}' -DVCPKG_BUILD=1 -DCMAKE_TOOLCHAIN_FILE='${VCPKG_TOOLCHAIN_PATH}'
Expand Down Expand Up @@ -45,17 +50,17 @@ clean:
# Main build
debug:
mkdir -p build/debug && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Debug ${BUILD_FLAGS} -S ./duckdb/ -B build/debug && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} ${DISABLE_SANITIZER_FLAG} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Debug ${BUILD_FLAGS} -S ./duckdb/ -B build/debug && \
cmake --build build/debug --config Debug

release:
mkdir -p build/release && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} -S ./duckdb/ -B build/release && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} ${DISABLE_SANITIZER_FLAG} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} -S ./duckdb/ -B build/release && \
cmake --build build/release --config Release

reldebug:
mkdir -p build/release && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=RelWithDebInfo ${BUILD_FLAGS} -S ./duckdb/ -B build/reldebug && \
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} ${DISABLE_SANITIZER_FLAG} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=RelWithDebInfo ${BUILD_FLAGS} -S ./duckdb/ -B build/reldebug && \
cmake --build build/release --config RelWithDebInfo

# Client build
Expand Down
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 1210 files
3 changes: 0 additions & 3 deletions scripts/start-rest-catalog.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ python3 provision.py
UNPARTITIONED_TABLE_PATH=$(curl -s http://127.0.0.1:8181/v1/namespaces/default/tables/table_unpartitioned | jq -r '."metadata-location"')

SQL=$(cat <<-END
INSTALL iceberg;
LOAD iceberg;

SET s3_access_key_id='admin';
SET s3_secret_access_key='password';
SET s3_endpoint='127.0.0.1:9000';
Expand Down
10 changes: 2 additions & 8 deletions scripts/test_data_generator/generate_base_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@
l_commitdate::TIMESTAMPTZ as l_commitdate_timestamp_tz,
l_comment as l_comment_string,
gen_random_uuid()::VARCHAR as uuid,
l_comment::BLOB as l_comment_blob,
{'a': l_shipmode, 'b': l_quantity} as l_shipmode_quantity_struct,
[l_linenumber, l_quantity] as l_linenumber_quantity_list,
map(['linenumber', 'quantity'], [l_linenumber, l_quantity]) as l_linenumber_quantity_map
l_comment::BLOB as l_comment_blob
FROM
lineitem;""");
elif (MODE.lower() == "default"):
Expand All @@ -70,10 +67,7 @@
l_commitdate::TIMESTAMPTZ as l_commitdate_timestamp_tz,
l_comment as l_comment_string,
gen_random_uuid()::UUID as uuid,
l_comment::BLOB as l_comment_blob,
{'a': l_shipmode, 'b': l_quantity} as l_shipmode_quantity_struct,
[l_linenumber, l_quantity] as l_linenumber_quantity_list,
map(['linenumber', 'quantity'], [l_linenumber, l_quantity]) as l_linenumber_quantity_map
l_comment::BLOB as l_comment_blob
FROM
lineitem;""");
else:
Expand Down
5 changes: 1 addition & 4 deletions scripts/test_data_generator/updates_v1/q01.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,5 @@ set l_orderkey_bool=NULL,
l_commitdate_timestamp=NULL,
l_commitdate_timestamp_tz=NULL,
l_comment_string=NULL,
l_comment_blob=NULL,
l_shipmode_quantity_struct=NULL,
l_linenumber_quantity_list=NULL,
l_linenumber_quantity_map=NULL
l_comment_blob=NULL
where l_partkey_int % 2 = 0;
2 changes: 2 additions & 0 deletions scripts/test_data_generator/updates_v1/q06.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE iceberg_catalog.pyspark_iceberg_table
ADD COLUMN schema_evol_added_col_1 INT DEFAULT 42;
3 changes: 3 additions & 0 deletions scripts/test_data_generator/updates_v1/q07.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UPDATE iceberg_catalog.pyspark_iceberg_table
SET schema_evol_added_col_1 = l_partkey_int
WHERE l_partkey_int % 5 = 0;
2 changes: 2 additions & 0 deletions scripts/test_data_generator/updates_v1/q08.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE iceberg_catalog.pyspark_iceberg_table
ALTER COLUMN schema_evol_added_col_1 TYPE BIGINT;
5 changes: 1 addition & 4 deletions scripts/test_data_generator/updates_v2/q01.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,5 @@ set l_orderkey_bool=NULL,
l_commitdate_timestamp=NULL,
l_commitdate_timestamp_tz=NULL,
l_comment_string=NULL,
l_comment_blob=NULL,
l_shipmode_quantity_struct=NULL,
l_linenumber_quantity_list=NULL,
l_linenumber_quantity_map=NULL
l_comment_blob=NULL
where l_partkey_int % 2 = 0;
2 changes: 2 additions & 0 deletions scripts/test_data_generator/updates_v2/q06.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE iceberg_catalog.pyspark_iceberg_table
ADD COLUMN schema_evol_added_col_1 INT DEFAULT 42;
3 changes: 3 additions & 0 deletions scripts/test_data_generator/updates_v2/q07.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UPDATE iceberg_catalog.pyspark_iceberg_table
SET schema_evol_added_col_1 = l_partkey_int
WHERE l_partkey_int % 5 = 0;
2 changes: 2 additions & 0 deletions scripts/test_data_generator/updates_v2/q08.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE iceberg_catalog.pyspark_iceberg_table
ALTER COLUMN schema_evol_added_col_1 TYPE BIGINT;
77 changes: 55 additions & 22 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "duckdb.hpp"
#include "iceberg_metadata.hpp"
#include "iceberg_utils.hpp"
#include "iceberg_types.hpp"

#include "avro/Compiler.hh"
Expand Down Expand Up @@ -62,7 +63,8 @@ vector<IcebergManifest> IcebergTable::ReadManifestListFile(string path, FileSyst
return ret;
}

vector<IcebergManifestEntry> IcebergTable::ReadManifestEntries(string path, FileSystem &fs, idx_t iceberg_format_version) {
vector<IcebergManifestEntry> IcebergTable::ReadManifestEntries(string path, FileSystem &fs,
samansmink marked this conversation as resolved.
Show resolved Hide resolved
idx_t iceberg_format_version) {
vector<IcebergManifestEntry> ret;

// TODO: make streaming
Expand All @@ -88,49 +90,77 @@ vector<IcebergManifestEntry> IcebergTable::ReadManifestEntries(string path, File
return ret;
}

IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(string &path, FileSystem &fs) {
unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(yyjson_doc *metadata_json) {
SnapshotParseInfo info {};
auto root = yyjson_doc_get_root(metadata_json);
info.iceberg_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
info.snapshots = yyjson_obj_get(root, "snapshots");

// Multiple schemas can be present in the json metadata 'schemas' list
if (yyjson_obj_getn(root, "current-schema-id", string("current-schema-id").size())) {
size_t idx, max;
yyjson_val *schema;
info.schema_id = IcebergUtils::TryGetNumFromObject(root, "current-schema-id");
auto schemas = yyjson_obj_get(root, "schemas");
yyjson_arr_foreach(schemas, idx, max, schema) {
info.schemas.push_back(schema);
}
} else {
auto schema = yyjson_obj_get(root, "schema");
if (!schema) {
throw IOException("Neither a valid schema or schemas field was found");
}
auto found_schema_id = IcebergUtils::TryGetNumFromObject(schema, "schema-id");
info.schemas.push_back(schema);
info.schema_id = found_schema_id;
}

return make_uniq<SnapshotParseInfo>(std::move(info));
}

unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(string &path, FileSystem &fs) {
auto metadata_json = ReadMetaData(path, fs);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto root = yyjson_doc_get_root(doc);
auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
auto snapshots = yyjson_obj_get(root, "snapshots");
auto latest_snapshot = FindLatestSnapshotInternal(snapshots);
auto parse_info = GetParseInfo(doc);

// Transfer string and yyjson doc ownership
parse_info->doc = doc;
parse_info->document = std::move(metadata_json);

return std::move(parse_info);
}

IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(string &path, FileSystem &fs) {
auto info = GetParseInfo(path, fs);
auto latest_snapshot = FindLatestSnapshotInternal(info->snapshots);

if (!latest_snapshot) {
throw IOException("No snapshots found");
}

return ParseSnapShot(latest_snapshot, iceberg_format_version);
return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotById(string &path, FileSystem &fs, idx_t snapshot_id) {
auto metadata_json = ReadMetaData(path, fs);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto root = yyjson_doc_get_root(doc);
auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
auto snapshots = yyjson_obj_get(root, "snapshots");
auto snapshot = FindSnapshotByIdInternal(snapshots, snapshot_id);
auto info = GetParseInfo(path, fs);
auto snapshot = FindSnapshotByIdInternal(info->snapshots, snapshot_id);

if (!snapshot) {
throw IOException("Could not find snapshot with id " + to_string(snapshot_id));
}

return ParseSnapShot(snapshot, iceberg_format_version);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
}

IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(string &path, FileSystem &fs, timestamp_t timestamp) {
auto metadata_json = ReadMetaData(path, fs);
auto doc = yyjson_read(metadata_json.c_str(), metadata_json.size(), 0);
auto root = yyjson_doc_get_root(doc);
auto iceberg_format_version = IcebergUtils::TryGetNumFromObject(root, "format-version");
auto snapshots = yyjson_obj_get(root, "snapshots");
auto snapshot = FindSnapshotByIdTimestampInternal(snapshots, timestamp);
auto info = GetParseInfo(path, fs);
auto snapshot = FindSnapshotByIdTimestampInternal(info->snapshots, timestamp);

if (!snapshot) {
throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp));
}

return ParseSnapShot(snapshot, iceberg_format_version);
return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas);
}

string IcebergSnapshot::ReadMetaData(string &path, FileSystem &fs) {
Expand All @@ -147,7 +177,8 @@ string IcebergSnapshot::ReadMetaData(string &path, FileSystem &fs) {
return IcebergUtils::FileToString(metadata_file_path, fs);
}

IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version) {
IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas) {
IcebergSnapshot ret;
auto snapshot_tag = yyjson_get_tag(snapshot);
if (snapshot_tag != YYJSON_TYPE_OBJ) {
Expand All @@ -164,6 +195,8 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe
ret.timestamp_ms = Timestamp::FromEpochMs(IcebergUtils::TryGetNumFromObject(snapshot, "timestamp-ms"));
ret.manifest_list = IcebergUtils::TryGetStrFromObject(snapshot, "manifest-list");
ret.iceberg_format_version = iceberg_format_version;
ret.schema_id = schema_id;
ret.schema = ParseSchema(schemas, ret.schema_id);
samansmink marked this conversation as resolved.
Show resolved Hide resolved

return ret;
}
Expand Down
Loading
Loading