From 2a8db10c6a0f9a62336689be9f5ac1fc35a11216 Mon Sep 17 00:00:00 2001
From: Jeff Raymakers
Date: Wed, 25 Jun 2025 12:09:22 -0700
Subject: [PATCH] support additional run options, including result table and chunk limit (#14)

* result table

* db, schema, & result tbl chunk limit

* support schema, change db to search path; also errors_as_json

* add comment about appender connection
---
 src/http_server.cpp | 115 +++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 103 insertions(+), 12 deletions(-)

diff --git a/src/http_server.cpp b/src/http_server.cpp
index 15dbbd5..68b07a6 100644
--- a/src/http_server.cpp
+++ b/src/http_server.cpp
@@ -13,6 +13,8 @@
 #include
 #include
 #include
+#include
+#include
 #include

 namespace duckdb {
@@ -364,8 +366,10 @@ void HttpServer::DoHandleRun(const httplib::Request &req,

   auto connection_name = req.get_header_value("X-DuckDB-UI-Connection-Name");

-  auto database_name =
+  auto database_name_option =
       DecodeBase64(req.get_header_value("X-DuckDB-UI-Database-Name"));
+  auto schema_name_option =
+      DecodeBase64(req.get_header_value("X-DuckDB-UI-Schema-Name"));

   std::vector parameter_values;
   auto parameter_count_string =
@@ -379,6 +383,35 @@
     }
   }

+  // default to effectively no limit
+  auto result_chunk_limit = INT_MAX;
+  auto result_chunk_limit_string =
+      req.get_header_value("X-DuckDB-UI-Result-Chunk-Limit");
+  if (!result_chunk_limit_string.empty()) {
+    result_chunk_limit = std::stoi(result_chunk_limit_string);
+  }
+
+  auto result_database_name_option =
+      DecodeBase64(req.get_header_value("X-DuckDB-UI-Result-Database-Name"));
+  auto result_schema_name_option =
+      DecodeBase64(req.get_header_value("X-DuckDB-UI-Result-Schema-Name"));
+  auto result_table_name =
+      DecodeBase64(req.get_header_value("X-DuckDB-UI-Result-Table-Name"));
+
+  // If no result table is specified, then the result table chunk limit is zero.
+  // Otherwise, default to effectively no limit.
+  auto result_table_chunk_limit = result_table_name.empty() ? 0 : INT_MAX;
+  auto result_table_chunk_limit_string =
+      req.get_header_value("X-DuckDB-UI-Result-Table-Chunk-Limit");
+  // Only set the result table chunk limit if a result table name is specified.
+  if (!result_table_name.empty() && !result_table_chunk_limit_string.empty()) {
+    result_table_chunk_limit = std::stoi(result_table_chunk_limit_string);
+  }
+
+  auto errors_as_json_string =
+      req.get_header_value("X-DuckDB-UI-Errors-As-JSON");
+  auto errors_as_json = errors_as_json_string == "true";
+
   std::string content = ReadContent(content_reader);

   auto db = ddb_instance.lock();
@@ -391,13 +424,21 @@
   auto connection =
       UIStorageExtensionInfo::GetState(*db).FindOrCreateConnection(
           *db, connection_name);
+  auto &context = *connection->context;
+  auto &config = ClientConfig::GetConfig(context);

-  // Set current database if optional header is provided.
-  if (!database_name.empty()) {
-    auto &context = *connection->context;
+  // Set errors_as_json
+  config.errors_as_json = errors_as_json;
+
+  // Set current database & schema
+  if (!database_name_option.empty() || !schema_name_option.empty()) {
+    // It's fine if the database name is empty, but we need a valid schema name.
+    auto schema_name =
+        schema_name_option.empty() ? DEFAULT_SCHEMA : schema_name_option;
     context.RunFunctionInTransaction([&] {
-      auto &manager = context.db->GetDatabaseManager();
-      manager.SetDefaultDatabase(context, database_name);
+      duckdb::ClientData::Get(context).catalog_search_path->Set(
+          {database_name_option, schema_name},
+          duckdb::CatalogSetPathType::SET_SCHEMA);
     });
   }

@@ -414,7 +455,7 @@
   // If there's more than one statement, run all but the last.
   if (statement_count > 1) {
     for (auto i = 0; i < statement_count - 1; ++i) {
-      auto pending = connection->PendingQuery(std::move(statements[i]));
+      auto pending = connection->PendingQuery(std::move(statements[i]), true);
       // Return any error found before execution.
       if (pending->HasError()) {
         SetResponseErrorResult(res, pending->GetError());
@@ -499,17 +540,67 @@
   // Get the result. This should be quick because it's ready.
   auto result = pending->Execute();

+  // We use a separate connection for the appender, including creating the
+  // result table, because we still need to fetch chunks from the pending
+  // query on the user's connection.
+  unique_ptr<Connection> appender_connection;
+  unique_ptr<Appender> appender;
+
+  if (!result_table_name.empty()) {
+    auto result_database_name = result_database_name_option.empty()
+                                    ? "memory"
+                                    : result_database_name_option;
+    auto result_schema_name = result_schema_name_option.empty()
+                                  ? "main"
+                                  : result_schema_name_option;
+
+    auto result_table_info = make_uniq<CreateTableInfo>(
+        result_database_name, result_schema_name, result_table_name);
+    for (idx_t i = 0; i < result->names.size(); i++) {
+      result_table_info->columns.AddColumn(
+          ColumnDefinition(result->names[i], result->types[i]));
+    }
+
+    appender_connection = make_uniq<Connection>(*db);
+    auto appender_context = appender_connection->context;
+    appender_context->RunFunctionInTransaction([&] {
+      auto &catalog = duckdb::Catalog::GetCatalog(*appender_context,
+                                                  result_database_name);
+      MetaTransaction::Get(*appender_context)
+          .ModifyDatabase(catalog.GetAttached());
+      catalog.CreateTable(*appender_context, std::move(result_table_info));
+    });
+
+    appender = make_uniq<Appender>(
+        *appender_connection, result_database_name, result_schema_name,
+        result_table_name);
+  }
+
   // Fetch the chunks and serialize the result.
   SuccessResult success_result;
   success_result.column_names_and_types = {std::move(result->names),
                                            std::move(result->types)};

-  // TODO: support limiting the number of chunks fetched
-  auto chunk = result->Fetch();
-  while (chunk) {
-    success_result.chunks.push_back(
-        {static_cast(chunk->size()), std::move(chunk->data)});
+  auto chunk_limit = std::max(result_chunk_limit, result_table_chunk_limit);
+  auto chunks_fetched = 0;
+  unique_ptr<DataChunk> chunk;
+  while (chunks_fetched < chunk_limit) {
     chunk = result->Fetch();
+    if (!chunk) {
+      break;
+    }
+    ++chunks_fetched;
+    if (appender && chunks_fetched <= result_table_chunk_limit) {
+      appender->AppendDataChunk(*chunk);
+    }
+    if (chunks_fetched <= result_chunk_limit) {
+      success_result.chunks.push_back(
+          {static_cast(chunk->size()), std::move(chunk->data)});
+    }
+  }
+
+  if (appender) {
+    appender->Close();
   }

   MemoryStream success_response_content;
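
Note for reviewers: below is a minimal client-side sketch (not part of the patch) of how the new headers might be exercised end to end. It assumes a cpp-httplib client; the host, port, endpoint path, request body, and the inline Base64Encode helper are placeholders, not the UI frontend's real values. The *-Name header values are base64 encoded because the server decodes them with DecodeBase64, while the chunk limits and X-DuckDB-UI-Errors-As-JSON are sent as plain strings.

// Sketch only: exercises the run options added in this patch from a
// hypothetical C++ client. Host, port, path, and body are placeholders.
#include <httplib.h>

#include <iostream>
#include <string>

// Minimal base64 encoder for this sketch; the real client presumably has its
// own helper matching the server's DecodeBase64.
static std::string Base64Encode(const std::string &in) {
  static const char *tbl =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
  std::string out;
  int val = 0, bits = -6;
  for (unsigned char c : in) {
    val = (val << 8) + c;
    bits += 8;
    while (bits >= 0) {
      out.push_back(tbl[(val >> bits) & 0x3F]);
      bits -= 6;
    }
  }
  if (bits > -6) {
    out.push_back(tbl[((val << 8) >> (bits + 8)) & 0x3F]);
  }
  while (out.size() % 4) {
    out.push_back('=');
  }
  return out;
}

int main() {
  httplib::Client client("localhost", 4213); // placeholder host/port

  httplib::Headers headers = {
      {"X-DuckDB-UI-Connection-Name", "reviewer-test"},
      // Run with search path mydb.analytics; *-Name values are base64 encoded.
      {"X-DuckDB-UI-Database-Name", Base64Encode("mydb")},
      {"X-DuckDB-UI-Schema-Name", Base64Encode("analytics")},
      // Return at most 10 chunks in the HTTP response...
      {"X-DuckDB-UI-Result-Chunk-Limit", "10"},
      // ...while appending up to 100 chunks into memory.main.run_result.
      {"X-DuckDB-UI-Result-Database-Name", Base64Encode("memory")},
      {"X-DuckDB-UI-Result-Schema-Name", Base64Encode("main")},
      {"X-DuckDB-UI-Result-Table-Name", Base64Encode("run_result")},
      {"X-DuckDB-UI-Result-Table-Chunk-Limit", "100"},
      // Ask for JSON-formatted error messages.
      {"X-DuckDB-UI-Errors-As-JSON", "true"},
  };

  // Placeholder path and body; the real UI sends its own payload format.
  auto res = client.Post("/ddb/run", headers,
                         "SELECT * FROM range(1000000)", "text/plain");
  if (res) {
    std::cout << "status: " << res->status
              << ", body bytes: " << res->body.size() << "\n";
  } else {
    std::cout << "request failed\n";
  }
  return 0;
}

As implemented in the patch, X-DuckDB-UI-Result-Table-Chunk-Limit is only honored when a result table name is supplied, and the result table is populated through a dedicated appender connection so that chunks can still be fetched from the pending query on the user's connection.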