support additional run options, including result table and chunk limit (#14)
* result table * db, schema, & result tbl chunk limit * support schema, change db to search path; also errors_as_json * add comment about appender connection
This commit is contained in:
@@ -13,6 +13,8 @@
|
|||||||
#include <duckdb/common/serializer/binary_serializer.hpp>
|
#include <duckdb/common/serializer/binary_serializer.hpp>
|
||||||
#include <duckdb/common/serializer/memory_stream.hpp>
|
#include <duckdb/common/serializer/memory_stream.hpp>
|
||||||
#include <duckdb/main/attached_database.hpp>
|
#include <duckdb/main/attached_database.hpp>
|
||||||
|
#include <duckdb/main/client_data.hpp>
|
||||||
|
#include <duckdb/parser/parsed_data/create_table_info.hpp>
|
||||||
#include <duckdb/parser/parser.hpp>
|
#include <duckdb/parser/parser.hpp>
|
||||||
|
|
||||||
namespace duckdb {
|
namespace duckdb {
|
||||||
@@ -364,8 +366,10 @@ void HttpServer::DoHandleRun(const httplib::Request &req,
|
|||||||
|
|
||||||
auto connection_name = req.get_header_value("X-DuckDB-UI-Connection-Name");
|
auto connection_name = req.get_header_value("X-DuckDB-UI-Connection-Name");
|
||||||
|
|
||||||
auto database_name =
|
auto database_name_option =
|
||||||
DecodeBase64(req.get_header_value("X-DuckDB-UI-Database-Name"));
|
DecodeBase64(req.get_header_value("X-DuckDB-UI-Database-Name"));
|
||||||
|
auto schema_name_option =
|
||||||
|
DecodeBase64(req.get_header_value("X-DuckDB-UI-Schema-Name"));
|
||||||
|
|
||||||
std::vector<std::string> parameter_values;
|
std::vector<std::string> parameter_values;
|
||||||
auto parameter_count_string =
|
auto parameter_count_string =
|
||||||
@@ -379,6 +383,35 @@ void HttpServer::DoHandleRun(const httplib::Request &req,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// default to effectively no limit
|
||||||
|
auto result_chunk_limit = INT_MAX;
|
||||||
|
auto result_chunk_limit_string =
|
||||||
|
req.get_header_value("X-DuckDB-UI-Result-Chunk-Limit");
|
||||||
|
if (!result_chunk_limit_string.empty()) {
|
||||||
|
result_chunk_limit = std::stoi(result_chunk_limit_string);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto result_database_name_option =
|
||||||
|
DecodeBase64(req.get_header_value("X-DuckDB-UI-Result-Database-Name"));
|
||||||
|
auto result_schema_name_option =
|
||||||
|
DecodeBase64(req.get_header_value("X-DuckDB-UI-Result-Schema-Name"));
|
||||||
|
auto result_table_name =
|
||||||
|
DecodeBase64(req.get_header_value("X-DuckDB-UI-Result-Table-Name"));
|
||||||
|
|
||||||
|
// If no result table is specified, then the result table chunk limit is zero.
|
||||||
|
// Otherwise, default to effectively no limit.
|
||||||
|
auto result_table_chunk_limit = result_table_name.empty() ? 0 : INT_MAX;
|
||||||
|
auto result_table_chunk_limit_string =
|
||||||
|
req.get_header_value("X-DuckDB-UI-Result-Table-Chunk-Limit");
|
||||||
|
// Only set the result table chunk limit if a result table name is specified.
|
||||||
|
if (!result_table_name.empty() && !result_table_chunk_limit_string.empty()) {
|
||||||
|
result_table_chunk_limit = std::stoi(result_table_chunk_limit_string);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto errors_as_json_string =
|
||||||
|
req.get_header_value("X-DuckDB-UI-Errors-As-JSON");
|
||||||
|
auto errors_as_json = errors_as_json_string == "true";
|
||||||
|
|
||||||
std::string content = ReadContent(content_reader);
|
std::string content = ReadContent(content_reader);
|
||||||
|
|
||||||
auto db = ddb_instance.lock();
|
auto db = ddb_instance.lock();
|
||||||
@@ -391,13 +424,21 @@ void HttpServer::DoHandleRun(const httplib::Request &req,
|
|||||||
auto connection =
|
auto connection =
|
||||||
UIStorageExtensionInfo::GetState(*db).FindOrCreateConnection(
|
UIStorageExtensionInfo::GetState(*db).FindOrCreateConnection(
|
||||||
*db, connection_name);
|
*db, connection_name);
|
||||||
|
auto &context = *connection->context;
|
||||||
|
auto &config = ClientConfig::GetConfig(context);
|
||||||
|
|
||||||
// Set current database if optional header is provided.
|
// Set errors_as_json
|
||||||
if (!database_name.empty()) {
|
config.errors_as_json = errors_as_json;
|
||||||
auto &context = *connection->context;
|
|
||||||
|
// Set current database & schema
|
||||||
|
if (!database_name_option.empty() || !schema_name_option.empty()) {
|
||||||
|
// It's fine if the database name is empty, but we need a valid schema name.
|
||||||
|
auto schema_name =
|
||||||
|
schema_name_option.empty() ? DEFAULT_SCHEMA : schema_name_option;
|
||||||
context.RunFunctionInTransaction([&] {
|
context.RunFunctionInTransaction([&] {
|
||||||
auto &manager = context.db->GetDatabaseManager();
|
duckdb::ClientData::Get(context).catalog_search_path->Set(
|
||||||
manager.SetDefaultDatabase(context, database_name);
|
{database_name_option, schema_name},
|
||||||
|
duckdb::CatalogSetPathType::SET_SCHEMA);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -414,7 +455,7 @@ void HttpServer::DoHandleRun(const httplib::Request &req,
|
|||||||
// If there's more than one statement, run all but the last.
|
// If there's more than one statement, run all but the last.
|
||||||
if (statement_count > 1) {
|
if (statement_count > 1) {
|
||||||
for (auto i = 0; i < statement_count - 1; ++i) {
|
for (auto i = 0; i < statement_count - 1; ++i) {
|
||||||
auto pending = connection->PendingQuery(std::move(statements[i]));
|
auto pending = connection->PendingQuery(std::move(statements[i]), true);
|
||||||
// Return any error found before execution.
|
// Return any error found before execution.
|
||||||
if (pending->HasError()) {
|
if (pending->HasError()) {
|
||||||
SetResponseErrorResult(res, pending->GetError());
|
SetResponseErrorResult(res, pending->GetError());
|
||||||
@@ -499,17 +540,67 @@ void HttpServer::DoHandleRun(const httplib::Request &req,
|
|||||||
// Get the result. This should be quick because it's ready.
|
// Get the result. This should be quick because it's ready.
|
||||||
auto result = pending->Execute();
|
auto result = pending->Execute();
|
||||||
|
|
||||||
|
// We use a separate connection for the appender, including creating the
|
||||||
|
// result table, because we still need to fetch chunks from the pending
|
||||||
|
// query on the user's connection.
|
||||||
|
unique_ptr<duckdb::Connection> appender_connection;
|
||||||
|
unique_ptr<duckdb::Appender> appender;
|
||||||
|
|
||||||
|
if (!result_table_name.empty()) {
|
||||||
|
auto result_database_name = result_database_name_option.empty()
|
||||||
|
? "memory"
|
||||||
|
: result_database_name_option;
|
||||||
|
auto result_schema_name = result_schema_name_option.empty()
|
||||||
|
? "main"
|
||||||
|
: result_schema_name_option;
|
||||||
|
|
||||||
|
auto result_table_info = make_uniq<duckdb::CreateTableInfo>(
|
||||||
|
result_database_name, result_schema_name, result_table_name);
|
||||||
|
for (idx_t i = 0; i < result->names.size(); i++) {
|
||||||
|
result_table_info->columns.AddColumn(
|
||||||
|
ColumnDefinition(result->names[i], result->types[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
appender_connection = make_uniq<duckdb::Connection>(*db);
|
||||||
|
auto appender_context = appender_connection->context;
|
||||||
|
appender_context->RunFunctionInTransaction([&] {
|
||||||
|
auto &catalog = duckdb::Catalog::GetCatalog(*appender_context,
|
||||||
|
result_database_name);
|
||||||
|
MetaTransaction::Get(*appender_context)
|
||||||
|
.ModifyDatabase(catalog.GetAttached());
|
||||||
|
catalog.CreateTable(*appender_context, std::move(result_table_info));
|
||||||
|
});
|
||||||
|
|
||||||
|
appender = make_uniq<duckdb::Appender>(
|
||||||
|
*appender_connection, result_database_name, result_schema_name,
|
||||||
|
result_table_name);
|
||||||
|
}
|
||||||
|
|
||||||
// Fetch the chunks and serialize the result.
|
// Fetch the chunks and serialize the result.
|
||||||
SuccessResult success_result;
|
SuccessResult success_result;
|
||||||
success_result.column_names_and_types = {std::move(result->names),
|
success_result.column_names_and_types = {std::move(result->names),
|
||||||
std::move(result->types)};
|
std::move(result->types)};
|
||||||
|
|
||||||
// TODO: support limiting the number of chunks fetched
|
auto chunk_limit = std::max(result_chunk_limit, result_table_chunk_limit);
|
||||||
auto chunk = result->Fetch();
|
auto chunks_fetched = 0;
|
||||||
while (chunk) {
|
unique_ptr<duckdb::DataChunk> chunk;
|
||||||
success_result.chunks.push_back(
|
while (chunks_fetched < chunk_limit) {
|
||||||
{static_cast<uint16_t>(chunk->size()), std::move(chunk->data)});
|
|
||||||
chunk = result->Fetch();
|
chunk = result->Fetch();
|
||||||
|
if (!chunk) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
++chunks_fetched;
|
||||||
|
if (appender && chunks_fetched <= result_table_chunk_limit) {
|
||||||
|
appender->AppendDataChunk(*chunk);
|
||||||
|
}
|
||||||
|
if (chunks_fetched <= result_chunk_limit) {
|
||||||
|
success_result.chunks.push_back(
|
||||||
|
{static_cast<uint16_t>(chunk->size()), std::move(chunk->data)});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (appender) {
|
||||||
|
appender->Close();
|
||||||
}
|
}
|
||||||
|
|
||||||
MemoryStream success_response_content;
|
MemoryStream success_response_content;
|
||||||
|
|||||||
Reference in New Issue
Block a user