Skip to content

Commit

Permalink
Feature/issue 315 make telemetry interval configurable (#828)
Browse files Browse the repository at this point in the history
* Ignore internal query in statistics of select query

* - make telemetry interval configurable
- provide restful api to enable/disable telemetry collection
- add delta of query counter

* fix

---------

Co-authored-by: cerebellumking <[email protected]>
Co-authored-by: Qijun Niu <[email protected]>
  • Loading branch information
3 people authored Sep 6, 2024
1 parent 870d7d5 commit 28f6a63
Show file tree
Hide file tree
Showing 12 changed files with 220 additions and 49 deletions.
2 changes: 1 addition & 1 deletion programs/client/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ std::vector<String> Client::loadWarningMessages()
return {};

std::vector<String> messages;
connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings SETTINGS _tp_internal_system_open_sesame=true",
connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings SETTINGS _tp_internal_system_open_sesame=true,is_internal=true",
{} /* query_parameters */,
"" /* query_id */,
QueryProcessingStage::Complete,
Expand Down
1 change: 0 additions & 1 deletion programs/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ include(${proton_SOURCE_DIR}/cmake/embed_binary.cmake)

set(CLICKHOUSE_SERVER_SOURCES
MetricsTransmitter.cpp
TelemetryCollector.cpp
Server.cpp
)

Expand Down
8 changes: 6 additions & 2 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@
#include "config_version.h"

/// proton: starts
#include "TelemetryCollector.h"
#include <Checkpoint/CheckpointCoordinator.h>
#include <DataTypes/DataTypeFactory.h>
#include <Functions/UserDefined/ExternalUserDefinedFunctionsLoader.h>
#include <Interpreters/DiskUtilChecker.h>
#include <Interpreters/TelemetryCollector.h>
#include <KafkaLog/KafkaWALPool.h>
#include <NativeLog/Server/NativeLog.h>
#include <Server/RestRouterHandlers/RestRouterFactory.h>
Expand Down Expand Up @@ -307,6 +307,9 @@ void initGlobalServices(DB::ContextMutablePtr & global_context)

if (native_log.enabled() && pool.enabled())
throw DB::Exception("Both external Kafka log and internal native log are enabled. This is not a supported configuration", DB::ErrorCodes::UNSUPPORTED);

auto & telemetry_collector = DB::TelemetryCollector::instance(global_context);
telemetry_collector.startup();
}

void initGlobalSingletons(DB::ContextMutablePtr & context)
Expand All @@ -317,7 +320,6 @@ void initGlobalSingletons(DB::ContextMutablePtr & context)
DB::DiskUtilChecker::instance(context);
DB::ExternalGrokPatterns::instance(context);
DB::ExternalUserDefinedFunctionsLoader::instance(context);
DB::TelemetryCollector::instance(context);
}

void deinitGlobalSingletons(DB::ContextMutablePtr & context)
Expand Down Expand Up @@ -1759,6 +1761,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Closed connections.");

/// proton: start.
DB::TelemetryCollector::instance(global_context).shutdown();

deinitGlobalSingletons(global_context);

disposeV8();
Expand Down
2 changes: 2 additions & 0 deletions programs/server/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ telemetry_enabled:
"@replace": true
"#text": true

telemetry_interval_ms: 300000 # 5 minutes

# gRPC protocol (see src/Server/grpc_protos/proton_grpc.proto for the API)
# grpc_port: 9100
grpc:
Expand Down
4 changes: 2 additions & 2 deletions src/Client/Suggest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggesti
query << "SELECT DISTINCT name FROM system.dictionaries LIMIT " << limit_str
<< " UNION ALL ";
}
query << "SELECT DISTINCT name FROM system.columns LIMIT " << limit_str << " SETTINGS _tp_internal_system_open_sesame=true";
query << "SELECT DISTINCT name FROM system.columns LIMIT " << limit_str << " SETTINGS _tp_internal_system_open_sesame=true,is_internal=true";
}
query << ") WHERE not_empty(res) SETTINGS _tp_internal_system_open_sesame=true";
query << ") WHERE not_empty(res) SETTINGS _tp_internal_system_open_sesame=true,is_internal=true";

