diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 7358dec9285..5634a4998d4 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -338,7 +338,7 @@ std::vector Client::loadWarningMessages() return {}; std::vector messages; - connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings SETTINGS _tp_internal_system_open_sesame=true", + connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings SETTINGS _tp_internal_system_open_sesame=true,is_internal=true", {} /* query_parameters */, "" /* query_id */, QueryProcessingStage::Complete, diff --git a/programs/server/CMakeLists.txt b/programs/server/CMakeLists.txt index b927879dfe7..80cd394241f 100644 --- a/programs/server/CMakeLists.txt +++ b/programs/server/CMakeLists.txt @@ -2,7 +2,6 @@ include(${proton_SOURCE_DIR}/cmake/embed_binary.cmake) set(CLICKHOUSE_SERVER_SOURCES MetricsTransmitter.cpp - TelemetryCollector.cpp Server.cpp ) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index af24e6dbfa5..987e35a3ecc 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -87,11 +87,11 @@ #include "config_version.h" /// proton: starts -#include "TelemetryCollector.h" #include #include #include #include +#include #include #include #include @@ -307,6 +307,9 @@ void initGlobalServices(DB::ContextMutablePtr & global_context) if (native_log.enabled() && pool.enabled()) throw DB::Exception("Both external Kafka log and internal native log are enabled. This is not a supported configuration", DB::ErrorCodes::UNSUPPORTED); + + auto & telemetry_collector = DB::TelemetryCollector::instance(global_context); + telemetry_collector.startup(); } void initGlobalSingletons(DB::ContextMutablePtr & context) @@ -317,7 +320,6 @@ void initGlobalSingletons(DB::ContextMutablePtr & context) DB::DiskUtilChecker::instance(context); DB::ExternalGrokPatterns::instance(context); DB::ExternalUserDefinedFunctionsLoader::instance(context); - DB::TelemetryCollector::instance(context); } void deinitGlobalSingletons(DB::ContextMutablePtr & context) @@ -1759,6 +1761,8 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "Closed connections."); /// proton: start. + DB::TelemetryCollector::instance(global_context).shutdown(); + deinitGlobalSingletons(global_context); disposeV8(); diff --git a/programs/server/config.yaml b/programs/server/config.yaml index e4001882dfb..ab9cf8bc33a 100644 --- a/programs/server/config.yaml +++ b/programs/server/config.yaml @@ -150,6 +150,8 @@ telemetry_enabled: "@replace": true "#text": true +telemetry_interval_ms: 300000 # 5 minutes + # gRPC protocol (see src/Server/grpc_protos/proton_grpc.proto for the API) # grpc_port: 9100 grpc: diff --git a/src/Client/Suggest.cpp b/src/Client/Suggest.cpp index 032806763b1..94f43b2d9bc 100644 --- a/src/Client/Suggest.cpp +++ b/src/Client/Suggest.cpp @@ -90,9 +90,9 @@ static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggesti query << "SELECT DISTINCT name FROM system.dictionaries LIMIT " << limit_str << " UNION ALL "; } - query << "SELECT DISTINCT name FROM system.columns LIMIT " << limit_str << " SETTINGS _tp_internal_system_open_sesame=true"; + query << "SELECT DISTINCT name FROM system.columns LIMIT " << limit_str << " SETTINGS _tp_internal_system_open_sesame=true,is_internal=true"; } - query << ") WHERE not_empty(res) SETTINGS _tp_internal_system_open_sesame=true"; + query << ") WHERE not_empty(res) SETTINGS _tp_internal_system_open_sesame=true,is_internal=true"; return query.str(); } diff --git a/src/Core/Settings.h b/src/Core/Settings.h index e2fb8e4b7c6..8b1a5d1eda0 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -844,6 +844,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Int64, async_ingest_block_timeout_ms, 120000, "Max duration for a block to commit before it is considered expired during async ingestion", 0) \ M(UInt64, aysnc_ingest_max_outstanding_blocks, 10000, "Max outstanding blocks to be committed during async ingestion", 0) \ M(Bool, _tp_internal_system_open_sesame, true, "Control the access to system.* streams", 0) \ + M(Bool, is_internal, false, "Control the statistics of select query", 0) \ M(UInt64, javascript_max_memory_bytes, 100 * 1024 * 1024, "Maximum heap size of javascript UDA/UDF in bytes", 0) \ M(Bool, enable_dependency_check, true, "Enable the dependency check of view/materialized view", 0) \ M(RecoveryPolicy, recovery_policy, RecoveryPolicy::Strict, "Default recovery policy for materialized view when inner query failed. 'strict': always recover from checkpointed; 'best_effort': attempts to recover from checkpointed and allow skipping of some data with permanent errors;", 0) \ diff --git a/src/Interpreters/InterpreterFactory.cpp b/src/Interpreters/InterpreterFactory.cpp index 4e7e6e72892..e41676dbfda 100644 --- a/src/Interpreters/InterpreterFactory.cpp +++ b/src/Interpreters/InterpreterFactory.cpp @@ -134,8 +134,13 @@ std::unique_ptr InterpreterFactory::get(ASTPtr & query, ContextMut else if (query->as()) { auto interpreter = std::make_unique(query, context, options); - ProfileEvents::increment(ProfileEvents::SelectQuery); - ProfileEvents::increment(interpreter->isStreamingQuery() ? ProfileEvents::StreamingSelectQuery : ProfileEvents::HistoricalSelectQuery); + + if(!options.is_internal && !context->getSettingsRef().is_internal) + { + ProfileEvents::increment(ProfileEvents::SelectQuery); + ProfileEvents::increment(interpreter->isStreamingQuery() ? ProfileEvents::StreamingSelectQuery : ProfileEvents::HistoricalSelectQuery); + } + return std::move(interpreter); } else if (query->as()) diff --git a/programs/server/TelemetryCollector.cpp b/src/Interpreters/TelemetryCollector.cpp similarity index 60% rename from programs/server/TelemetryCollector.cpp rename to src/Interpreters/TelemetryCollector.cpp index 6d4ec4c50b3..dddb78bed27 100644 --- a/programs/server/TelemetryCollector.cpp +++ b/src/Interpreters/TelemetryCollector.cpp @@ -1,78 +1,97 @@ #include "TelemetryCollector.h" #include "config_version.h" -#include +#include +#include +#include +#include +#include +#include +#include #include #include #include -#include -#include +#include #include #include -#include -#include -#include -#include -#include namespace fs = std::filesystem; namespace ProfileEvents { - extern const Event SelectQuery; - extern const Event StreamingSelectQuery; - extern const Event HistoricalSelectQuery; +extern const Event SelectQuery; +extern const Event StreamingSelectQuery; +extern const Event HistoricalSelectQuery; } namespace DB { +namespace +{ +constexpr auto DEFAULT_INTERVAL_MS = 5 * 60 * 1000; +} TelemetryCollector::TelemetryCollector(ContextPtr context_) - : log(&Poco::Logger::get("TelemetryCollector")), - pool(context_->getSchedulePool()), - started_on_in_minutes(UTCMinutes::now()) + : log(&Poco::Logger::get("TelemetryCollector")), pool(context_->getSchedulePool()), started_on_in_minutes(UTCMinutes::now()) { const auto & config = context_->getConfigRef(); - if (!config.getBool("telemetry_enabled", true)) - { - LOG_WARNING(log, "Please note that telemetry is disabled."); - is_shutdown.test_and_set(); - return; - } + is_enable = config.getBool("telemetry_enabled", true); + + collect_interval_ms = config.getUInt("telemetry_interval_ms", DEFAULT_INTERVAL_MS); WriteBufferFromOwnString wb; writeDateTimeTextISO(UTCMilliseconds::now(), 3, wb, DateLUT::instance("UTC")); started_on = wb.str(); - - LOG_WARNING(log, "Please note that telemetry is enabled. " - "This is used to collect the version and runtime environment information to Timeplus, Inc. " - "You can disable it by setting telemetry_enabled to false in config.yaml"); - - collector_task = pool.createTask("TelemetryCollector", [this]() { this->collect(); }); - collector_task->activate(); - collector_task->schedule(); } TelemetryCollector::~TelemetryCollector() { shutdown(); + LOG_INFO(log, "stopped"); +} + +void TelemetryCollector::startup() +{ + collector_task = pool.createTask("TelemetryCollector", [this]() { this->collect(); }); + collector_task->activate(); + collector_task->schedule(); } void TelemetryCollector::shutdown() { - if (!is_shutdown.test_and_set()) + if (is_shutdown.test_and_set()) + return; + + if (collector_task) { LOG_INFO(log, "Stopped"); collector_task->deactivate(); } } +void TelemetryCollector::enable() +{ + LOG_WARNING( + log, + "Please note that telemetry is enabled. " + "This is used to collect the version and runtime environment information to Timeplus, Inc. " + "You can disable it by setting telemetry_enabled to false in config.yaml"); + is_enable = true; +} + +void TelemetryCollector::disable() +{ + LOG_WARNING(log, "Please note that telemetry is disabled."); + is_enable = false; +} + void TelemetryCollector::collect() { - SCOPE_EXIT({ - collector_task->scheduleAfter(INTERVAL_MS); - }); + SCOPE_EXIT({ collector_task->scheduleAfter(getCollectIntervalMilliseconds()); }); + + if (!isEnabled()) + return; constexpr auto jitsu_url = "https://data.timeplus.com/api/s/s2s/track"; constexpr auto jitsu_token = "U7qmIGzuZvvkp16iPaYLeBR4IHfKBY6P:Cc6EUDRmEHG9TCO7DX8x23xWrdFg8pBU"; @@ -94,16 +113,26 @@ void TelemetryCollector::collect() /// https://stackoverflow.com/questions/20010199/how-to-determine-if-a-process-runs-inside-lxc-docker bool in_docker = fs::exists("/.dockerenv"); - auto load_counter = [](const auto & event){ - assert (event < ProfileEvents::end()); - return ProfileEvents::global_counters[event].load(std::memory_order_relaxed); + auto load_counter = [](const auto & event) { + assert(event < ProfileEvents::end()); + return static_cast(ProfileEvents::global_counters[event].load(std::memory_order_relaxed)); }; const auto total_select_query = load_counter(ProfileEvents::SelectQuery); const auto streaming_select_query = load_counter(ProfileEvents::StreamingSelectQuery); const auto historical_select_query = load_counter(ProfileEvents::HistoricalSelectQuery); - std::string data = fmt::format("{{" + const auto delta_total_select_query = total_select_query - prev_total_select_query; + prev_total_select_query = total_select_query; + + const auto delta_streaming_select_query = streaming_select_query - prev_streaming_select_query; + prev_streaming_select_query = streaming_select_query; + + const auto delta_historical_select_query = historical_select_query - prev_historical_select_query; + prev_historical_select_query = historical_select_query; + + std::string data = fmt::format( + "{{" "\"type\": \"track\"," "\"event\": \"proton_ping\"," "\"properties\": {{" @@ -118,14 +147,30 @@ void TelemetryCollector::collect() " \"docker\": \"{}\"," " \"total_select_query\": \"{}\"," " \"historical_select_query\": \"{}\"," - " \"streaming_select_query\": \"{}\"" + " \"streaming_select_query\": \"{}\"," + " \"delta_total_select_query\": \"{}\"," + " \"delta_historical_select_query\": \"{}\"," + " \"delta_streaming_select_query\": \"{}\"" "}}" - "}}", cpu, memory_in_gb, EDITION, VERSION_STRING, new_session, started_on, duration_in_minute, server_uuid_str, in_docker, total_select_query, historical_select_query, streaming_select_query); + "}}", + cpu, + memory_in_gb, + EDITION, + VERSION_STRING, + new_session, + started_on, + duration_in_minute, + server_uuid_str, + in_docker, + total_select_query, + historical_select_query, + streaming_select_query, + delta_total_select_query, + delta_historical_select_query, + delta_streaming_select_query); LOG_TRACE(log, "Sending telemetry: {}.", data); - new_session = false; - request.setContentLength(data.length()); request.setContentType("application/json"); request.add("X-Write-Key", jitsu_token); @@ -145,6 +190,7 @@ void TelemetryCollector::collect() return; } + new_session = false; LOG_INFO(log, "Telemetry sent successfully."); } catch (Poco::Exception & ex) diff --git a/programs/server/TelemetryCollector.h b/src/Interpreters/TelemetryCollector.h similarity index 51% rename from programs/server/TelemetryCollector.h rename to src/Interpreters/TelemetryCollector.h index 604dfd132a6..87a30f0cbd8 100644 --- a/programs/server/TelemetryCollector.h +++ b/src/Interpreters/TelemetryCollector.h @@ -14,12 +14,18 @@ class TelemetryCollector BackgroundSchedulePool & pool; BackgroundSchedulePoolTaskHolder collector_task; std::atomic_flag is_shutdown; + std::atomic_bool is_enable; std::string started_on; bool new_session = true; Int64 started_on_in_minutes; + std::atomic collect_interval_ms; - static constexpr auto INTERVAL_MS = 5 * 60 * 1000; /// sending anonymous telemetry data every 5 minutes + Int64 prev_total_select_query = 0; + Int64 prev_streaming_select_query = 0; + Int64 prev_historical_select_query = 0; + + // static constexpr auto INTERVAL_MS = 5 * 60 * 1000; /// sending anonymous telemetry data every 5 minutes public: static TelemetryCollector & instance(ContextPtr context_) @@ -29,8 +35,19 @@ class TelemetryCollector } ~TelemetryCollector(); + + void startup(); void shutdown(); + void enable(); + void disable(); + + bool isEnabled() const { return is_enable; } + + UInt64 getCollectIntervalMilliseconds() const { return collect_interval_ms.load(); } + + void setCollectIntervalMilliseconds(UInt64 interval_ms) { collect_interval_ms.store(interval_ms); } + private: void collect(); TelemetryCollector(ContextPtr context_); diff --git a/src/Server/RestRouterHandlers/RestRouterFactory.h b/src/Server/RestRouterHandlers/RestRouterFactory.h index 8ecbed4cad8..b1643d3fce2 100644 --- a/src/Server/RestRouterHandlers/RestRouterFactory.h +++ b/src/Server/RestRouterHandlers/RestRouterFactory.h @@ -19,6 +19,7 @@ #include "StorageInfoHandler.h" #include "SystemCommandHandler.h" #include "TabularTableRestRouterHandler.h" +#include "TelemetryHandler.h" #include "UDFHandler.h" #include @@ -216,6 +217,14 @@ class RestRouterFactory final return std::make_shared(query_context); }); + /// GET/POST: /proton/v1/telemetry + factory.registerRouterHandler( + fmt::format("/{}/v1/telemetry", prefix), + "GET/POST", + [](ContextMutablePtr query_context) { /// STYLE_CHECK_ALLOW_BRACE_SAME_LINE_LAMBDA + return std::make_shared(query_context); + }); + factory.registerRouterHandler( fmt::format("/{}/apis", prefix), "GET", diff --git a/src/Server/RestRouterHandlers/TelemetryHandler.cpp b/src/Server/RestRouterHandlers/TelemetryHandler.cpp new file mode 100644 index 00000000000..af1cdd6a47f --- /dev/null +++ b/src/Server/RestRouterHandlers/TelemetryHandler.cpp @@ -0,0 +1,61 @@ +#include "TelemetryHandler.h" +#include "SchemaValidator.h" + +#include + + +namespace DB +{ +namespace ErrorCodes +{ +extern const int BAD_REQUEST_PARAMETER; +extern const int UDF_INVALID_NAME; +} + +std::pair TelemetryHandler::executeGet(const Poco::JSON::Object::Ptr & /*payload*/) const +{ + Poco::JSON::Object resp; + resp.set("request_id", query_context->getCurrentQueryId()); + + auto & telemetry_collector = TelemetryCollector::instance(query_context); + + resp.set("collect", telemetry_collector.isEnabled()); + resp.set("collect_interval_ms", telemetry_collector.getCollectIntervalMilliseconds()); + + std::stringstream resp_str_stream; /// STYLE_CHECK_ALLOW_STD_STRING_STREAM + resp.stringify(resp_str_stream, 0); + return {resp_str_stream.str(), HTTPResponse::HTTP_OK}; +} + +std::map> TelemetryHandler::update_schema + = {{"required", {{"collect", "bool"}}}, {"optional", {{"collect_interval_ms", "int"}}}}; + +bool TelemetryHandler::validatePost(const Poco::JSON::Object::Ptr & payload, String & error_msg) const +{ + return validateSchema(update_schema, payload, error_msg); +} + +std::pair TelemetryHandler::executePost(const Poco::JSON::Object::Ptr & payload) const +{ + auto & telemetry_collector = TelemetryCollector::instance(query_context); + + bool collect = payload->get("collect").convert(); + + if (collect) + telemetry_collector.enable(); + else + telemetry_collector.disable(); + + if (payload->has("collect_interval_ms")) + { + UInt64 collect_interval_ms = payload->get("collect_interval_ms").convert(); + telemetry_collector.setCollectIntervalMilliseconds(collect_interval_ms); + } + + Poco::JSON::Object resp; + resp.set("request_id", query_context->getCurrentQueryId()); + std::stringstream resp_str_stream; /// STYLE_CHECK_ALLOW_STD_STRING_STREAM + resp.stringify(resp_str_stream, 0); + return {resp_str_stream.str(), HTTPResponse::HTTP_OK}; +} +} diff --git a/src/Server/RestRouterHandlers/TelemetryHandler.h b/src/Server/RestRouterHandlers/TelemetryHandler.h new file mode 100644 index 00000000000..b1d4a36c2ef --- /dev/null +++ b/src/Server/RestRouterHandlers/TelemetryHandler.h @@ -0,0 +1,27 @@ +#pragma once + +#include "RestRouterHandler.h" + +#include +#include "config.h" + + +namespace DB +{ + +class TelemetryHandler final : public RestRouterHandler +{ +public: + explicit TelemetryHandler(ContextMutablePtr query_context_) : RestRouterHandler(query_context_, "TelemetryHandler") { } + ~TelemetryHandler() override = default; + +protected: + static std::map> update_schema; + bool validatePost(const Poco::JSON::Object::Ptr & payload, String & error_msg) const override; + +private: + std::pair executeGet(const Poco::JSON::Object::Ptr & payload) const override; + std::pair executePost(const Poco::JSON::Object::Ptr & payload) const override; +}; + +}