diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index f7df08312..9f0d11f46 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -292,6 +292,9 @@ if(EMSCRIPTEN)
             _malloc, \
             _calloc, \
             _free, \
+            stringToUTF8, \
+            lengthBytesUTF8, \
+            stackAlloc, \
             _duckdb_web_clear_response, \
             _duckdb_web_collect_file_stats, \
             _duckdb_web_connect, \
diff --git a/lib/src/webdb_api.cc b/lib/src/webdb_api.cc
index bad9c7f9e..c8377b211 100644
--- a/lib/src/webdb_api.cc
+++ b/lib/src/webdb_api.cc
@@ -1,3 +1,5 @@
+#include <cstring>
+#include <iostream>
 #include
 #include
@@ -93,10 +95,25 @@ void duckdb_web_fs_drop_file(WASMResponse* packed, const char* file_name) {
     GET_WEBDB(*packed);
     WASMResponseBuffer::Get().Store(*packed, webdb.DropFile(file_name));
 }
-/// Drop a file
-void duckdb_web_fs_drop_files(WASMResponse* packed) {
+/// Drop files
+void duckdb_web_fs_drop_files(WASMResponse* packed, const char** names, int name_count) {
     GET_WEBDB(*packed);
-    WASMResponseBuffer::Get().Store(*packed, webdb.DropFiles());
+    if (name_count == 0 || names == nullptr) {
+        WASMResponseBuffer::Get().Store(*packed, webdb.DropFiles());
+    } else {
+        for (int i = 0; i < name_count; i++) {
+            const char* name = names[i];
+            if (name == nullptr) {
+                std::cerr << "Error: NULL pointer detected at index " << i << std::endl;
+                continue;
+            }
+            if (std::strlen(name) == 0) {
+                std::cerr << "Error: Empty string detected at index " << i << std::endl;
+                continue;
+            }
+            WASMResponseBuffer::Get().Store(*packed, webdb.DropFile(name));
+        }
+    }
 }
 /// Glob file infos
 void duckdb_web_fs_glob_file_infos(WASMResponse* packed, const char* file_name) {
diff --git a/packages/duckdb-wasm/src/bindings/bindings_base.ts b/packages/duckdb-wasm/src/bindings/bindings_base.ts
index f395bdb10..cf729a2ff 100644
--- a/packages/duckdb-wasm/src/bindings/bindings_base.ts
+++ b/packages/duckdb-wasm/src/bindings/bindings_base.ts
@@ -525,7 +525,7 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
             }
         }
-        return handle;
+        return handle;
     }
     /** Register a file object URL async */
     public async registerFileHandleAsync(
@@ -583,12 +583,52 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
         dropResponseBuffers(this.mod);
     }
     /** Drop files */