return query.str();
}
Expand Down
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Int64, async_ingest_block_timeout_ms, 120000, "Max duration for a block to commit before it is considered expired during async ingestion", 0) \
M(UInt64, aysnc_ingest_max_outstanding_blocks, 10000, "Max outstanding blocks to be committed during async ingestion", 0) \
M(Bool, _tp_internal_system_open_sesame, true, "Control the access to system.* streams", 0) \
M(Bool, is_internal, false, "Control the statistics of select query", 0) \
M(UInt64, javascript_max_memory_bytes, 100 * 1024 * 1024, "Maximum heap size of javascript UDA/UDF in bytes", 0) \
M(Bool, enable_dependency_check, true, "Enable the dependency check of view/materialized view", 0) \
M(RecoveryPolicy, recovery_policy, RecoveryPolicy::Strict, "Default recovery policy for materialized view when inner query failed. 'strict': always recover from checkpointed; 'best_effort': attempts to recover from checkpointed and allow skipping of some data with permanent errors;", 0) \
Expand Down
9 changes: 7 additions & 2 deletions src/Interpreters/InterpreterFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,13 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextMut
else if (query->as<ASTSelectWithUnionQuery>())
{
auto interpreter = std::make_unique<InterpreterSelectWithUnionQuery>(query, context, options);
ProfileEvents::increment(ProfileEvents::SelectQuery);
ProfileEvents::increment(interpreter->isStreamingQuery() ? ProfileEvents::StreamingSelectQuery : ProfileEvents::HistoricalSelectQuery);

if(!options.is_internal && !context->getSettingsRef().is_internal)
{
ProfileEvents::increment(ProfileEvents::SelectQuery);
ProfileEvents::increment(interpreter->isStreamingQuery() ? ProfileEvents::StreamingSelectQuery : ProfileEvents::HistoricalSelectQuery);
}

return std::move(interpreter);
}
else if (query->as<ASTSelectIntersectExceptQuery>())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,78 +1,97 @@
#include "TelemetryCollector.h"
#include "config_version.h"

#include <Poco/Util/AbstractConfiguration.h>
#include <filesystem>
#include <Core/ServerUUID.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <base/ClockUtils.h>
#include <base/getMemoryAmount.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/HTTPSClientSession.h>
#include <base/ClockUtils.h>
#include <base/getMemoryAmount.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/DateLUT.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Core/ServerUUID.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <filesystem>

namespace fs = std::filesystem;

namespace ProfileEvents
{
extern const Event SelectQuery;
extern const Event StreamingSelectQuery;
extern const Event HistoricalSelectQuery;
extern const Event SelectQuery;
extern const Event StreamingSelectQuery;
extern const Event HistoricalSelectQuery;
}

