Watch catalog updates

This commit is contained in:
Yves
2025-02-19 12:30:10 +01:00
parent eeffbbd607
commit d53c197f58
3 changed files with 87 additions and 1 deletions

View File

@@ -2,6 +2,7 @@
#include <duckdb/common/serializer/memory_stream.hpp> #include <duckdb/common/serializer/memory_stream.hpp>
#include <duckdb/common/serializer/binary_serializer.hpp> #include <duckdb/common/serializer/binary_serializer.hpp>
#include <duckdb/main/attached_database.hpp>
#include <duckdb/parser/parser.hpp> #include <duckdb/parser/parser.hpp>
#include "utils/env.hpp" #include "utils/env.hpp"
#include "utils/serialization.hpp" #include "utils/serialization.hpp"
@@ -104,6 +105,11 @@ bool HttpServer::Start(const uint16_t _local_port,
DuckDB::Platform()); DuckDB::Platform());
event_dispatcher = make_uniq<EventDispatcher>(); event_dispatcher = make_uniq<EventDispatcher>();
main_thread = make_uniq<std::thread>(&HttpServer::Run, this); main_thread = make_uniq<std::thread>(&HttpServer::Run, this);
{
std::lock_guard<std::mutex> guard(watcher_mutex);
watcher_should_run = true;
}
watcher_thread = make_uniq<std::thread>(&HttpServer::Watch, this);
return true; return true;
} }
@@ -114,6 +120,17 @@ bool HttpServer::Stop() {
event_dispatcher->Close(); event_dispatcher->Close();
server.stop(); server.stop();
if (watcher_thread) {
{
std::lock_guard<std::mutex> guard(watcher_mutex);
watcher_should_run = false;
}
watcher_cv.notify_all();
watcher_thread->join();
watcher_thread.reset();
}
main_thread->join(); main_thread->join();
main_thread.reset(); main_thread.reset();
event_dispatcher.reset(); event_dispatcher.reset();
@@ -140,6 +157,62 @@ void HttpServer::SendEvent(const std::string &message) {
} }
} }
void HttpServer::WatchForCatalogUpdate(CatalogState &last_state) {
bool has_change = false;
duckdb::Connection con{*ddb_instance};
auto &context = *con.context;
con.BeginTransaction();
const auto &databases =
ddb_instance->GetDatabaseManager().GetDatabases(context);
std::set<idx_t> db_oids;
// Check currently attached databases
for (const auto &db_ref : databases) {
auto &db = db_ref.get();
if (db.IsTemporary()) {
continue; // ignore temp databases
}
db_oids.insert(db.oid);
auto &catalog = db.GetCatalog();
auto current_version = catalog.GetCatalogVersion(context);
auto last_version_it = last_state.db_to_catalog_version.find(db.oid);
if (last_version_it == last_state.db_to_catalog_version.end() // first time
|| !(last_version_it->second == current_version)) { // updated
has_change = true;
last_state.db_to_catalog_version[db.oid] = current_version;
}
}
// Now check if any databases have been detached
for (auto it = last_state.db_to_catalog_version.begin();
it != last_state.db_to_catalog_version.end();) {
if (db_oids.find(it->first) == db_oids.end()) {
has_change = true;
it = last_state.db_to_catalog_version.erase(it);
} else {
++it;
}
}
// If there was any change, notify the UI
if (has_change) {
SendCatalogChangedEvent();
}
}
void HttpServer::Watch() {
CatalogState last_state;
while (watcher_should_run) {
WatchForCatalogUpdate(last_state);
{
std::unique_lock<std::mutex> lock(watcher_mutex);
watcher_cv.wait_for(lock,
std::chrono::milliseconds(2000)); // TODO - configure
}
}
}
void HttpServer::Run() { void HttpServer::Run() {
server.Get("/localEvents", server.Get("/localEvents",
[&](const httplib::Request &req, httplib::Response &res) { [&](const httplib::Request &req, httplib::Response &res) {

View File

@@ -18,6 +18,10 @@ class MemoryStream;
namespace ui { namespace ui {
struct CatalogState {
std::map<idx_t, optional_idx> db_to_catalog_version;
};
class EventDispatcher { class EventDispatcher {
public: public:
bool WaitEvent(httplib::DataSink *sink); bool WaitEvent(httplib::DataSink *sink);
@@ -51,6 +55,7 @@ public:
private: private:
void SendEvent(const std::string &message); void SendEvent(const std::string &message);
void Run(); void Run();
void Watch();
void HandleGetLocalEvents(const httplib::Request &req, void HandleGetLocalEvents(const httplib::Request &req,
httplib::Response &res); httplib::Response &res);
void HandleGetLocalToken(const httplib::Request &req, httplib::Response &res); void HandleGetLocalToken(const httplib::Request &req, httplib::Response &res);
@@ -70,12 +75,20 @@ private:
void SetResponseEmptyResult(httplib::Response &res); void SetResponseEmptyResult(httplib::Response &res);
void SetResponseErrorResult(httplib::Response &res, const std::string &error); void SetResponseErrorResult(httplib::Response &res, const std::string &error);
// Watchers
void WatchForCatalogUpdate(CatalogState &last_state);
uint16_t local_port; uint16_t local_port;
std::string remote_url; std::string remote_url;
shared_ptr<DatabaseInstance> ddb_instance; shared_ptr<DatabaseInstance> ddb_instance;
std::string user_agent; std::string user_agent;
httplib::Server server; httplib::Server server;
unique_ptr<std::thread> main_thread; unique_ptr<std::thread> main_thread;
unique_ptr<std::thread> watcher_thread;
std::mutex watcher_mutex;
std::condition_variable watcher_cv;
std::atomic<bool> watcher_should_run;
std::mutex connections_mutex; std::mutex connections_mutex;
std::unordered_map<std::string, shared_ptr<Connection>> connections; std::unordered_map<std::string, shared_ptr<Connection>> connections;
unique_ptr<EventDispatcher> event_dispatcher; unique_ptr<EventDispatcher> event_dispatcher;

View File

@@ -9,7 +9,7 @@ namespace duckdb {
typedef std::string (*simple_tf_t)(ClientContext &); typedef std::string (*simple_tf_t)(ClientContext &);
struct RunOnceTableFunctionState : GlobalTableFunctionState { struct RunOnceTableFunctionState : GlobalTableFunctionState {
RunOnceTableFunctionState() : run(false){}; RunOnceTableFunctionState() : run(false) {};
std::atomic<bool> run; std::atomic<bool> run;
static unique_ptr<GlobalTableFunctionState> Init(ClientContext &, static unique_ptr<GlobalTableFunctionState> Init(ClientContext &,