Skip to content

Commit

Permalink
add opfs support
Browse files Browse the repository at this point in the history
  • Loading branch information
dengkunli committed Nov 21, 2023
1 parent e91e701 commit 3ca5b16
Show file tree
Hide file tree
Showing 14 changed files with 260 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
/.emscripten_cache
.DS_Store
compile_commands.json
*.map

/target

Expand Down
14 changes: 14 additions & 0 deletions examples/esbuild-browser/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,23 @@ import * as arrow from 'apache-arrow';
const db = new duckdb.AsyncDuckDB(logger, worker);
await db.instantiate(DUCKDB_CONFIG.mainModule, DUCKDB_CONFIG.pthreadWorker);

// in-memory
const conn = await db.connect();
await conn.query<{ v: arrow.Int }>(`SELECT count(*)::INTEGER as v FROM generate_series(0, 100) t(v)`);

// opfs
// const opfsRoot = await navigator.storage.getDirectory();
// await opfsRoot.removeEntry('test.db').catch(e => {});
// await db.open({
// path: 'opfs://test.db',
// accessMode: duckdb.DuckDBAccessMode.READ_WRITE,
// });
// const conn = await db.connect();
// await conn.send(`CREATE TABLE integers(i INTEGER, j INTEGER);`);
// await conn.send(`INSERT INTO integers VALUES (3, 4), (5, 6);`);
// await conn.send(`CHECKPOINT;`);
// console.log(await conn.query(`SELECT * FROM integers;`));