namespace DB
{
namespace
{
constexpr auto DEFAULT_INTERVAL_MS = 5 * 60 * 1000;
}

TelemetryCollector::TelemetryCollector(ContextPtr context_)
: log(&Poco::Logger::get("TelemetryCollector")),
pool(context_->getSchedulePool()),
started_on_in_minutes(UTCMinutes::now())
: log(&Poco::Logger::get("TelemetryCollector")), pool(context_->getSchedulePool()), started_on_in_minutes(UTCMinutes::now())
{
const auto & config = context_->getConfigRef();

if (!config.getBool("telemetry_enabled", true))
{
LOG_WARNING(log, "Please note that telemetry is disabled.");
is_shutdown.test_and_set();
return;
}
is_enable = config.getBool("telemetry_enabled", true);

collect_interval_ms = config.getUInt("telemetry_interval_ms", DEFAULT_INTERVAL_MS);

WriteBufferFromOwnString wb;
writeDateTimeTextISO(UTCMilliseconds::now(), 3, wb, DateLUT::instance("UTC"));
started_on = wb.str();

LOG_WARNING(log, "Please note that telemetry is enabled. "
"This is used to collect the version and runtime environment information to Timeplus, Inc. "
"You can disable it by setting telemetry_enabled to false in config.yaml");

collector_task = pool.createTask("TelemetryCollector", [this]() { this->collect(); });
collector_task->activate();
collector_task->schedule();
}

TelemetryCollector::~TelemetryCollector()
{
shutdown();
LOG_INFO(log, "stopped");
}

void TelemetryCollector::startup()
{
collector_task = pool.createTask("TelemetryCollector", [this]() { this->collect(); });
collector_task->activate();
collector_task->schedule();
}

void TelemetryCollector::shutdown()
{
if (!is_shutdown.test_and_set())
if (is_shutdown.test_and_set())
return;

if (collector_task)
{
LOG_INFO(log, "Stopped");
collector_task->deactivate();
}
}

void TelemetryCollector::enable()
{
LOG_WARNING(
log,
"Please note that telemetry is enabled. "
"This is used to collect the version and runtime environment information to Timeplus, Inc. "
"You can disable it by setting telemetry_enabled to false in config.yaml");
is_enable = true;
}

void TelemetryCollector::disable()
{
LOG_WARNING(log, "Please note that telemetry is disabled.");
is_enable = false;
}

void TelemetryCollector::collect()
{
SCOPE_EXIT({
collector_task->scheduleAfter(INTERVAL_MS);
});
SCOPE_EXIT({ collector_task->scheduleAfter(getCollectIntervalMilliseconds()); });

if (!isEnabled())
return;

constexpr auto jitsu_url = "https://data.timeplus.com/api/s/s2s/track";
constexpr auto jitsu_token = "U7qmIGzuZvvkp16iPaYLeBR4IHfKBY6P:Cc6EUDRmEHG9TCO7DX8x23xWrdFg8pBU";
Expand All @@ -94,16 +113,26 @@ void TelemetryCollector::collect()
/// https://stackoverflow.com/questions/20010199/how-to-determine-if-a-process-runs-inside-lxc-docker
bool in_docker = fs::exists("/.dockerenv");

auto load_counter = [](const auto & event){
assert (event < ProfileEvents::end());
return ProfileEvents::global_counters[event].load(std::memory_order_relaxed);
auto load_counter = [](const auto & event) {
assert(event < ProfileEvents::end());
return static_cast<Int64>(ProfileEvents::global_counters[event].load(std::memory_order_relaxed));
};

const auto total_select_query = load_counter(ProfileEvents::SelectQuery);
const auto streaming_select_query = load_counter(ProfileEvents::StreamingSelectQuery);
const auto historical_select_query = load_counter(ProfileEvents::HistoricalSelectQuery);

std::string data = fmt::format("{{"
const auto delta_total_select_query = total_select_query - prev_total_select_query;
prev_total_select_query = total_select_query;

const auto delta_streaming_select_query = streaming_select_query - prev_streaming_select_query;
prev_streaming_select_query = streaming_select_query;

const auto delta_historical_select_query = historical_select_query - prev_historical_select_query;
prev_historical_select_query = historical_select_query;

std::string data = fmt::format(
"{{"
"\"type\": \"track\","
"\"event\": \"proton_ping\","
"\"properties\": {{"
Expand All @@ -118,14 +147,30 @@ void TelemetryCollector::collect()
" \"docker\": \"{}\","
" \"total_select_query\": \"{}\","
" \"historical_select_query\": \"{}\","
" \"streaming_select_query\": \"{}\""
" \"streaming_select_query\": \"{}\","
" \"delta_total_select_query\": \"{}\","
" \"delta_historical_select_query\": \"{}\","
" \"delta_streaming_select_query\": \"{}\""
"}}"
"}}", cpu, memory_in_gb, EDITION, VERSION_STRING, new_session, started_on, duration_in_minute, server_uuid_str, in_docker, total_select_query, historical_select_query, streaming_select_query);
"}}",
cpu,
memory_in_gb,
EDITION,
VERSION_STRING,
new_session,
started_on,
duration_in_minute,
server_uuid_str,
in_docker,
total_select_query,
historical_select_query,
streaming_select_query,
delta_total_select_query,
delta_historical_select_query,
delta_streaming_select_query);

LOG_TRACE(log, "Sending telemetry: {}.", data);

new_session = false;

request.setContentLength(data.length());
request.setContentType("application/json");
request.add("X-Write-Key", jitsu_token);
Expand All @@ -145,6 +190,7 @@ void TelemetryCollector::collect()
return;
}

new_session = false;
LOG_INFO(log, "Telemetry sent successfully.");
}
catch (Poco::Exception & ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ class TelemetryCollector
BackgroundSchedulePool & pool;
BackgroundSchedulePoolTaskHolder collector_task;
std::atomic_flag is_shutdown;
std::atomic_bool is_enable;

std::string started_on;
bool new_session = true;
Int64 started_on_in_minutes;
std::atomic<UInt64> collect_interval_ms;

static constexpr auto INTERVAL_MS = 5 * 60 * 1000; /// sending anonymous telemetry data every 5 minutes
Int64 prev_total_select_query = 0;
Int64 prev_streaming_select_query = 0;
Int64 prev_historical_select_query = 0;

// static constexpr auto INTERVAL_MS = 5 * 60 * 1000; /// sending anonymous telemetry data every 5 minutes

public:
static TelemetryCollector & instance(ContextPtr context_)
Expand All @@ -29,8 +35,19 @@ class TelemetryCollector
}

~TelemetryCollector();

void startup();
void shutdown();

void enable();
void disable();

bool isEnabled() const { return is_enable; }

UInt64 getCollectIntervalMilliseconds() const { return collect_interval_ms.load(); }

void setCollectIntervalMilliseconds(UInt64 interval_ms) { collect_interval_ms.store(interval_ms); }

private:
void collect();
TelemetryCollector(ContextPtr context_);
Expand Down
9 changes: 9 additions & 0 deletions src/Server/RestRouterHandlers/RestRouterFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "StorageInfoHandler.h"
#include "SystemCommandHandler.h"
#include "TabularTableRestRouterHandler.h"
#include "TelemetryHandler.h"
#include "UDFHandler.h"

#include <re2/re2.h>
Expand Down Expand Up @@ -216,6 +217,14 @@ class RestRouterFactory final
return std::make_shared<DB::SystemCommandHandler>(query_context);
});

/// GET/POST: /proton/v1/telemetry
factory.registerRouterHandler(
fmt::format("/{}/v1/telemetry", prefix),
"GET/POST",
[](ContextMutablePtr query_context) { /// STYLE_CHECK_ALLOW_BRACE_SAME_LINE_LAMBDA
return std::make_shared<DB::TelemetryHandler>(query_context);
});

factory.registerRouterHandler(
fmt::format("/{}/apis", prefix),
"GET",
Expand Down
Loading

0 comments on commit 28f6a63

Please sign in to comment.