Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OPFS support #1490

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
/.emscripten_cache
.DS_Store
compile_commands.json
*.map

/target

Expand Down
14 changes: 14 additions & 0 deletions examples/esbuild-browser/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,23 @@ import * as arrow from 'apache-arrow';
const db = new duckdb.AsyncDuckDB(logger, worker);
await db.instantiate(DUCKDB_CONFIG.mainModule, DUCKDB_CONFIG.pthreadWorker);

// in-memory
const conn = await db.connect();
await conn.query<{ v: arrow.Int }>(`SELECT count(*)::INTEGER as v FROM generate_series(0, 100) t(v)`);

// opfs
// const opfsRoot = await navigator.storage.getDirectory();
// await opfsRoot.removeEntry('test.db').catch(e => {});
// await db.open({
// path: 'opfs://test.db',
// accessMode: duckdb.DuckDBAccessMode.READ_WRITE,
// });
// const conn = await db.connect();
// await conn.send(`CREATE TABLE integers(i INTEGER, j INTEGER);`);
// await conn.send(`INSERT INTO integers VALUES (3, 4), (5, 6);`);
// await conn.send(`CHECKPOINT;`);
// console.log(await conn.query(`SELECT * FROM integers;`));

await conn.close();
await db.terminate();
await worker.terminate();
Expand Down
2 changes: 2 additions & 0 deletions lib/include/duckdb/web/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ struct WebDBConfig {
std::optional<int8_t> access_mode = std::nullopt;
/// The thread count
uint32_t maximum_threads = 1;
/// The direct io flag
bool use_direct_io = false;
/// The query config
QueryConfig query = {
.cast_bigint_to_double = std::nullopt,
Expand Down
3 changes: 3 additions & 0 deletions lib/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ WebDBConfig WebDBConfig::ReadFrom(std::string_view args_json) {
if (doc.HasMember("maximumThreads") && doc["maximumThreads"].IsNumber()) {
config.maximum_threads = doc["maximumThreads"].GetInt();
}
if (doc.HasMember("useDirectIO") && doc["useDirectIO"].IsBool()) {
config.use_direct_io = doc["useDirectIO"].GetBool();
}
if (doc.HasMember("query") && doc["query"].IsObject()) {
auto q = doc["query"].GetObject();
if (q.HasMember("queryPollingInterval") && q["queryPollingInterval"].IsNumber()) {
Expand Down
4 changes: 3 additions & 1 deletion lib/src/io/web_filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ WebFileSystem::DataProtocol WebFileSystem::inferDataProtocol(std::string_view ur
proto = WebFileSystem::DataProtocol::HTTP;
} else if (hasPrefix(url, "s3://")) {
proto = WebFileSystem::DataProtocol::S3;
} else if (hasPrefix(url, "opfs://")) {
proto = WebFileSystem::DataProtocol::BROWSER_FSACCESS;
} else if (hasPrefix(url, "file://")) {
data_url = std::string_view{url}.substr(7);
proto = default_data_protocol_;
Expand Down Expand Up @@ -778,7 +780,7 @@ void WebFileSystem::Write(duckdb::FileHandle &handle, void *buffer, int64_t nr_b
auto file_size = file_hdl.file_->file_size_;
auto writer = static_cast<char *>(buffer);
file_hdl.position_ = location;
while (nr_bytes > 0 && location < file_size) {
while (nr_bytes > 0) {
auto n = Write(handle, writer, nr_bytes);
writer += n;
nr_bytes -= n;
Expand Down
1 change: 1 addition & 0 deletions lib/src/webdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ arrow::Status WebDB::Open(std::string_view args_json) {
db_config.options.maximum_threads = config_->maximum_threads;
db_config.options.use_temporary_directory = false;
db_config.options.access_mode = access_mode;
db_config.options.use_direct_io = config_->use_direct_io;
auto db = std::make_shared<duckdb::DuckDB>(config_->path, &db_config);
#ifndef WASM_LOADABLE_EXTENSIONS
duckdb_web_parquet_init(db.get());
Expand Down
26 changes: 24 additions & 2 deletions packages/duckdb-wasm/src/bindings/bindings_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,32 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
}
dropResponseBuffers(this.mod);
}
/** Prepare a file handle that could only be acquired aschronously */
public async prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void> {
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS && this._runtime.prepareDBFileHandle) {
const list = await this._runtime.prepareDBFileHandle(path, DuckDBDataProtocol.BROWSER_FSACCESS);
for (const item of list) {
const { handle, path: filePath, fromCached } = item;
if (!fromCached && handle.getSize()) {
await this.registerFileHandle(filePath, handle, DuckDBDataProtocol.BROWSER_FSACCESS, true);
}
}
return;
}
throw new Error(`prepareDBFileHandle: unsupported protocol ${protocol}`);
}
/** Register a file object URL */
public registerFileHandle<HandleType>(
public async registerFileHandle<HandleType>(
name: string,
handle: HandleType,
protocol: DuckDBDataProtocol,
directIO: boolean,
): void {
): Promise<void> {
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS && handle instanceof FileSystemFileHandle) {
// handle is an async handle, should convert to sync handle
const fileHandle: FileSystemFileHandle = handle as any;
handle = (await fileHandle.createSyncAccessHandle()) as any;
}
const [s, d, n] = callSRet(
this.mod,
'duckdb_web_fs_register_file_url',
Expand All @@ -462,6 +481,9 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
}
dropResponseBuffers(this.mod);
globalThis.DUCKDB_RUNTIME._files = (globalThis.DUCKDB_RUNTIME._files || new Map()).set(name, handle);
if (globalThis.DUCKDB_RUNTIME._preparedHandles?.[name]) {
delete globalThis.DUCKDB_RUNTIME._preparedHandles[name];
}
if (this.pthread) {
for (const worker of this.pthread.runningWorkers) {
worker.postMessage({
Expand Down
3 changes: 2 additions & 1 deletion packages/duckdb-wasm/src/bindings/bindings_interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ export interface DuckDBBindings {
handle: HandleType,
protocol: DuckDBDataProtocol,
directIO: boolean,
): void;
): Promise<void>;
prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void>;
globFiles(path: string): WebFile[];
dropFile(name: string): void;
dropFiles(): void;
Expand Down
4 changes: 4 additions & 0 deletions packages/duckdb-wasm/src/bindings/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ export interface DuckDBConfig {
* Note that this will only work with cross-origin isolated sites since it requires SharedArrayBuffers.
*/
maximumThreads?: number;
/**
* The direct io flag
*/
useDirectIO?: boolean;
/**
* The query config
*/
Expand Down
8 changes: 8 additions & 0 deletions packages/duckdb-wasm/src/bindings/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ export interface DuckDBGlobalFileInfo {
s3Config?: S3Config;
}

export interface PreparedDBFileHandle {
path: string;
handle: any;
fromCached: boolean;
}

/** Call a function with packed response buffer */
export function callSRet(
mod: DuckDBModule,
Expand Down Expand Up @@ -147,6 +153,8 @@ export interface DuckDBRuntime {
checkFile(mod: DuckDBModule, pathPtr: number, pathLen: number): boolean;
removeFile(mod: DuckDBModule, pathPtr: number, pathLen: number): void;

prepareDBFileHandle?: (path: string, protocol: DuckDBDataProtocol) => Promise<PreparedDBFileHandle[]>;

// Call a scalar UDF function
callScalarUDF(
mod: DuckDBModule,
Expand Down
115 changes: 106 additions & 9 deletions packages/duckdb-wasm/src/bindings/runtime_browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ import {
failWith,
FileFlags,
readString,
PreparedDBFileHandle,
} from './runtime';
import { DuckDBModule } from './duckdb_module';
import * as udf from './udf_runtime';

const OPFS_PREFIX_LEN = 'opfs://'.length;
const PATH_SEP_REGEX = /\/|\\/;

export const BROWSER_RUNTIME: DuckDBRuntime & {
_files: Map<string, any>;
_fileInfoCache: Map<number, DuckDBFileInfo>;
_globalFileInfo: DuckDBGlobalFileInfo | null;
_preparedHandles: Record<string, any>;

getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null;
getGlobalFileInfo(mod: DuckDBModule): DuckDBGlobalFileInfo | null;
Expand All @@ -26,6 +32,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
_fileInfoCache: new Map<number, DuckDBFileInfo>(),
_udfFunctions: new Map(),
_globalFileInfo: null,
_preparedHandles: {} as any,

getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null {
try {
Expand All @@ -50,6 +57,10 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
const file = { ...info, blob: null } as DuckDBFileInfo;
BROWSER_RUNTIME._fileInfoCache.set(fileId, file);
if (!BROWSER_RUNTIME._files.has(file.fileName) && BROWSER_RUNTIME._preparedHandles[file.fileName]) {
BROWSER_RUNTIME._files.set(file.fileName, BROWSER_RUNTIME._preparedHandles[file.fileName]);
delete BROWSER_RUNTIME._preparedHandles[file.fileName];
}
return file;
} catch (e: any) {
console.log(e);
Expand Down Expand Up @@ -86,6 +97,59 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
},

/** Prepare a file handle that could only be acquired aschronously */
async prepareDBFileHandle(dbPath: string, protocol: DuckDBDataProtocol): Promise<PreparedDBFileHandle[]> {
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS) {
const filePaths = [dbPath, `${dbPath}.wal`];
const prepare = async (path: string): Promise<PreparedDBFileHandle> => {
if (BROWSER_RUNTIME._files.has(path)) {
return {
path,
handle: BROWSER_RUNTIME._files.get(path),
fromCached: true,
};
}
const opfsRoot = await navigator.storage.getDirectory();
let dirHandle: FileSystemDirectoryHandle = opfsRoot;
// check if mkdir -p is needed
const opfsPath = path.slice(OPFS_PREFIX_LEN);
let fileName = opfsPath;
if (PATH_SEP_REGEX.test(opfsPath)) {
const folders = opfsPath.split(PATH_SEP_REGEX);
fileName = folders.pop()!;
if (!fileName) {
throw new Error(`Invalid path ${path}`);
}
// mkdir -p
for (const folder of folders) {
dirHandle = await dirHandle.getDirectoryHandle(folder, { create: true });
}
}
const fileHandle = await dirHandle.getFileHandle(fileName, { create: false }).catch(e => {
if (e?.name === 'NotFoundError') {
console.log(`File ${path} does not exists yet, creating`);
return dirHandle.getFileHandle(fileName, { create: true });
}
throw e;
});
const handle = await fileHandle.createSyncAccessHandle();
BROWSER_RUNTIME._preparedHandles[path] = handle;
return {
path,
handle,
fromCached: false,
};
};
const result: PreparedDBFileHandle[] = [];
for (const filePath of filePaths) {
const res = await prepare(filePath);
result.push(res);
}
return result;
}
throw new Error(`Unsupported protocol ${protocol} for path ${dbPath} with protocol ${protocol}`);
},

testPlatformFeature: (_mod: DuckDBModule, feature: number): boolean => {
switch (feature) {
case 1:
Expand Down Expand Up @@ -182,7 +246,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {

// Try to fallback to full read?
if (file.allowFullHttpReads) {
if ((contentLength !== null) && (+contentLength > 1)) {
if (contentLength !== null && +contentLength > 1) {
// 2. Send a dummy GET range request querying the first byte of the file
// -> good IFF status is 206 and contentLenght2 is 1
// -> otherwise, iff 200 and contentLenght2 == contentLenght
Expand Down Expand Up @@ -264,6 +328,21 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
mod.HEAPF64[(result >> 3) + 1] = buffer;
return result;
}
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
if (flags & FileFlags.FILE_FLAGS_FILE_CREATE_NEW) {
handle.truncate(0);
}
const result = mod._malloc(2 * 8);
const fileSize = handle.getSize();
console.log(`[BROWSER_RUNTIME] opening ${file.fileName} with size ${fileSize}`);
mod.HEAPF64[(result >> 3) + 0] = fileSize;
mod.HEAPF64[(result >> 3) + 1] = 0;
return result;
}
}
} catch (e: any) {
// TODO (samansmink): this path causes the WASM code to hang
Expand Down Expand Up @@ -311,11 +390,19 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
return;
}
const contentLength = xhr2.getResponseHeader('Content-Length');
if (contentLength && (+contentLength > 1)) {
console.warn(`Range request for ${path} did not return a partial response: ${xhr2.status} "${xhr2.statusText}"`);
if (contentLength && +contentLength > 1) {
console.warn(
`Range request for ${path} did not return a partial response: ${xhr2.status} "${xhr2.statusText}"`,
);
}
}
mod.ccall('duckdb_web_fs_glob_add_path', null, ['string'], [path]);
} else {
for (const [filePath] of BROWSER_RUNTIME._files!.entries() || []) {
if (filePath.startsWith(path)) {
mod.ccall('duckdb_web_fs_glob_add_path', null, ['string'], [filePath]);
}
}
}
} catch (e: any) {
console.log(e);
Expand All @@ -340,6 +427,8 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
xhr.send(null);
return xhr.status == 206 || xhr.status == 200;
} else {
return BROWSER_RUNTIME._files.has(path);
}
} catch (e: any) {
console.log(e);
Expand All @@ -361,11 +450,13 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
// XXX Remove from registry
return;
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle = BROWSER_RUNTIME._files?.get(file.fileName);
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
return handle.flush();
handle.flush();
handle.close();
BROWSER_RUNTIME._files.delete(file.fileName);
}
}
},
Expand Down Expand Up @@ -429,8 +520,14 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
} else if (xhr.status == 200) {
// TODO: here we are actually throwing away all non-relevant bytes, but this is still better than failing
// proper solution would require notifying duckdb-wasm cache, while we are piggybackign on browser cache
console.warn(`Range request for ${file.dataUrl} did not return a partial response: ${xhr.status} "${xhr.statusText}"`);
const src = new Uint8Array(xhr.response, location, Math.min(xhr.response.byteLength-location, bytes));
console.warn(
`Range request for ${file.dataUrl} did not return a partial response: ${xhr.status} "${xhr.statusText}"`,
);
const src = new Uint8Array(
xhr.response,
location,
Math.min(xhr.response.byteLength - location, bytes),
);
mod.HEAPU8.set(src, buf);
return src.byteLength;
} else {
Expand All @@ -454,7 +551,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
return data.byteLength;
}
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle = BROWSER_RUNTIME._files?.get(file.fileName);
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
Expand Down Expand Up @@ -491,7 +588,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
failWith(mod, 'cannot write using the html5 file reader api');
return 0;
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle = BROWSER_RUNTIME._files?.get(file.fileName);
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
Expand Down
Loading