Skip to content

fix(opfs): Multi-Window Support for OPFS #1969

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ if(EMSCRIPTEN)
_malloc, \
_calloc, \
_free, \
stringToUTF8, \
lengthBytesUTF8, \
stackAlloc, \
_duckdb_web_clear_response, \
_duckdb_web_collect_file_stats, \
_duckdb_web_connect, \
Expand Down
20 changes: 18 additions & 2 deletions lib/src/webdb_api.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <exception>
#include <iostream>
#include <stdexcept>

Expand Down Expand Up @@ -94,9 +95,24 @@ void duckdb_web_fs_drop_file(WASMResponse* packed, const char* file_name) {
WASMResponseBuffer::Get().Store(*packed, webdb.DropFile(file_name));
}
/// Drop the named files, or all files when no names are given
void duckdb_web_fs_drop_files(WASMResponse* packed) {
void duckdb_web_fs_drop_files(WASMResponse* packed, const char** names, int name_count) {
GET_WEBDB(*packed);
WASMResponseBuffer::Get().Store(*packed, webdb.DropFiles());
if (name_count == 0) {
WASMResponseBuffer::Get().Store(*packed, webdb.DropFiles());
} else {
for (int i = 0; i < name_count; i++) {
const char* name = names[i];
if (name == nullptr) {
std::cerr << "Error: NULL pointer detected at index " << i << std::endl;
continue;
}
if (std::strlen(name) == 0) {
std::cerr << "Error: Empty string detected at index " << i << std::endl;
continue;
}
WASMResponseBuffer::Get().Store(*packed, webdb.DropFile(name));
}
}
}
/// Glob file infos
void duckdb_web_fs_glob_file_infos(WASMResponse* packed, const char* file_name) {
Expand Down
76 changes: 58 additions & 18 deletions packages/duckdb-wasm/src/bindings/bindings_base.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { DuckDBModule, PThread } from './duckdb_module';
import { DuckDBConfig } from './config';
import { DuckDBAccessMode, DuckDBConfig } from './config';
import { Logger } from '../log';
import { InstantiationProgress } from './progress';
import { DuckDBBindings } from './bindings_interface';
import { DuckDBConnection } from './connection';
import { StatusCode } from '../status';
import { dropResponseBuffers, DuckDBRuntime, readString, callSRet, copyBuffer, DuckDBDataProtocol } from './runtime';
import { CSVInsertOptions, JSONInsertOptions, ArrowInsertOptions } from './insert_options';
import { callSRet, copyBuffer, dropResponseBuffers, DuckDBDataProtocol, DuckDBRuntime, readString } from './runtime';
import { ArrowInsertOptions, CSVInsertOptions, JSONInsertOptions } from './insert_options';
import { ScriptTokens } from './tokens';
import { FileStatistics } from './file_stats';
import { arrowToSQLField, arrowToSQLType } from '../json_typedef';
Expand Down Expand Up @@ -469,9 +469,9 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
}
dropResponseBuffers(this.mod);
}
public async prepareFileHandle(fileName: string, protocol: DuckDBDataProtocol): Promise<void> {
public async prepareFileHandle(fileName: string, protocol: DuckDBDataProtocol, accessMode: DuckDBAccessMode, multiWindowMode:boolean): Promise<void> {
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS && this._runtime.prepareFileHandles) {
const list = await this._runtime.prepareFileHandles([fileName], DuckDBDataProtocol.BROWSER_FSACCESS);
const list = await this._runtime.prepareFileHandles([fileName], DuckDBDataProtocol.BROWSER_FSACCESS, accessMode, multiWindowMode);
for (const item of list) {
const { handle, path: filePath, fromCached } = item;
if (!fromCached && handle.getSize()) {
Expand All @@ -483,9 +483,9 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
throw new Error(`prepareFileHandle: unsupported protocol ${protocol}`);
}
/** Prepare a file handle that could only be acquired asynchronously */
public async prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void> {
public async prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol, accessMode: DuckDBAccessMode, multiWindowMode:boolean): Promise<void> {
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS && this._runtime.prepareDBFileHandle) {
const list = await this._runtime.prepareDBFileHandle(path, DuckDBDataProtocol.BROWSER_FSACCESS);
const list = await this._runtime.prepareDBFileHandle(path, DuckDBDataProtocol.BROWSER_FSACCESS, accessMode, multiWindowMode);
for (const item of list) {
const { handle, path: filePath, fromCached } = item;
if (!fromCached && handle.getSize()) {
Expand Down Expand Up @@ -525,7 +525,7 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
}
}
}
return handle;
return handle;
}
/** Register a file object URL async */
public async registerFileHandleAsync<HandleType>(
Expand Down Expand Up @@ -583,12 +583,52 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
dropResponseBuffers(this.mod);
}
/** Drop files */
public dropFiles(): void {
const [s, d, n] = callSRet(this.mod, 'duckdb_web_fs_drop_files', [], []);
if (s !== StatusCode.SUCCESS) {
throw new Error(readString(this.mod, d, n));
public dropFiles(names?:string[]): void {
const pointers:number[] = [];
let pointerOfArray:number = -1;
try {
for (const str of (names ?? [])) {
if (str !== null && str !== undefined && str.length > 0) {
const size = this.mod.lengthBytesUTF8(str) + 1;
const ret = this.mod._malloc(size);
if (!ret) {
throw new Error(`Failed to allocate memory for string: ${str}`);
}
this.mod.stringToUTF8(str, ret, size);
pointers.push(ret);
}
}
pointerOfArray = this.mod._malloc(pointers.length * 4);
if (!pointerOfArray) {
throw new Error(`Failed to allocate memory for pointers array`);
}
for (let i = 0; i < pointers.length; i++) {
this.mod.HEAP32[(pointerOfArray >> 2) + i] = pointers[i];
}
const [s, d, n] = callSRet(
this.mod,
'duckdb_web_fs_drop_files',
[
'number',
'number'
],
[
pointerOfArray,
pointers.length
]
);
if (s !== StatusCode.SUCCESS) {
throw new Error(readString(this.mod, d, n));
}
dropResponseBuffers(this.mod);
} finally {
for (const pointer of pointers) {
this.mod._free(pointer);
}
if( pointerOfArray > 0 ){
this.mod._free(pointerOfArray);
}
}
dropResponseBuffers(this.mod);
}
/** Flush all files */
public flushFiles(): void {
Expand All @@ -615,11 +655,11 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
return copy;
}
/** Register an OPFS file (name must start with `opfs://`) by preparing its file handle */
public registerOPFSFileName(file: string): Promise<void> {
if (file.startsWith("opfs://")) {
return this.prepareFileHandle(file, DuckDBDataProtocol.BROWSER_FSACCESS);
} else {
throw new Error("Not an OPFS file name: " + file);
public async registerOPFSFileName(file: string, accesssMode:DuckDBAccessMode = DuckDBAccessMode.READ_WRITE, multiWindowMode:boolean = false): Promise<void> {
if (file.startsWith("opfs://")) {
return await this.prepareFileHandle(file, DuckDBDataProtocol.BROWSER_FSACCESS, accesssMode, multiWindowMode);
} else {
throw new Error("Not an OPFS file name: " + file);
}
}
public collectFileStatistics(file: string, enable: boolean): void {
Expand Down
10 changes: 5 additions & 5 deletions packages/duckdb-wasm/src/bindings/bindings_interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DuckDBConfig, DuckDBConnection, DuckDBDataProtocol, FileStatistics, InstantiationProgress } from '.';
import { DuckDBAccessMode, DuckDBConfig, DuckDBConnection, DuckDBDataProtocol, FileStatistics, InstantiationProgress } from '.';
import { CSVInsertOptions, JSONInsertOptions, ArrowInsertOptions } from './insert_options';
import { ScriptTokens } from './tokens';
import { WebFile } from './web_file';
Expand Down Expand Up @@ -54,15 +54,15 @@ export interface DuckDBBindings {
protocol: DuckDBDataProtocol,
directIO: boolean,
): Promise<HandleType>;
prepareFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void>;
prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void>;
prepareFileHandle(path: string, protocol: DuckDBDataProtocol, accessMode: DuckDBAccessMode, multiWindowMode:boolean): Promise<void>;
prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol, accessMode: DuckDBAccessMode, multiWindowMode:boolean): Promise<void>;
globFiles(path: string): WebFile[];
dropFile(name: string): void;
dropFiles(): void;
dropFiles(names?: string[]): void;
flushFiles(): void;
copyFileToPath(name: string, path: string): void;
copyFileToBuffer(name: string): Uint8Array;
registerOPFSFileName(file: string): void;
registerOPFSFileName(file: string, accessMode: DuckDBAccessMode, multiWindowMode:boolean): Promise<void>;
collectFileStatistics(file: string, enable: boolean): void;
exportFileStatistics(file: string): FileStatistics;
}
14 changes: 14 additions & 0 deletions packages/duckdb-wasm/src/bindings/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ export interface DuckDBFilesystemConfig {
allowFullHTTPReads?: boolean;
}

/**
 * Options controlling how files in the Origin Private File System (`opfs://`) are handled.
 * Supplied through the optional `opfs` field of `DuckDBConfig`; every field is optional.
 */
export interface DuckDBOPFSConfig {
/**
* Defines how `opfs://` files are handled during SQL execution.
* - "auto": Automatically register `opfs://` files and drop them after execution.
* - "manual": Files must be manually registered and dropped.
*/
fileHandling?: "auto" | "manual";
/**
 * Window mode for OPFS access: "single" or "multi".
 * NOTE(review): presumably "multi" allows the same OPFS files to be opened from several
 * windows/tabs at once, while "single" assumes exclusive access from one window. The code
 * that consumes this option is not visible here; confirm the semantics and the default.
 */
window?: "single" | "multi";
}

export enum DuckDBAccessMode {
UNDEFINED = 0,
AUTOMATIC = 1,
Expand Down Expand Up @@ -70,4 +80,8 @@ export interface DuckDBConfig {
* Custom user agent string
*/
customUserAgent?: string;
/**
* OPFS (Origin Private File System) configuration
*/
opfs?: DuckDBOPFSConfig;
}
2 changes: 2 additions & 0 deletions packages/duckdb-wasm/src/bindings/duckdb_module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ export interface DuckDBModule extends EmscriptenModule {
stackSave: typeof stackSave;
stackAlloc: typeof stackAlloc;
stackRestore: typeof stackRestore;
lengthBytesUTF8: typeof lengthBytesUTF8;
stringToUTF8: typeof stringToUTF8;

ccall: typeof ccall;
PThread: PThread;
Expand Down
15 changes: 13 additions & 2 deletions packages/duckdb-wasm/src/bindings/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { DuckDBAccessMode } from './config';
import { DuckDBModule } from './duckdb_module';
import { UDFFunction } from './udf_function';
import * as udf_rt from './udf_runtime';
Expand Down Expand Up @@ -58,6 +59,16 @@ export enum FileFlags {
FILE_FLAGS_FILE_CREATE_NEW = 1 << 4,
//! Open file in append mode
FILE_FLAGS_APPEND = 1 << 5,
//! Open file with restrictive permissions (600 on linux/mac) can only be used when creating, throws if file exists
FILE_FLAGS_PRIVATE = 1 << 6,
//! Return NULL if the file does not exist instead of throwing an error
FILE_FLAGS_NULL_IF_NOT_EXISTS = 1 << 7,
//! Multiple threads may perform reads and writes in parallel
FILE_FLAGS_PARALLEL_ACCESS = 1 << 8,
//! Ensure that this call creates the file, throw if the file exists
FILE_FLAGS_EXCLUSIVE_CREATE = 1 << 9,
//! Return NULL if the file exists instead of throwing an error
FILE_FLAGS_NULL_IF_EXISTS = 1 << 10,
}

/** Configuration for the AWS S3 Filesystem */
Expand Down Expand Up @@ -158,8 +169,8 @@ export interface DuckDBRuntime {

// Prepare a file handle that could only be acquired asynchronously
prepareFileHandle?: (path: string, protocol: DuckDBDataProtocol) => Promise<PreparedDBFileHandle[]>;
prepareFileHandles?: (path: string[], protocol: DuckDBDataProtocol) => Promise<PreparedDBFileHandle[]>;
prepareDBFileHandle?: (path: string, protocol: DuckDBDataProtocol) => Promise<PreparedDBFileHandle[]>;
prepareFileHandles?: (path: string[], protocol: DuckDBDataProtocol, accessMode: DuckDBAccessMode, multiWindowMode:boolean) => Promise<PreparedDBFileHandle[]>;
prepareDBFileHandle?: (path: string, protocol: DuckDBDataProtocol, accessMode: DuckDBAccessMode, multiWindowMode:boolean) => Promise<PreparedDBFileHandle[]>;

// Call a scalar UDF function
callScalarUDF(
Expand Down
40 changes: 28 additions & 12 deletions packages/duckdb-wasm/src/bindings/runtime_browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
} from './runtime';
import { DuckDBModule } from './duckdb_module';
import * as udf from './udf_runtime';
import { DuckDBAccessMode } from './config';

const OPFS_PREFIX_LEN = 'opfs://'.length;
const PATH_SEP_REGEX = /\/|\\/;
Expand Down Expand Up @@ -110,8 +111,11 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
BROWSER_RUNTIME._opfsRoot = await navigator.storage.getDirectory();
}
},
/** Prepare a file handle that could only be acquired aschronously */
async prepareFileHandles(filePaths: string[], protocol: DuckDBDataProtocol): Promise<PreparedDBFileHandle[]> {
/** Prepare a file handle that could only be acquired asynchronously */
async prepareFileHandles(filePaths: string[], protocol: DuckDBDataProtocol, accessMode?: DuckDBAccessMode): Promise<PreparedDBFileHandle[]> {
// DuckDBAccessMode.UNDEFINED will be treated as READ_WRITE
// See: https://github.com/duckdb/duckdb/blob/5f5512b827df6397afd31daedb4bbdee76520019/src/main/database.cpp#L442-L444
const isReadWrite = !accessMode || accessMode === DuckDBAccessMode.READ_WRITE;
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS) {
await BROWSER_RUNTIME.assignOPFSRoot();
const prepare = async (path: string): Promise<PreparedDBFileHandle> => {
Expand All @@ -135,18 +139,30 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
// mkdir -p
for (const folder of folders) {
dirHandle = await dirHandle.getDirectoryHandle(folder, { create: true });
dirHandle = await dirHandle.getDirectoryHandle(folder, { create: isReadWrite });
}
}
const fileHandle = await dirHandle.getFileHandle(fileName, { create: false }).catch(e => {
let fileHandle:FileSystemFileHandle;
try {
fileHandle = await dirHandle.getFileHandle(fileName, { create: false });
} catch (e: any) {
if (e?.name === 'NotFoundError') {
console.debug(`File ${path} does not exists yet, creating...`);
return dirHandle.getFileHandle(fileName, { create: true });
if (isReadWrite) {
console.debug(`File ${ path } does not exists yet, creating...`);
fileHandle = await dirHandle.getFileHandle(fileName, { create: true });
} else {
console.debug(`File ${ path } does not exists, aborting as we are in read-only mode`);
throw e;
}
} else {
throw e;
}
throw e;
});
}
try {
const handle = await fileHandle.createSyncAccessHandle();
let mode:FileSystemSyncAccessHandleMode = isReadWrite ? "readwrite" : "readwrite-unsafe";
const handle = await fileHandle.createSyncAccessHandle({
mode : mode
});
BROWSER_RUNTIME._preparedHandles[path] = handle;
return {
path,
Expand All @@ -166,11 +182,11 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
throw new Error(`Unsupported protocol ${protocol} for paths ${filePaths} with protocol ${protocol}`);
},
/** Prepare a file handle that could only be acquired aschronously */
async prepareDBFileHandle(dbPath: string, protocol: DuckDBDataProtocol): Promise<PreparedDBFileHandle[]> {
/** Prepare a file handle that could only be acquired asynchronously */
async prepareDBFileHandle(dbPath: string, protocol: DuckDBDataProtocol, accessMode: DuckDBAccessMode, multiWindowMode:boolean): Promise<PreparedDBFileHandle[]> {
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS && this.prepareFileHandles) {
const filePaths = [dbPath, `${dbPath}.wal`];
return this.prepareFileHandles(filePaths, protocol);
return this.prepareFileHandles(filePaths, protocol, accessMode, multiWindowMode);
}
throw new Error(`Unsupported protocol ${protocol} for path ${dbPath} with protocol ${protocol}`);
},
Expand Down
32 changes: 24 additions & 8 deletions packages/duckdb-wasm/src/bindings/runtime_node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,32 @@ export const NODE_RUNTIME: DuckDBRuntime & {
switch (file?.dataProtocol) {
// Native file
case DuckDBDataProtocol.NODE_FS: {
let openFlags = fs.constants.O_RDONLY;
if (flags & FileFlags.FILE_FLAGS_WRITE) {
openFlags = fs.constants.O_RDWR;
}
if (flags & FileFlags.FILE_FLAGS_FILE_CREATE) {
openFlags |= fs.constants.O_CREAT;
} else if (flags & FileFlags.FILE_FLAGS_FILE_CREATE_NEW) {
openFlags |= fs.constants.O_TRUNC;
}
let fd = NODE_RUNTIME._files?.get(file.dataUrl!);
if (fd === null || fd === undefined) {
fd = fs.openSync(
file.dataUrl!,
fs.constants.O_CREAT | fs.constants.O_RDWR,
fs.constants.S_IRUSR | fs.constants.S_IWUSR,
);
NODE_RUNTIME._filesById?.set(file.fileId!, fd);
let fileSize = 0;
try {
if (fd === null || fd === undefined) {
fd = fs.openSync(file.dataUrl!, openFlags, fs.constants.S_IRUSR | fs.constants.S_IWUSR);
NODE_RUNTIME._filesById?.set(file.fileId!, fd);
}
fileSize = fs.fstatSync(fd).size;
}
catch (e: any) {
if (e.code === 'ENOENT' && (flags & FileFlags.FILE_FLAGS_NULL_IF_NOT_EXISTS)) {
// No-op because we intend to ignore ENOENT while the file does not exist
return 0; // nullptr
} else {
throw e;
}
}
const fileSize = fs.fstatSync(fd).size;
const result = mod._malloc(2 * 8);
mod.HEAPF64[(result >> 3) + 0] = +fileSize;
mod.HEAPF64[(result >> 3) + 1] = 0;
Expand Down
Loading
Loading