Split server in smaller modules

This commit is contained in:
Yves
2025-02-21 12:46:41 +01:00
parent ae104f697a
commit f1cc3c667d
10 changed files with 339 additions and 245 deletions

View File

@@ -4,72 +4,18 @@
#include "state.hpp"
#include "utils/encoding.hpp"
#include "utils/env.hpp"
#include "event_dispatcher.hpp"
#include "utils/md_helpers.hpp"
#include "utils/serialization.hpp"
#include "watcher.hpp"
#include <duckdb/common/serializer/binary_serializer.hpp>
#include <duckdb/common/serializer/memory_stream.hpp>
#include <duckdb/main/attached_database.hpp>
#include <duckdb/parser/parser.hpp>
// Chosen to be no more than half of the lesser of the two limits:
// - The default httplib thread pool size = 8
// - The browser limit on the number of server-sent event connections = 6
#define MAX_EVENT_WAIT_COUNT 3
namespace duckdb {
namespace ui {
// An empty Server-Sent Events message. See
// https://html.spec.whatwg.org/multipage/server-sent-events.html#authoring-notes
constexpr const char *EMPTY_SSE_MESSAGE = ":\r\r";
constexpr idx_t EMPTY_SSE_MESSAGE_LENGTH = 3;
bool EventDispatcher::WaitEvent(httplib::DataSink *sink) {
std::unique_lock<std::mutex> lock(mutex);
// Don't allow too many simultaneous waits, because each consumes a thread in
// the httplib thread pool, and also browsers limit the number of server-sent
// event connections.
if (closed || wait_count >= MAX_EVENT_WAIT_COUNT) {
return false;
}
int target_id = next_id;
wait_count++;
cv.wait_for(lock, std::chrono::seconds(5));
wait_count--;
if (closed) {
return false;
}
if (current_id == target_id) {
sink->write(message.data(), message.size());
} else {
// Our wait timer expired. Write an empty, no-op message.
// This enables detecting when the client is gone.
sink->write(EMPTY_SSE_MESSAGE, EMPTY_SSE_MESSAGE_LENGTH);
}
return true;
}
void EventDispatcher::SendEvent(const std::string &_message) {
std::lock_guard<std::mutex> guard(mutex);
if (closed) {
return;
}
current_id = next_id++;
message = _message;
cv.notify_all();
}
void EventDispatcher::Close() {
std::lock_guard<std::mutex> guard(mutex);
if (closed) {
return;
}
current_id = next_id++;
closed = true;
cv.notify_all();
}
unique_ptr<HttpServer> HttpServer::server_instance;
HttpServer *HttpServer::GetInstance(ClientContext &context) {
@@ -92,11 +38,11 @@ void HttpServer::UpdateDatabaseInstanceIfRunning(
void HttpServer::UpdateDatabaseInstance(
shared_ptr<DatabaseInstance> context_db) {
const auto current_db = server_instance->ddb_instance.lock();
const auto current_db = server_instance->LockDatabaseInstance();
if (current_db != context_db) {
server_instance->StopWatcher(); // Likely already stopped, but just in case
server_instance->watcher->Stop();
server_instance->ddb_instance = context_db;
server_instance->StartWatcher();
server_instance->watcher->Start();
}
}
@@ -105,7 +51,7 @@ bool HttpServer::Started() {
}
void HttpServer::StopInstance() {
if (server_instance) {
if (Started()) {
server_instance->DoStop();
}
}
@@ -117,9 +63,11 @@ const HttpServer &HttpServer::Start(ClientContext &context, bool *was_started) {
}
return *GetInstance(context);
}
if (was_started) {
*was_started = false;
}
const auto remote_url = GetRemoteUrl(context);
const auto port = GetLocalPort(context);
auto server = GetInstance(context);
@@ -143,51 +91,36 @@ void HttpServer::DoStart(const uint16_t _local_port,
UI_EXTENSION_GIT_SHA, DuckDB::Platform());
event_dispatcher = make_uniq<EventDispatcher>();
main_thread = make_uniq<std::thread>(&HttpServer::Run, this);
StartWatcher();
}
void HttpServer::StartWatcher() {
{
std::lock_guard<std::mutex> guard(watcher_mutex);
watcher_should_run = true;
}
if (!watcher_thread) {
watcher_thread = make_uniq<std::thread>(&HttpServer::Watch, this);
}
}
void HttpServer::StopWatcher() {
if (!watcher_thread) {
return;
}
{
std::lock_guard<std::mutex> guard(watcher_mutex);
watcher_should_run = false;
}
watcher_cv.notify_all();
watcher_thread->join();
watcher_thread.reset();
watcher = make_uniq<Watcher>(*this);
watcher->Start();
}
bool HttpServer::Stop() {
if (!Started()) {
return false;
}
server_instance->DoStop();
return true;
}
void HttpServer::DoStop() {
event_dispatcher->Close();
if (event_dispatcher) {
event_dispatcher->Close();
event_dispatcher = nullptr;
}
server.stop();
StopWatcher();
if (watcher) {
watcher->Stop();
watcher = nullptr;
}
if (main_thread) {
main_thread->join();
main_thread.reset();
}
main_thread->join();
main_thread.reset();
event_dispatcher.reset();
ddb_instance.reset();
remote_url = "";
local_port = 0;
@@ -197,124 +130,8 @@ std::string HttpServer::LocalUrl() const {
return StringUtil::Format("http://localhost:%d/", local_port);
}
void HttpServer::SendConnectedEvent(const std::string &token) {
SendEvent(StringUtil::Format("event: ConnectedEvent\ndata: %s\n\n", token));
}
void HttpServer::SendCatalogChangedEvent() {
SendEvent("event: CatalogChangeEvent\ndata:\n\n");
}
void HttpServer::SendEvent(const std::string &message) {
if (event_dispatcher) {
event_dispatcher->SendEvent(message);
}
}
bool WasCatalogUpdated(DatabaseInstance &db, Connection &connection,
CatalogState &last_state) {
bool has_change = false;
auto &context = *connection.context;
connection.BeginTransaction();
const auto &databases = db.GetDatabaseManager().GetDatabases(context);
std::set<idx_t> db_oids;
// Check currently attached databases
for (const auto &db_ref : databases) {
auto &db = db_ref.get();
if (db.IsTemporary()) {
continue; // ignore temp databases
}
db_oids.insert(db.oid);
auto &catalog = db.GetCatalog();
auto current_version = catalog.GetCatalogVersion(context);
auto last_version_it = last_state.db_to_catalog_version.find(db.oid);
if (last_version_it == last_state.db_to_catalog_version.end() // first time
|| !(last_version_it->second == current_version)) { // updated
has_change = true;
last_state.db_to_catalog_version[db.oid] = current_version;
}
}
// Now check if any databases have been detached
for (auto it = last_state.db_to_catalog_version.begin();
it != last_state.db_to_catalog_version.end();) {
if (db_oids.find(it->first) == db_oids.end()) {
has_change = true;
it = last_state.db_to_catalog_version.erase(it);
} else {
++it;
}
}
connection.Rollback();
return has_change;
}
std::string GetMDToken(Connection &connection) {
auto query_res = connection.Query("CALL GET_MD_TOKEN()");
if (query_res->HasError()) {
query_res->ThrowError();
return ""; // unreachable
}
auto chunk = query_res->Fetch();
return chunk->GetValue(0, 0).GetValue<std::string>();
}
bool IsMDConnected(Connection &con) {
if (!con.context->db->ExtensionIsLoaded("motherduck")) {
return false;
}
auto query_res = con.Query("CALL MD_IS_CONNECTED()");
if (query_res->HasError()) {
std::cerr << "Error in IsMDConnected: " << query_res->GetError()
<< std::endl;
return false;
}
auto chunk = query_res->Fetch();
return chunk->GetValue(0, 0).GetValue<bool>();
}
void HttpServer::Watch() {
CatalogState last_state;
bool is_md_connected = false;
while (watcher_should_run) {
auto db = ddb_instance.lock();
if (!db) {
break; // DB went away, nothing to watch
}
duckdb::Connection con{*db};
auto polling_interval = GetPollingInterval(*con.context);
if (polling_interval == 0) {
return; // Disable watcher
}
try {
if (WasCatalogUpdated(*db, con, last_state)) {
SendCatalogChangedEvent();
}
if (!is_md_connected && IsMDConnected(con)) {
is_md_connected = true;
SendConnectedEvent(GetMDToken(con));
}
} catch (std::exception &ex) {
// Do not crash with uncaught exception, but quit.
std::cerr << "Error in watcher: " << ex.what() << std::endl;
std::cerr << "Will now terminate." << std::endl;
return;
}
{
std::unique_lock<std::mutex> lock(watcher_mutex);
watcher_cv.wait_for(lock, std::chrono::milliseconds(polling_interval));
}
}
shared_ptr<DatabaseInstance> HttpServer::LockDatabaseInstance() {
return ddb_instance.lock();
}
void HttpServer::Run() {