await conn.close();
await db.terminate();
await worker.terminate();
Expand Down
2 changes: 2 additions & 0 deletions lib/include/duckdb/web/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ struct WebDBConfig {
std::optional<int8_t> access_mode = std::nullopt;
/// The thread count
uint32_t maximum_threads = 1;
/// The direct io flag
bool use_direct_io = false;
/// The query config
QueryConfig query = {
.cast_bigint_to_double = std::nullopt,
Expand Down
3 changes: 3 additions & 0 deletions lib/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ WebDBConfig WebDBConfig::ReadFrom(std::string_view args_json) {
if (doc.HasMember("maximumThreads") && doc["maximumThreads"].IsNumber()) {
config.maximum_threads = doc["maximumThreads"].GetInt();
}
if (doc.HasMember("useDirectIO") && doc["useDirectIO"].IsBool()) {
config.use_direct_io = doc["useDirectIO"].GetBool();
}
if (doc.HasMember("query") && doc["query"].IsObject()) {
auto q = doc["query"].GetObject();
if (q.HasMember("queryPollingInterval") && q["queryPollingInterval"].IsNumber()) {
Expand Down
4 changes: 3 additions & 1 deletion lib/src/io/web_filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ WebFileSystem::DataProtocol WebFileSystem::inferDataProtocol(std::string_view ur
proto = WebFileSystem::DataProtocol::HTTP;
} else if (hasPrefix(url, "s3://")) {
proto = WebFileSystem::DataProtocol::S3;
} else if (hasPrefix(url, "opfs://")) {
proto = WebFileSystem::DataProtocol::BROWSER_FSACCESS;
} else if (hasPrefix(url, "file://")) {
data_url = std::string_view{url}.substr(7);
proto = default_data_protocol_;
Expand Down Expand Up @@ -778,7 +780,7 @@ void WebFileSystem::Write(duckdb::FileHandle &handle, void *buffer, int64_t nr_b
auto file_size = file_hdl.file_->file_size_;
auto writer = static_cast<char *>(buffer);
file_hdl.position_ = location;
while (nr_bytes > 0 && location < file_size) {
while (nr_bytes > 0) {
auto n = Write(handle, writer, nr_bytes);
writer += n;
nr_bytes -= n;
Expand Down
1 change: 1 addition & 0 deletions lib/src/webdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ arrow::Status WebDB::Open(std::string_view args_json) {
db_config.options.maximum_threads = config_->maximum_threads;
db_config.options.use_temporary_directory = false;
db_config.options.access_mode = access_mode;
db_config.options.use_direct_io = config_->use_direct_io;
auto db = std::make_shared<duckdb::DuckDB>(config_->path, &db_config);
#ifndef WASM_LOADABLE_EXTENSIONS
duckdb_web_parquet_init(db.get());
Expand Down
26 changes: 24 additions & 2 deletions packages/duckdb-wasm/src/bindings/bindings_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,32 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
}
dropResponseBuffers(this.mod);
}
/**
 * Prepare the file handles of a database that can only be acquired asynchronously.
 *
 * Delegates to the runtime's optional `prepareDBFileHandle` hook and registers every
 * returned handle that was not served from the runtime's cache and reports a non-zero size.
 *
 * @param path Path of the database file
 * @param protocol Data protocol of the path, only `BROWSER_FSACCESS` is supported
 * @throws Error if the protocol is not `BROWSER_FSACCESS` or the runtime provides no hook
 */
public async prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void> {
    if (protocol !== DuckDBDataProtocol.BROWSER_FSACCESS || !this._runtime.prepareDBFileHandle) {
        throw new Error(`prepareDBFileHandle: unsupported protocol ${protocol}`);
    }
    const prepared = await this._runtime.prepareDBFileHandle(path, DuckDBDataProtocol.BROWSER_FSACCESS);
    for (const entry of prepared) {
        // Cached handles are not registered again, zero-sized files are not registered here either
        if (entry.fromCached || !entry.handle.getSize()) {
            continue;
        }
        await this.registerFileHandle(entry.path, entry.handle, DuckDBDataProtocol.BROWSER_FSACCESS, true);
    }
}
/** Register a file object URL */
public registerFileHandle<HandleType>(
public async registerFileHandle<HandleType>(
name: string,
handle: HandleType,
protocol: DuckDBDataProtocol,
directIO: boolean,
): void {
): Promise<void> {
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS && handle instanceof FileSystemFileHandle) {
// handle is an async handle, should convert to sync handle
const fileHandle: FileSystemFileHandle = handle as any;
handle = (await fileHandle.createSyncAccessHandle()) as any;
}
const [s, d, n] = callSRet(
this.mod,
'duckdb_web_fs_register_file_url',
Expand All @@ -462,6 +481,9 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
}
dropResponseBuffers(this.mod);
globalThis.DUCKDB_RUNTIME._files = (globalThis.DUCKDB_RUNTIME._files || new Map()).set(name, handle);
if (globalThis.DUCKDB_RUNTIME._preparedHandles?.[name]) {
delete globalThis.DUCKDB_RUNTIME._preparedHandles[name];
}
if (this.pthread) {
for (const worker of this.pthread.runningWorkers) {
worker.postMessage({
Expand Down
3 changes: 2 additions & 1 deletion packages/duckdb-wasm/src/bindings/bindings_interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ export interface DuckDBBindings {
handle: HandleType,
protocol: DuckDBDataProtocol,
directIO: boolean,
): void;
): Promise<void>;
prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void>;
globFiles(path: string): WebFile[];
dropFile(name: string): void;
dropFiles(): void;
Expand Down
4 changes: 4 additions & 0 deletions packages/duckdb-wasm/src/bindings/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ export interface DuckDBConfig {
* Note that this will only work with cross-origin isolated sites since it requires SharedArrayBuffers.
*/
maximumThreads?: number;
/**
* The direct io flag
*/
useDirectIO?: boolean;
/**
* The query config
*/
Expand Down
8 changes: 8 additions & 0 deletions packages/duckdb-wasm/src/bindings/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ export interface DuckDBGlobalFileInfo {
s3Config?: S3Config;
}

/** A single entry of the list resolved by `DuckDBRuntime.prepareDBFileHandle` */
export interface PreparedDBFileHandle {
/** The path that the handle was prepared for */
path: string;
/** The prepared handle, its concrete type is runtime-specific (NOTE(review): a sync access handle in the browser runtime - confirm for other runtimes) */
handle: any;
/** True if the runtime returned an already registered handle instead of acquiring a new one */
fromCached: boolean;
}

/** Call a function with packed response buffer */
export function callSRet(
mod: DuckDBModule,
Expand Down Expand Up @@ -147,6 +153,8 @@ export interface DuckDBRuntime {
checkFile(mod: DuckDBModule, pathPtr: number, pathLen: number): boolean;
removeFile(mod: DuckDBModule, pathPtr: number, pathLen: number): void;

prepareDBFileHandle?: (path: string, protocol: DuckDBDataProtocol) => Promise<PreparedDBFileHandle[]>;

// Call a scalar UDF function
callScalarUDF(
mod: DuckDBModule,
Expand Down
109 changes: 100 additions & 9 deletions packages/duckdb-wasm/src/bindings/runtime_browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ import {
failWith,
FileFlags,
readString,
PreparedDBFileHandle,
} from './runtime';
import { DuckDBModule } from './duckdb_module';
import * as udf from './udf_runtime';

/** Number of characters occupied by the `opfs://` scheme prefix */
const OPFS_PREFIX_LEN = 'opfs://'.length;
/** Matches one path separator, either a forward slash or a backslash */
const PATH_SEP_REGEX = /[\\/]/;

export const BROWSER_RUNTIME: DuckDBRuntime & {
_files: Map<string, any>;
_fileInfoCache: Map<number, DuckDBFileInfo>;
_globalFileInfo: DuckDBGlobalFileInfo | null;
_preparedHandles: Record<string, any>;

getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null;
getGlobalFileInfo(mod: DuckDBModule): DuckDBGlobalFileInfo | null;
Expand All @@ -26,6 +32,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
_fileInfoCache: new Map<number, DuckDBFileInfo>(),
_udfFunctions: new Map(),
_globalFileInfo: null,
_preparedHandles: {} as any,

getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null {
try {
Expand All @@ -50,6 +57,10 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
const file = { ...info, blob: null } as DuckDBFileInfo;
BROWSER_RUNTIME._fileInfoCache.set(fileId, file);
if (!BROWSER_RUNTIME._files.has(file.fileName) && BROWSER_RUNTIME._preparedHandles[file.fileName]) {
BROWSER_RUNTIME._files.set(file.fileName, BROWSER_RUNTIME._preparedHandles[file.fileName]);
delete BROWSER_RUNTIME._preparedHandles[file.fileName];
}
return file;
} catch (e: any) {
console.log(e);
Expand Down Expand Up @@ -86,6 +97,59 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
},

/**
 * Prepare the OPFS handles of a database that can only be acquired asynchronously.
 *
 * One entry is resolved for the database file and one for its write-ahead log
 * (`<dbPath>.wal`), in that order. Missing directories and files are created.
 * Newly acquired sync access handles are parked in `_preparedHandles` until they
 * are registered or picked up by `getFileInfo`.
 *
 * @param dbPath Database path, everything after the `opfs://` prefix is resolved against the OPFS root
 * @param protocol Data protocol of the path, only `BROWSER_FSACCESS` is supported
 * @returns The prepared handles for `[dbPath, dbPath + '.wal']`
 * @throws Error if the protocol is unsupported or the path does not end in a file name
 */
async prepareDBFileHandle(dbPath: string, protocol: DuckDBDataProtocol): Promise<PreparedDBFileHandle[]> {
    if (protocol !== DuckDBDataProtocol.BROWSER_FSACCESS) {
        throw new Error(`Unsupported protocol ${protocol} for path ${dbPath} with protocol ${protocol}`);
    }
    const prepare = async (path: string): Promise<PreparedDBFileHandle> => {
        // Already registered, hand back the registered handle
        if (BROWSER_RUNTIME._files.has(path)) {
            return { path, handle: BROWSER_RUNTIME._files.get(path), fromCached: true };
        }
        // Prepared by an earlier call but not picked up yet.
        // A sync access handle locks its file exclusively, so requesting a second one would fail.
        // Reuse the pending handle and report it like a freshly prepared one.
        const pending = BROWSER_RUNTIME._preparedHandles[path];
        if (pending) {
            return { path, handle: pending, fromCached: false };
        }

        // The last path segment is the file name, everything before it is a directory
        const segments = path.slice(OPFS_PREFIX_LEN).split(PATH_SEP_REGEX);
        const fileName = segments.pop();
        if (!fileName) {
            throw new Error(`Invalid path ${path}`);
        }

        // mkdir -p, empty segments (leading or doubled separators) are skipped
        let dirHandle: FileSystemDirectoryHandle = await navigator.storage.getDirectory();
        for (const folder of segments) {
            if (!folder) {
                continue;
            }
            dirHandle = await dirHandle.getDirectoryHandle(folder, { create: true });
        }

        // Open the file if it exists, create it only if the lookup failed with NotFoundError
        let fileHandle: FileSystemFileHandle;
        try {
            fileHandle = await dirHandle.getFileHandle(fileName, { create: false });
        } catch (e: any) {
            if (e?.name !== 'NotFoundError') {
                throw e;
            }
            console.log(`File ${path} does not exists yet, creating`);
            fileHandle = await dirHandle.getFileHandle(fileName, { create: true });
        }

        const handle = await fileHandle.createSyncAccessHandle();
        BROWSER_RUNTIME._preparedHandles[path] = handle;
        return { path, handle, fromCached: false };
    };

    // Prepare the files one after another to keep the order of the result deterministic
    const result: PreparedDBFileHandle[] = [];
    for (const filePath of [dbPath, `${dbPath}.wal`]) {
        result.push(await prepare(filePath));
    }
    return result;
},

testPlatformFeature: (_mod: DuckDBModule, feature: number): boolean => {
switch (feature) {
case 1:
Expand Down Expand Up @@ -182,7 +246,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {

// Try to fallback to full read?
if (file.allowFullHttpReads) {
if ((contentLength !== null) && (+contentLength > 1)) {
if (contentLength !== null && +contentLength > 1) {
// 2. Send a dummy GET range request querying the first byte of the file
// -> good IFF status is 206 and contentLenght2 is 1
// -> otherwise, iff 200 and contentLenght2 == contentLenght
Expand Down Expand Up @@ -264,6 +328,21 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
mod.HEAPF64[(result >> 3) + 1] = buffer;
return result;
}
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
if (flags & FileFlags.FILE_FLAGS_FILE_CREATE_NEW) {
handle.truncate(0);
}
const result = mod._malloc(2 * 8);
const fileSize = handle.getSize();
console.log(`[BROWSER_RUNTIME] opening ${file.fileName} with size ${fileSize}`);
mod.HEAPF64[(result >> 3) + 0] = fileSize;
mod.HEAPF64[(result >> 3) + 1] = 0;
return result;
}
}
} catch (e: any) {
// TODO (samansmink): this path causes the WASM code to hang
Expand Down Expand Up @@ -311,8 +390,10 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
return;
}
const contentLength = xhr2.getResponseHeader('Content-Length');
if (contentLength && (+contentLength > 1)) {
console.warn(`Range request for ${path} did not return a partial response: ${xhr2.status} "${xhr2.statusText}"`);
if (contentLength && +contentLength > 1) {
console.warn(
`Range request for ${path} did not return a partial response: ${xhr2.status} "${xhr2.statusText}"`,
);
}
}
mod.ccall('duckdb_web_fs_glob_add_path', null, ['string'], [path]);
Expand Down Expand Up @@ -340,6 +421,8 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
xhr.send(null);
return xhr.status == 206 || xhr.status == 200;
} else {
return BROWSER_RUNTIME._files.has(path);
}
} catch (e: any) {
console.log(e);
Expand All @@ -361,11 +444,13 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
// XXX Remove from registry
return;
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle = BROWSER_RUNTIME._files?.get(file.fileName);
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
return handle.flush();
handle.flush();
handle.close();
BROWSER_RUNTIME._files.delete(file.fileName);
}
}
},
Expand Down Expand Up @@ -429,8 +514,14 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
} else if (xhr.status == 200) {
// TODO: here we are actually throwing away all non-relevant bytes, but this is still better than failing
// proper solution would require notifying duckdb-wasm cache, while we are piggybacking on browser cache
console.warn(`Range request for ${file.dataUrl} did not return a partial response: ${xhr.status} "${xhr.statusText}"`);
const src = new Uint8Array(xhr.response, location, Math.min(xhr.response.byteLength-location, bytes));
console.warn(
`Range request for ${file.dataUrl} did not return a partial response: ${xhr.status} "${xhr.statusText}"`,
);
const src = new Uint8Array(
xhr.response,
location,
Math.min(xhr.response.byteLength - location, bytes),
);
mod.HEAPU8.set(src, buf);
return src.byteLength;
} else {
Expand All @@ -454,7 +545,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
return data.byteLength;
}
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle = BROWSER_RUNTIME._files?.get(file.fileName);
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
Expand Down Expand Up @@ -491,7 +582,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
failWith(mod, 'cannot write using the html5 file reader api');
return 0;
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle = BROWSER_RUNTIME._files?.get(file.fileName);
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
Expand Down
Loading

0 comments on commit 3ca5b16

Please sign in to comment.