-    public dropFiles(): void {
-        const [s, d, n] = callSRet(this.mod, 'duckdb_web_fs_drop_files', [], []);
-        if (s !== StatusCode.SUCCESS) {
-            throw new Error(readString(this.mod, d, n));
+    public dropFiles(names?: string[]): void {
+        const pointers: number[] = [];
+        let pointerOfArray: number = -1;
+        try {
+            for (const str of names ?? []) {
+                if (str !== null && str !== undefined && str.length > 0) {
+                    const size = this.mod.lengthBytesUTF8(str) + 1;
+                    const ret = this.mod._malloc(size);
+                    if (!ret) {
+                        throw new Error(`Failed to allocate memory for string: ${str}`);
+                    }
+                    this.mod.stringToUTF8(str, ret, size);
+                    pointers.push(ret);
+                }
+            }
+            pointerOfArray = this.mod._malloc(pointers.length * 4);
+            if (!pointerOfArray) {
+                throw new Error(`Failed to allocate memory for pointers array`);
+            }
+            for (let i = 0; i < pointers.length; i++) {
+                this.mod.HEAP32[(pointerOfArray >> 2) + i] = pointers[i];
+            }
+            const [s, d, n] = callSRet(
+                this.mod,
+                'duckdb_web_fs_drop_files',
+                ['number', 'number'],
+                [pointerOfArray, pointers.length],
+            );
+            if (s !== StatusCode.SUCCESS) {
+                throw new Error(readString(this.mod, d, n));
+            }
+            dropResponseBuffers(this.mod);
+        } finally {
+            for (const pointer of pointers) {
+                this.mod._free(pointer);
+            }
+            if (pointerOfArray > 0) {
+                this.mod._free(pointerOfArray);
+            }
         }
-        dropResponseBuffers(this.mod);
     }
     /** Flush all files */
     public flushFiles(): void {
@@ -615,11 +655,11 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
         return copy;
     }
     /** Enable tracking of file statistics */
-    public registerOPFSFileName(file: string): Promise<void> {
-        if (file.startsWith("opfs://")) {
-            return this.prepareFileHandle(file, DuckDBDataProtocol.BROWSER_FSACCESS);
-        } else {
-            throw new Error("Not an OPFS file name: " + file);
+    public async registerOPFSFileName(file: string): Promise<void> {
+        if (file.startsWith("opfs://")) {
+            return await this.prepareFileHandle(file, DuckDBDataProtocol.BROWSER_FSACCESS);
+        } else {
+            throw new Error("Not an OPFS file name: " + file);
         }
     }
     public collectFileStatistics(file: string, enable: boolean): void {
diff --git a/packages/duckdb-wasm/src/bindings/bindings_interface.ts b/packages/duckdb-wasm/src/bindings/bindings_interface.ts
index 271a42ef9..08f8fee47 100644
--- a/packages/duckdb-wasm/src/bindings/bindings_interface.ts
+++ b/packages/duckdb-wasm/src/bindings/bindings_interface.ts
@@ -58,11 +58,11 @@ export interface DuckDBBindings {
     prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void>;
     globFiles(path: string): WebFile[];
     dropFile(name: string): void;
-    dropFiles(): void;
+    dropFiles(names?: string[]): void;
     flushFiles(): void;
     copyFileToPath(name: string, path: string): void;
     copyFileToBuffer(name: string): Uint8Array;
-    registerOPFSFileName(file: string): void;
+    registerOPFSFileName(file: string): Promise<void>;
     collectFileStatistics(file: string, enable: boolean): void;
     exportFileStatistics(file: string): FileStatistics;
 }
diff --git a/packages/duckdb-wasm/src/bindings/config.ts b/packages/duckdb-wasm/src/bindings/config.ts
index ce29ca0f5..a3013d19e 100644
--- a/packages/duckdb-wasm/src/bindings/config.ts
+++ b/packages/duckdb-wasm/src/bindings/config.ts
@@ -29,6 +29,15 @@ export interface DuckDBFilesystemConfig {
     allowFullHTTPReads?: boolean;
 }
 
+export interface DuckDBOPFSConfig {
+    /**
+     * Defines how `opfs://` files are handled during SQL execution.
+     * - "auto": Automatically register `opfs://` files and drop them after execution.
+     * - "manual": Files must be manually registered and dropped.
+     */
+    fileHandling?: "auto" | "manual";
+}
+
 export enum DuckDBAccessMode {
     UNDEFINED = 0,
     AUTOMATIC = 1,
@@ -70,4 +79,8 @@ export interface DuckDBConfig {
      * Custom user agent string
      */
     customUserAgent?: string;
+    /**
+     * OPFS configuration
+     */
+    opfs?: DuckDBOPFSConfig;
 }
diff --git a/packages/duckdb-wasm/src/bindings/duckdb_module.ts b/packages/duckdb-wasm/src/bindings/duckdb_module.ts
index c75c3e2ed..aafbfb6cb 100644
--- a/packages/duckdb-wasm/src/bindings/duckdb_module.ts
+++ b/packages/duckdb-wasm/src/bindings/duckdb_module.ts
@@ -7,6 +7,8 @@ export interface DuckDBModule extends EmscriptenModule {
     stackSave: typeof stackSave;
     stackAlloc: typeof stackAlloc;
     stackRestore: typeof stackRestore;
+    lengthBytesUTF8: typeof lengthBytesUTF8;
+    stringToUTF8: typeof stringToUTF8;
 
     ccall: typeof ccall;
     PThread: PThread;
diff --git a/packages/duckdb-wasm/src/bindings/runtime_browser.ts b/packages/duckdb-wasm/src/bindings/runtime_browser.ts
index b5069dac4..3720da7e6 100644
--- a/packages/duckdb-wasm/src/bindings/runtime_browser.ts
+++ b/packages/duckdb-wasm/src/bindings/runtime_browser.ts
@@ -132,11 +132,14 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
             let fileName = opfsPath;
             if (PATH_SEP_REGEX.test(opfsPath)) {
                 const folders = opfsPath.split(PATH_SEP_REGEX);
-                fileName = folders.pop()!;
+                if (folders.length === 0) {
+                    throw new Error(`Invalid path ${opfsPath}`);
+                }
+                fileName = folders[folders.length - 1];
                 if (!fileName) {
-                    throw new Error(`Invalid path ${path}`);
+                    throw new Error(`Invalid path ${opfsPath}. File not found.`);
                 }
-                // mkdir -p
+                folders.pop();
                 for (const folder of folders) {
                     dirHandle = await dirHandle.getDirectoryHandle(folder, { create: true });
                 }
diff --git a/packages/duckdb-wasm/src/parallel/async_bindings.ts b/packages/duckdb-wasm/src/parallel/async_bindings.ts
index dc8a81e53..3802d2580 100644
--- a/packages/duckdb-wasm/src/parallel/async_bindings.ts
+++ b/packages/duckdb-wasm/src/parallel/async_bindings.ts
@@ -18,6 +18,7 @@ import { InstantiationProgress } from '../bindings/progress';
 import { arrowToSQLField } from '../json_typedef';
 import { WebFile } from '../bindings/web_file';
 import { DuckDBDataProtocol } from '../bindings';
+import { searchOPFSFiles, isOPFSProtocol } from '../utils/opfs_util';
 import { ProgressEntry } from '../log';
 
 const TEXT_ENCODER = new TextEncoder();
@@ -49,6 +50,8 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
     protected _nextMessageId = 0;
     /** The pending requests */
     protected _pendingRequests: Map<number, WorkerTaskVariant> = new Map();
+    /** The DuckDB config */
+    protected _config: DuckDBConfig = {};
 
     constructor(logger: Logger, worker: Worker | null = null) {
         this._logger = logger;
@@ -63,6 +66,11 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
         return this._logger;
     }
 
+    /** Get the config */
+    public get config(): DuckDBConfig {
+        return this._config;
+    }
+
     /** Attach to worker */
     protected attach(worker: Worker): void {
         this._worker = worker;
@@ -104,7 +112,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
         transfer: ArrayBuffer[] = [],
     ): Promise<WorkerTaskReturnType<W>> {
         if (!this._worker) {
-            console.error('cannot send a message since the worker is not set!');
+            console.error('cannot send a message since the worker is not set! ' + task.type + ', ' + task.data);
             return undefined as any;
         }
         const mid = this._nextMessageId++;
@@ -327,8 +335,8 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
         return await this.postTask(task);
     }
     /** Try to drop files */
-    public async dropFiles(): Promise<void> {
-        const task = new WorkerTask(WorkerRequestType.DROP_FILES, null);
+    public async dropFiles(names?: string[]): Promise<void> {
+        const task = new WorkerTask(WorkerRequestType.DROP_FILES, names);
         return await this.postTask(task);
     }
     /** Flush all files */
@@ -370,6 +378,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
 
     /** Open a new database */
     public async open(config: DuckDBConfig): Promise<void> {
+        this._config = config;
         const task = new WorkerTask(WorkerRequestType.OPEN, config);
         await this.postTask(task);
     }
@@ -404,6 +413,21 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
 
     /** Run a query */
     public async runQuery(conn: ConnectionID, text: string): Promise<Uint8Array> {
+        if (this.shouldOPFSFileHandling()) {
+            const files = await this.registerOPFSFileFromSQL(text);
+            try {
+                return await this._runQueryAsync(conn, text);
+            } finally {
+                if (files.length > 0) {
+                    await this.dropFiles(files);
+                }
+            }
+        } else {
+            return await this._runQueryAsync(conn, text);
+        }
+    }
+
+    private async _runQueryAsync(conn: ConnectionID, text: string): Promise<Uint8Array> {
         const task = new WorkerTask<WorkerRequestType.RUN_QUERY, [ConnectionID, string], Uint8Array>(
             WorkerRequestType.RUN_QUERY,
             [conn, text],
         );
@@ -416,6 +440,25 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
         conn: ConnectionID,
         text: string,
         allowStreamResult: boolean = false,
+    ): Promise<Uint8Array | null> {
+        if (this.shouldOPFSFileHandling()) {
+            const files = await this.registerOPFSFileFromSQL(text);
+            try {
+                return await this._startPendingQueryAsync(conn, text, allowStreamResult);
+            } finally {
+                if (files.length > 0) {
+                    await this.dropFiles(files);
+                }
+            }
+        } else {
+            return await this._startPendingQueryAsync(conn, text, allowStreamResult);
+        }
+    }
+
+    private async _startPendingQueryAsync(
+        conn: ConnectionID,
+        text: string,
+        allowStreamResult: boolean = false,
     ): Promise<Uint8Array | null> {
         const task = new WorkerTask<
             WorkerRequestType.START_PENDING_QUERY,
@@ -424,6 +467,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
         >(WorkerRequestType.START_PENDING_QUERY, [conn, text, allowStreamResult]);
         return await this.postTask(task);
     }
+
     /** Poll a pending query */
     public async pollPendingQuery(conn: ConnectionID): Promise<Uint8Array | null> {
         const task = new WorkerTask(
@@ -657,4 +701,26 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
         );
         await this.postTask(task);
     }
+
+    private shouldOPFSFileHandling(): boolean {
+        if (isOPFSProtocol(this.config.path ?? "")) {
"")){ + return this.config.opfs?.fileHandling == "auto"; + } + return false; + } + + private async registerOPFSFileFromSQL(text: string) { + const files = searchOPFSFiles(text); + const result: string[] = []; + for (const file of files) { + try { + await this.registerOPFSFileName(file); + result.push(file); + } catch (e) { + console.error(e); + throw new Error("File Not found:" + file); + } + } + return result; + } } diff --git a/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts b/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts index 97ba2b191..a7845f2bd 100644 --- a/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts +++ b/packages/duckdb-wasm/src/parallel/async_bindings_interface.ts @@ -32,4 +32,7 @@ export interface AsyncDuckDBBindings { insertArrowFromIPCStream(conn: number, buffer: Uint8Array, options?: CSVInsertOptions): Promise; insertCSVFromPath(conn: number, path: string, options: CSVInsertOptions): Promise; insertJSONFromPath(conn: number, path: string, options: JSONInsertOptions): Promise; + + dropFile(name: string):Promise; + dropFiles(names?: string[]):Promise; } diff --git a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts index 3a5a8f295..db05cb50f 100644 --- a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts +++ b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts @@ -149,7 +149,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger { this.sendOK(request); break; case WorkerRequestType.DROP_FILES: - this._bindings.dropFiles(); + this._bindings.dropFiles(request.data); this.sendOK(request); break; case WorkerRequestType.FLUSH_FILES: @@ -361,7 +361,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger { break; case WorkerRequestType.REGISTER_OPFS_FILE_NAME: - this._bindings.registerOPFSFileName(request.data[0]); + await this._bindings.registerOPFSFileName(request.data[0]); this.sendOK(request); break; diff --git a/packages/duckdb-wasm/src/parallel/worker_request.ts b/packages/duckdb-wasm/src/parallel/worker_request.ts index 38502b7b6..5de3c37c0 100644 --- a/packages/duckdb-wasm/src/parallel/worker_request.ts +++ b/packages/duckdb-wasm/src/parallel/worker_request.ts @@ -117,7 +117,7 @@ export type WorkerRequestVariant = | WorkerRequest | WorkerRequest | WorkerRequest - | WorkerRequest + | WorkerRequest | WorkerRequest | WorkerRequest | WorkerRequest @@ -178,7 +178,7 @@ export type WorkerTaskVariant = | WorkerTask | WorkerTask | WorkerTask - | WorkerTask + | WorkerTask | WorkerTask | WorkerTask | WorkerTask diff --git a/packages/duckdb-wasm/src/utils/opfs_util.ts b/packages/duckdb-wasm/src/utils/opfs_util.ts new file mode 100644 index 000000000..822eb4a7c --- /dev/null +++ b/packages/duckdb-wasm/src/utils/opfs_util.ts @@ -0,0 +1,10 @@ +export const REGEX_OPFS_FILE = /'(opfs:\/\/\S*?)'/g; +export const REGEX_OPFS_PROTOCOL = /(opfs:\/\/\S*?)/g; + +export function isOPFSProtocol(path: string): boolean { + return path.search(REGEX_OPFS_PROTOCOL) > -1; +} + +export function searchOPFSFiles(text: string) { + return [...text.matchAll(REGEX_OPFS_FILE)].map(match => match[1]); +} \ No newline at end of file diff --git a/packages/duckdb-wasm/test/opfs.test.ts b/packages/duckdb-wasm/test/opfs.test.ts index eaf1a0fcc..9c263f479 100644 --- a/packages/duckdb-wasm/test/opfs.test.ts +++ b/packages/duckdb-wasm/test/opfs.test.ts @@ -1,13 +1,22 @@ -import * as duckdb from '../src/'; -import {LogLevel} from '../src/'; +import { + AsyncDuckDB, + 
+    AsyncDuckDBConnection,
+    ConsoleLogger,
+    DuckDBAccessMode,
+    DuckDBBundle,
+    DuckDBDataProtocol,
+    LogLevel
+} from '../src/';
 import * as arrow from 'apache-arrow';
 
-export function testOPFS(baseDir: string, bundle: () => duckdb.DuckDBBundle): void {
-    let db: duckdb.AsyncDuckDB;
-    let conn: duckdb.AsyncDuckDBConnection;
+export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
+    const logger = new ConsoleLogger(LogLevel.ERROR);
+
+    let db: AsyncDuckDB;
+    let conn: AsyncDuckDBConnection;
 
     beforeAll(async () => {
-        removeFiles();
+        await removeFiles();
     });
 
     afterAll(async () => {
@@ -17,38 +26,47 @@ export function testOPFS(baseDir: string, bundle: () => duckdb.DuckDBBundle): void {
         if (db) {
             await db.terminate();
         }
-        removeFiles();
+        await removeFiles();
     });
 
     beforeEach(async () => {
-        removeFiles();
-        //
-        const logger = new duckdb.ConsoleLogger(LogLevel.ERROR);
+        await removeFiles();
         const worker = new Worker(bundle().mainWorker!);
-        db = new duckdb.AsyncDuckDB(logger, worker);
+        db = new AsyncDuckDB(logger, worker);
         await db.instantiate(bundle().mainModule, bundle().pthreadWorker);
         await db.open({
             path: 'opfs://test.db',
-            accessMode: duckdb.DuckDBAccessMode.READ_WRITE
+            accessMode: DuckDBAccessMode.READ_WRITE
         });
         conn = await db.connect();
     });
 
     afterEach(async () => {
         if (conn) {
-            await conn.close();
+            await conn.close().catch(() => {});
         }
         if (db) {
-            await db.terminate();
+            await db.reset().catch(() => {});
+            await db.terminate().catch(() => {});
+            await db.dropFiles().catch(() => {});
         }
-        removeFiles();
+        await removeFiles();
     });
 
     describe('Load Data in OPFS', () => {
         it('Import Small Parquet file', async () => {
-            await conn.send(`CREATE TABLE stu AS SELECT * FROM "${baseDir}/uni/studenten.parquet"`);
+            //1. data preparation
+            await conn.send(`CREATE TABLE stu AS
+                SELECT *
+                FROM "${baseDir}/uni/studenten.parquet"`);
             await conn.send(`CHECKPOINT;`);
-            const result = await conn.send(`SELECT matrnr FROM stu;`);
+
+            const result = await conn.send(`SELECT matrnr
+                FROM stu;`);
             const batches = [];
             for await (const batch of result) {
                 batches.push(batch);
             }
@@ -58,11 +76,15 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
                 new Int32Array([24002, 25403, 26120, 26830, 27550, 28106, 29120, 29555]),
             );
         });
-
         it('Import Larget Parquet file', async () => {
-            await conn.send(`CREATE TABLE lineitem AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`);
+            //1. data preparation
+            await conn.send(`CREATE TABLE lineitem AS
+                SELECT *
+                FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`);
             await conn.send(`CHECKPOINT;`);
-            const result = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem;`);
+
+            const result = await conn.send(`SELECT count(*)::INTEGER as cnt
+                FROM lineitem;`);
             const batches = [];
             for await (const batch of result) {
                 batches.push(batch);
             }
@@ -72,22 +94,28 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
         });
 
         it('Load Existing DB File', async () => {
-            await conn.send(`CREATE TABLE tmp AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`);
+            //1. data preparation
+            await conn.send(`CREATE TABLE tmp AS
+                SELECT *
+                FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`);
             await conn.send(`CHECKPOINT;`);
+
+            await conn.close();
+            await db.reset();
+            await db.dropFiles();
             await db.terminate();
-            const logger = new duckdb.ConsoleLogger(LogLevel.ERROR);
             const worker = new Worker(bundle().mainWorker!);
-            db = new duckdb.AsyncDuckDB(logger, worker);
+            db = new AsyncDuckDB(logger, worker);
             await db.instantiate(bundle().mainModule, bundle().pthreadWorker);
             await db.open({
                 path: 'opfs://test.db',
-                accessMode: duckdb.DuckDBAccessMode.READ_WRITE
+                accessMode: DuckDBAccessMode.READ_WRITE
             });
             conn = await db.connect();
 
-            const result = await conn.send(`SELECT count(*)::INTEGER as cnt FROM tmp;`);
+            const result = await conn.send(`SELECT count(*)::INTEGER as cnt
+                FROM tmp;`);
             const batches = [];
             for await (const batch of result) {
                 batches.push(batch);
             }
@@ -98,20 +126,20 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
 
         it('Load Parquet file that are already with empty handler', async () => {
             //1. write to opfs
-            const parquetBuffer = await fetch(`${baseDir}/tpch/0_01/parquet/lineitem.parquet`).then(res =>
-                res.arrayBuffer(),
-            );
-            const opfsRoot = await navigator.storage.getDirectory();
-            const fileHandle = await opfsRoot.getFileHandle('test.parquet', {create: true});
-            const writable = await fileHandle.createWritable();
-            await writable.write(parquetBuffer);
-            await writable.close();
+            const fileHandler = await getOpfsFileHandlerFromUrl({
+                url: `${baseDir}/tpch/0_01/parquet/lineitem.parquet`,
+                path: 'test.parquet'
+            });
             //2. handle is empty object, because worker gets a File Handle using the file name.
-            await db.registerFileHandle('test.parquet', null, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
-            await conn.send(`CREATE TABLE lineitem1 AS SELECT * FROM read_parquet('test.parquet')`);
+            await db.registerFileHandle('test.parquet', fileHandler, DuckDBDataProtocol.BROWSER_FSACCESS, true);
+            //3. data preparation
+            await conn.send(`CREATE TABLE lineitem1 AS
+                SELECT *
+                FROM read_parquet('test.parquet')`);
             await conn.send(`CHECKPOINT;`);
-            const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem1;`);
+            const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt
+                FROM lineitem1;`);
             const batches1 = [];
             for await (const batch of result1) {
                 batches1.push(batch);
             }
@@ -122,21 +150,20 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
 
         it('Load Parquet file that are already with opfs file handler in datadir', async () => {
             //1. write to opfs
-            const parquetBuffer = await fetch(`${baseDir}/tpch/0_01/parquet/lineitem.parquet`).then(res =>
-                res.arrayBuffer(),
-            );
-            const opfsRoot = await navigator.storage.getDirectory();
-            const datadir = await opfsRoot.getDirectoryHandle("datadir", {create: true});
-            const fileHandle = await datadir.getFileHandle('test.parquet', {create: true});
-            const writable = await fileHandle.createWritable();
-            await writable.write(parquetBuffer);
-            await writable.close();
+            const fileHandler = await getOpfsFileHandlerFromUrl({
+                url: `${baseDir}/tpch/0_01/parquet/lineitem.parquet`,
+                path: 'datadir/test.parquet'
+            });
             //2. handle is opfs file handler
-            await db.registerFileHandle('test.parquet', fileHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
-            await conn.send(`CREATE TABLE lineitem1 AS SELECT * FROM read_parquet('test.parquet')`);
+            await db.registerFileHandle('datadir/test.parquet', fileHandler, DuckDBDataProtocol.BROWSER_FSACCESS, true);
+            //3. data preparation
+            await conn.send(`CREATE TABLE lineitem1 AS
+                SELECT *
+                FROM read_parquet('datadir/test.parquet')`);
             await conn.send(`CHECKPOINT;`);
-            const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem1;`);
+            const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt
+                FROM lineitem1;`);
             const batches1 = [];
             for await (const batch of result1) {
                 batches1.push(batch);
             }
@@ -146,25 +173,30 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
         });
 
         it('Load Parquet file that are already', async () => {
-            const parquetBuffer = await fetch(`${baseDir}/tpch/0_01/parquet/lineitem.parquet`).then(res =>
-                res.arrayBuffer(),
-            );
-            const opfsRoot = await navigator.storage.getDirectory();
-            const fileHandle = await opfsRoot.getFileHandle('test.parquet', {create: true});
-            const writable = await fileHandle.createWritable();
-            await writable.write(parquetBuffer);
-            await writable.close();
-
-            await db.registerFileHandle('test.parquet', fileHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
-            await conn.send(`CREATE TABLE lineitem1 AS SELECT * FROM read_parquet('test.parquet')`);
+            //1. write to opfs
+            const fileHandle = await getOpfsFileHandlerFromUrl({
+                url: `${baseDir}/tpch/0_01/parquet/lineitem.parquet`,
+                path: 'test.parquet'
+            });
+            //2. handle is opfs file handler
+            await db.registerFileHandle('test.parquet', fileHandle, DuckDBDataProtocol.BROWSER_FSACCESS, true);
+            //3. data preparation
+            await conn.send(`CREATE TABLE lineitem1 AS
+                SELECT *
+                FROM read_parquet('test.parquet')`);
             await conn.send(`CHECKPOINT;`);
-            await conn.send(`CREATE TABLE lineitem2 AS SELECT * FROM read_parquet('test.parquet')`);
+            await conn.send(`CREATE TABLE lineitem2 AS
+                SELECT *
+                FROM read_parquet('test.parquet')`);
             await conn.send(`CHECKPOINT;`);
-            await conn.send(`CREATE TABLE lineitem3 AS SELECT * FROM read_parquet('test.parquet')`);
+            await conn.send(`CREATE TABLE lineitem3 AS
+                SELECT *
+                FROM read_parquet('test.parquet')`);
             await conn.send(`CHECKPOINT;`);
             {
-                const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem1;`);
+                const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt
+                    FROM lineitem1;`);
                 const batches1 = [];
                 for await (const batch of result1) {
                     batches1.push(batch);
                 }
@@ -174,7 +206,8 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
             {
-                const result2 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem2;`);
+                const result2 = await conn.send(`SELECT count(*)::INTEGER as cnt
+                    FROM lineitem2;`);
                 const batches2 = [];
                 for await (const batch of result2) {
                     batches2.push(batch);
                 }
@@ -184,7 +217,8 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
             {
-                const result3 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem3;`);
+                const result3 = await conn.send(`SELECT count(*)::INTEGER as cnt
+                    FROM lineitem3;`);
                 const batches3 = [];
                 for await (const batch of result3) {
                     batches3.push(batch);
                 }
@@ -196,21 +230,26 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
 
         it('Drop File + Export as CSV to OPFS + Load CSV', async () => {
+            //1. write to opfs
             const opfsRoot = await navigator.storage.getDirectory();
-            const testHandle = await opfsRoot.getFileHandle('test.csv', {create: true});
-            await db.registerFileHandle('test.csv', testHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
-            await conn.send(`CREATE TABLE zzz AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`);
+            const fileHandler = await opfsRoot.getFileHandle('test.csv', { create: true });
+            //2. handle is opfs file handler
+            await db.registerFileHandle('test.csv', fileHandler, DuckDBDataProtocol.BROWSER_FSACCESS, true);
+            //3. data preparation
+            await conn.send(`CREATE TABLE zzz AS
+                SELECT *
+                FROM '${baseDir}/tpch/0_01/parquet/lineitem.parquet'`);
             await conn.send(`COPY (SELECT * FROM zzz) TO 'test.csv'`);
             await conn.send(`COPY (SELECT * FROM zzz) TO 'non_existing.csv'`);
             await conn.close();
             await db.dropFile('test.csv');
             await db.reset();
-            await db.open({});
             conn = await db.connect();
-            await db.registerFileHandle('test.csv', testHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
+            await db.registerFileHandle('test.csv', fileHandler, DuckDBDataProtocol.BROWSER_FSACCESS, true);
 
-            const result = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test.csv';`);
+            const result = await conn.send(`SELECT count(*)::INTEGER as cnt
+                FROM 'test.csv';`);
             const batches = [];
             for await (const batch of result) {
                 batches.push(batch);
             }
@@ -221,33 +260,39 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
             await db.dropFile('test.csv');
         });
-
         it('Drop Files + Export as CSV to OPFS + Load CSV', async () => {
+            //1. write to opfs
             const opfsRoot = await navigator.storage.getDirectory();
-            const testHandle1 = await opfsRoot.getFileHandle('test1.csv', {create: true});
-            const testHandle2 = await opfsRoot.getFileHandle('test2.csv', {create: true});
-            const testHandle3 = await opfsRoot.getFileHandle('test3.csv', {create: true});
-            await db.registerFileHandle('test1.csv', testHandle1, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
-            await db.registerFileHandle('test2.csv', testHandle2, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
-            await db.registerFileHandle('test3.csv', testHandle3, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
-
-            await conn.send(`CREATE TABLE zzz AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`);
+            const testHandle1 = await opfsRoot.getFileHandle('test1.csv', { create: true });
+            const testHandle2 = await opfsRoot.getFileHandle('test2.csv', { create: true });
+            const testHandle3 = await opfsRoot.getFileHandle('test3.csv', { create: true });
+            //2. handle is opfs file handler
+            await db.registerFileHandle('test1.csv', testHandle1, DuckDBDataProtocol.BROWSER_FSACCESS, true);
+            await db.registerFileHandle('test2.csv', testHandle2, DuckDBDataProtocol.BROWSER_FSACCESS, true);
+            await db.registerFileHandle('test3.csv', testHandle3, DuckDBDataProtocol.BROWSER_FSACCESS, true);
+            //3. data preparation
+            await conn.send(`CREATE TABLE zzz AS
+                SELECT *
+                FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`);
             await conn.send(`COPY (SELECT * FROM zzz) TO 'test1.csv'`);
             await conn.send(`COPY (SELECT * FROM zzz) TO 'test2.csv'`);
             await conn.send(`COPY (SELECT * FROM zzz) TO 'test3.csv'`);
             await conn.close();
-            await db.dropFiles();
+            //4. dropFiles
+            await db.dropFiles(['test1.csv', 'test2.csv', 'test3.csv']);
+
+            //5. reset
             await db.reset();
-            await db.open({});
             conn = await db.connect();
 
-            await db.registerFileHandle('test1.csv', testHandle1, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
-            await db.registerFileHandle('test2.csv', testHandle2, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
-            await db.registerFileHandle('test3.csv', testHandle3, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
+            await db.registerFileHandle('test1.csv', testHandle1, DuckDBDataProtocol.BROWSER_FSACCESS, true);
+            await db.registerFileHandle('test2.csv', testHandle2, DuckDBDataProtocol.BROWSER_FSACCESS, true);
+            await db.registerFileHandle('test3.csv', testHandle3, DuckDBDataProtocol.BROWSER_FSACCESS, true);
 
             {
-                const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test1.csv';`);
+                const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt
+                    FROM 'test1.csv';`);
                 const batches1 = [];
                 for await (const batch of result1) {
                     batches1.push(batch);
                 }
                 const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1);
                 expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
             }
@@ -256,7 +301,8 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
             {
-                const result2 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test2.csv';`);
+                const result2 = await conn.send(`SELECT count(*)::INTEGER as cnt
+                    FROM 'test2.csv';`);
                 const batches2 = [];
                 for await (const batch of result2) {
                     batches2.push(batch);
                 }
@@ -265,7 +311,8 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
             {
-                const result3 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test3.csv';`);
+                const result3 = await conn.send(`SELECT count(*)::INTEGER as cnt
+                    FROM 'test3.csv';`);
                 const batches3 = [];
                 for await (const batch of result3) {
                     batches3.push(batch);
                 }
@@ -273,35 +320,168 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void {
                 const table3 = await new arrow.Table<{ cnt: arrow.Int }>(batches3);
                 expect(table3.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
             }
-
-            await db.dropFiles();
         });
-    });
-
-    async function removeFiles() {
-        const opfsRoot = await navigator.storage.getDirectory();
-        await opfsRoot.removeEntry('test.db').catch(() => {
-        });
-        await opfsRoot.removeEntry('test.db.wal').catch(() => {
-        });
-        await opfsRoot.removeEntry('test.csv').catch(() => {
+
+        it('Load Parquet file when FROM clause', async () => {
+            //1. write to opfs
+            await getOpfsFileHandlerFromUrl({
+                url: `${baseDir}/tpch/0_01/parquet/lineitem.parquet`,
+                path: 'test.parquet'
+            });
+            await conn.close();
+            await db.reset();
+            await db.dropFile('test.parquet');
+            db.config.opfs = {
+                fileHandling: "auto"
+            };
+            conn = await db.connect();
+            //2. send query
+            const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt
+                FROM 'opfs://test.parquet'`);
+            const batches1 = [];
+            for await (const batch of result1) {
+                batches1.push(batch);
+            }
+            const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1);
+            expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
         });
-        await opfsRoot.removeEntry('test1.csv').catch(() => {
+
+        it('Load Parquet file when FROM clause + read_parquet', async () => {
+            //1. write to opfs
+            await getOpfsFileHandlerFromUrl({
+                url: `${baseDir}/uni/studenten.parquet`,
+                path: 'test.parquet'
+            });
+            await conn.close();
+            await db.reset();
+            await db.dropFile('test.parquet');
+            db.config.opfs = {
+                fileHandling: "auto"
+            };
+            conn = await db.connect();
+            //2. send query
+            const result = await conn.send(`SELECT *
+                FROM read_parquet('opfs://test.parquet');`);
+            const batches = [];
+            for await (const batch of result) {
+                batches.push(batch);
+            }
+            const table = await new arrow.Table<{ cnt: arrow.Int }>(batches);
+            expect(table.getChildAt(0)?.toArray()).toEqual(
+                new Int32Array([24002, 25403, 26120, 26830, 27550, 28106, 29120, 29555]),
+            );
         });
-        await opfsRoot.removeEntry('test2.csv').catch(() => {
+
+        it('Load Parquet file with dir when FROM clause', async () => {
+            //1. write to opfs
+            await getOpfsFileHandlerFromUrl({
+                url: `${baseDir}/tpch/0_01/parquet/lineitem.parquet`,
+                path: 'datadir/test.parquet'
+            });
+            await conn.close();
+            await db.reset();
+            await db.dropFile('datadir/test.parquet');
+            db.config.opfs = {
+                fileHandling: "auto"
+            };
+            conn = await db.connect();
+            //2. send query
+            const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt
+                FROM 'opfs://datadir/test.parquet'`);
+            const batches1 = [];
+            for await (const batch of result1) {
+                batches1.push(batch);
+            }
+            const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1);
+            expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000);
         });
-        await opfsRoot.removeEntry('test3.csv').catch(() => {
+
+        it('Load Parquet file with dir when FROM clause with IO Error', async () => {
+            //1. write to opfs
+            await getOpfsFileHandlerFromUrl({
+                url: `${baseDir}/tpch/0_01/parquet/lineitem.parquet`,
+                path: 'datadir/test.parquet'
+            });
+            try {
+                //2. send query
+                await expectAsync(
+                    conn.send(`SELECT count(*)::INTEGER as cnt
+                        FROM 'opfs://datadir/test.parquet'`)
+                ).toBeRejectedWithError("IO Error: No files found that match the pattern \"opfs://datadir/test.parquet\"");
+            } finally {
+                await db.reset();
+                await db.dropFiles();
+            }
         });
-        await opfsRoot.removeEntry('test.parquet').catch(() => {
+
+        it('Copy CSV to OPFS + Load CSV', async () => {
+            //1. data preparation
+            db.config.opfs = {
+                fileHandling: "auto"
+            };
+            await conn.query(`COPY ( SELECT 32 AS value ) TO 'opfs://file.csv'`);
+            await conn.query(`COPY ( SELECT 42 AS value ) TO 'opfs://file.csv'`);
+            const result = await conn.send(`SELECT *
+                FROM 'opfs://file.csv';`);
+            const batches = [];
+            for await (const batch of result) {
+                batches.push(batch);
+            }
+            const table = await new arrow.Table<{ cnt: arrow.Int }>(batches);
+            expect(table.getChildAt(0)?.toArray()).toEqual(
+                new BigInt64Array([42n]),
+            );
         });
+    });
+
+    async function removeFiles() {
+        const opfsRoot = await navigator.storage.getDirectory();
+        await opfsRoot.removeEntry('test.db').catch(_ignore);
+        await opfsRoot.removeEntry('test.db.wal').catch(_ignore);
+        await opfsRoot.removeEntry('test.csv').catch(_ignore);
+        await opfsRoot.removeEntry('test1.csv').catch(_ignore);
+        await opfsRoot.removeEntry('test2.csv').catch(_ignore);
+        await opfsRoot.removeEntry('test3.csv').catch(_ignore);
+        await opfsRoot.removeEntry('test.parquet').catch(_ignore);
         try {
             const datadir = await opfsRoot.getDirectoryHandle('datadir');
-            datadir.removeEntry('test.parquet').catch(() => {
-            });
+            datadir.removeEntry('test.parquet').catch(_ignore);
         } catch (e) {
             //
         }
-        await opfsRoot.removeEntry('datadir').catch(() => {
-        });
+        await opfsRoot.removeEntry('datadir').catch(_ignore);
     }
+
+    async function getOpfsFileHandlerFromUrl(params: {
+        url: string;
+        path: string;
+    }): Promise<FileSystemFileHandle> {
+        const PATH_SEP_REGEX = /\/|\\/;
+        const parquetBuffer = await fetch(params.url).then(res =>
+            res.arrayBuffer(),
+        );
+        const opfsRoot = await navigator.storage.getDirectory();
+        let dirHandle: FileSystemDirectoryHandle = opfsRoot;
+        let fileName = params.path;
+        if (PATH_SEP_REGEX.test(params.path)) {
+            const folders = params.path.split(PATH_SEP_REGEX);
+            fileName = folders.pop()!;
+            if (!fileName) {
+                throw new Error(`Invalid path ${params.path}`);
+            }
+            for (const folder of folders) {
+                dirHandle = await dirHandle.getDirectoryHandle(folder, { create: true });
+            }
+        }
+        const fileHandle = await dirHandle.getFileHandle(fileName, { create: true });
+        const writable = await fileHandle.createWritable();
+        await writable.write(parquetBuffer);
+        await writable.close();
+
+        return fileHandle;
+    }
 }
+
+//ignore block
+const _ignore: () => void = () => {
+};
diff --git a/patches/duckdb/binary_executor.patch b/patches/duckdb/binary_executor.patch
deleted file mode 100644
index d93319cbc..000000000
--- a/patches/duckdb/binary_executor.patch
+++ /dev/null
@@ -1,92 +0,0 @@
-diff --git a/src/include/duckdb/common/vector_operations/binary_executor.hpp b/src/include/duckdb/common/vector_operations/binary_executor.hpp
-index 55c10bb289..c5f57edabf 100644
---- a/src/include/duckdb/common/vector_operations/binary_executor.hpp
-+++ b/src/include/duckdb/common/vector_operations/binary_executor.hpp
-@@ -381,6 +381,8 @@ public:
- 		}
- 	}
-
-+#define DUCKDB_SMALLER_BINARY
-+
- 	template
- 	static idx_t SelectFlat(Vector &left, Vector &right, const SelectionVector *sel, idx_t count,
- 	                        SelectionVector *true_sel, SelectionVector *false_sel) {
-@@ -417,14 +419,22 @@ public:
- 		    ldata, rdata, sel, count, combined_mask, true_sel, false_sel);
- 		}
- 	}
--
-+#ifndef DUCKDB_SMALLER_BINARY
- 	template <class LEFT_TYPE, class RIGHT_TYPE, class OP, bool NO_NULL, bool HAS_TRUE_SEL, bool HAS_FALSE_SEL>
-+#else
-+	template <class LEFT_TYPE, class RIGHT_TYPE, class OP>
-+#endif
- 	static inline idx_t
- 	SelectGenericLoop(const LEFT_TYPE *__restrict ldata, const RIGHT_TYPE *__restrict rdata,
- 	                  const SelectionVector *__restrict lsel, const SelectionVector *__restrict rsel,
- 	                  const SelectionVector *__restrict result_sel, idx_t count, ValidityMask &lvalidity,
&lvalidity, - ValidityMask &rvalidity, SelectionVector *true_sel, SelectionVector *false_sel) { - idx_t true_count = 0, false_count = 0; -+#ifdef DUCKDB_SMALLER_BINARY -+ const bool HAS_TRUE_SEL = true_sel; -+ const bool HAS_FALSE_SEL = false_sel; -+ const bool NO_NULL = false; -+#endif - for (idx_t i = 0; i < count; i++) { - auto result_idx = result_sel->get_index(i); - auto lindex = lsel->get_index(i); -@@ -452,6 +462,7 @@ public: - const SelectionVector *__restrict lsel, const SelectionVector *__restrict rsel, - const SelectionVector *__restrict result_sel, idx_t count, ValidityMask &lvalidity, - ValidityMask &rvalidity, SelectionVector *true_sel, SelectionVector *false_sel) { -+#ifndef DUCKDB_SMALLER_BINARY - if (true_sel && false_sel) { - return SelectGenericLoop( - ldata, rdata, lsel, rsel, result_sel, count, lvalidity, rvalidity, true_sel, false_sel); -@@ -463,6 +474,10 @@ public: - return SelectGenericLoop( - ldata, rdata, lsel, rsel, result_sel, count, lvalidity, rvalidity, true_sel, false_sel); - } -+#else -+ return SelectGenericLoop(ldata, rdata, lsel, rsel, result_sel, count, lvalidity, -+ rvalidity, true_sel, false_sel); -+#endif - } - - template -@@ -471,10 +486,13 @@ public: - const SelectionVector *__restrict lsel, const SelectionVector *__restrict rsel, - const SelectionVector *__restrict result_sel, idx_t count, ValidityMask &lvalidity, - ValidityMask &rvalidity, SelectionVector *true_sel, SelectionVector *false_sel) { -+#ifndef DUCKDB_SMALLER_BINARY - if (!lvalidity.AllValid() || !rvalidity.AllValid()) { - return SelectGenericLoopSelSwitch( - ldata, rdata, lsel, rsel, result_sel, count, lvalidity, rvalidity, true_sel, false_sel); -- } else { -+ } else -+#endif -+ { - return SelectGenericLoopSelSwitch( - ldata, rdata, lsel, rsel, result_sel, count, lvalidity, rvalidity, true_sel, false_sel); - } -@@ -502,6 +520,7 @@ public: - if (left.GetVectorType() == VectorType::CONSTANT_VECTOR && - right.GetVectorType() == VectorType::CONSTANT_VECTOR) { - return SelectConstant(left, right, sel, count, true_sel, false_sel); -+#ifndef DUCKDB_SMALLER_BINARY - } else if (left.GetVectorType() == VectorType::CONSTANT_VECTOR && - right.GetVectorType() == VectorType::FLAT_VECTOR) { - return SelectFlat(left, right, sel, count, true_sel, false_sel); -@@ -511,10 +530,12 @@ public: - } else if (left.GetVectorType() == VectorType::FLAT_VECTOR && - right.GetVectorType() == VectorType::FLAT_VECTOR) { - return SelectFlat(left, right, sel, count, true_sel, false_sel); -+#endif - } else { - return SelectGeneric(left, right, sel, count, true_sel, false_sel); - } - } -+#undef DUCKDB_SMALLER_BINARY - }; - - } // namespace duckdb diff --git a/patches/duckdb/extension_install_rework.patch b/patches/duckdb/extension_install_rework.patch index 43e015f44..b3f0d7db9 100644 --- a/patches/duckdb/extension_install_rework.patch +++ b/patches/duckdb/extension_install_rework.patch @@ -107,15 +107,6 @@ index e8ab595ab0..fb3e6371a3 100644 #ifdef WASM_LOADABLE_EXTENSIONS // Install is currently a no-op return nullptr; -@@ -209,7 +215,7 @@ string ExtensionHelper::ExtensionUrlTemplate(optional_ptr