diff --git a/programs/bash-completion/completions/proton-bootstrap b/programs/bash-completion/completions/proton-bootstrap index dbf3003d9c5..f0bfcec1328 100644 --- a/programs/bash-completion/completions/proton-bootstrap +++ b/programs/bash-completion/completions/proton-bootstrap @@ -15,6 +15,17 @@ shopt -s extglob export _CLICKHOUSE_COMPLETION_LOADED=1 +CLICKHOUSE_logs_level=( + none + fatal + error + warning + information + debug + trace + test +) + CLICKHOUSE_QueryProcessingStage=( complete fetch_columns @@ -113,6 +124,10 @@ function _complete_for_proton_generic_bin_impl() COMPREPLY=( $(compgen -W "${CLICKHOUSE_QueryProcessingStage[*]}" -- "$cur") ) return 1 ;; + --send_logs_level) + COMPREPLY=( $(compgen -W "${CLICKHOUSE_logs_level[*]}" -- "$cur") ) + return 1 + ;; --format|--input-format|--output-format) COMPREPLY=( $(compgen -W "${CLICKHOUSE_Format[*]}" -- "$cur") ) return 1 diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index f73671ff240..59b38c2f306 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -1147,6 +1147,7 @@ void Client::processConfig() global_context->setCurrentQueryId(query_id); } print_stack_trace = config().getBool("stacktrace", false); + logging_initialized = true; if (config().has("multiquery")) is_multiquery = true; diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index d1d7ad12a39..473aede98a1 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -282,6 +282,12 @@ void LocalServer::tryInitPath() if (path.back() != '/') path += '/'; + + fs::create_directories(fs::path(path) / "user_defined/"); + fs::create_directories(fs::path(path) / "data/"); + fs::create_directories(fs::path(path) / "metadata/"); + fs::create_directories(fs::path(path) / "metadata_dropped/"); + global_context->setPath(path); global_context->setTemporaryStorage(path + "tmp"); @@ -400,13 +406,28 @@ void LocalServer::setupUsers() auto & access_control = global_context->getAccessControl(); access_control.setNoPasswordAllowed(config().getBool("allow_no_password", true)); access_control.setPlaintextPasswordAllowed(config().getBool("allow_plaintext_password", true)); - - if (config().has("users_config") || config().has("config-file") || fs::exists("config.xml")) + if (config().has("config-file") || fs::exists("config.xml")) { - const auto users_config_path = config().getString("users_config", config().getString("config-file", "config.xml")); - ConfigProcessor config_processor(users_config_path); - const auto loaded_config = config_processor.loadConfig(); - users_config = loaded_config.configuration; + String config_path = config().getString("config-file", ""); + bool has_user_directories = config().has("user_directories"); + const auto config_dir = fs::path{config_path}.remove_filename().string(); + String users_config_path = config().getString("users_config", ""); + + if (users_config_path.empty() && has_user_directories) + { + users_config_path = config().getString("user_directories.users_xml.path"); + if (fs::path(users_config_path).is_relative() && fs::exists(fs::path(config_dir) / users_config_path)) + users_config_path = fs::path(config_dir) / users_config_path; + } + + if (users_config_path.empty()) + users_config = getConfigurationFromXMLString(minimal_default_user_xml); + else + { + ConfigProcessor config_processor(users_config_path); + const auto loaded_config = config_processor.loadConfig(); + users_config = loaded_config.configuration; + } } else users_config = getConfigurationFromXMLString(minimal_default_user_xml); @@ -416,11 +437,11 @@ void LocalServer::setupUsers() throw Exception("Can't load config for users", ErrorCodes::CANNOT_LOAD_CONFIG); } - void LocalServer::connect() { connection_parameters = ConnectionParameters(config()); - connection = LocalConnection::createConnection(connection_parameters, global_context, need_render_progress); + connection = LocalConnection::createConnection( + connection_parameters, global_context, need_render_progress, need_render_profile_events, server_display_name); } @@ -523,6 +544,14 @@ catch (...) return getCurrentExceptionCode(); } +void LocalServer::updateLoggerLevel(const String & logs_level) +{ + if (!logging_initialized) + return; + + config().setString("logger.level", logs_level); + updateLevels(config(), logger()); +} void LocalServer::processConfig() { @@ -549,30 +578,31 @@ void LocalServer::processConfig() auto logging = (config().has("logger.console") || config().has("logger.level") || config().has("log-level") + || config().has("send_logs_level") || config().has("logger.log")); - auto file_logging = config().has("server_logs_file"); - if (is_interactive && logging && !file_logging) - throw Exception("For interactive mode logging is allowed only with --server_logs_file option", - ErrorCodes::BAD_ARGUMENTS); + auto level = config().getString("log-level", "trace"); - if (file_logging) + if (config().has("server_logs_file")) { - auto level = Poco::Logger::parseLevel(config().getString("log-level", "trace")); - Poco::Logger::root().setLevel(level); + auto poco_logs_level = Poco::Logger::parseLevel(level); + Poco::Logger::root().setLevel(poco_logs_level); Poco::Logger::root().setChannel(Poco::AutoPtr(new Poco::SimpleFileChannel(server_logs_file))); + logging_initialized = true; } - else if (logging) + else if (logging || is_interactive) { - // force enable logging config().setString("logger", "logger"); - // sensitive data rules are not used here - buildLoggers(config(), logger(), "proton-local"); + auto log_level_default = is_interactive && !logging ? "none" : level; + config().setString("logger.level", config().getString("log-level", config().getString("send_logs_level", log_level_default))); + buildLoggers(config(), logger(), "clickhouse-local"); + logging_initialized = true; } else { Poco::Logger::root().setLevel("none"); Poco::Logger::root().setChannel(Poco::AutoPtr(new Poco::NullChannel())); + logging_initialized = false; } shared_context = Context::createShared(); @@ -663,8 +693,6 @@ void LocalServer::processConfig() status.emplace(fs::path(path) / "status", StatusFile::write_full_info); LOG_DEBUG(log, "Loading metadata from {}", path); - fs::create_directories(fs::path(path) / "data/"); - fs::create_directories(fs::path(path) / "metadata/"); loadMetadataSystem(global_context); attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE)); @@ -802,6 +830,8 @@ void LocalServer::processOptions(const OptionsDescription &, const CommandLineOp config().setString("logger.log", options["logger.log"].as()); if (options.count("logger.level")) config().setString("logger.level", options["logger.level"].as()); + if (options.count("send_logs_level")) + config().setString("send_logs_level", options["send_logs_level"].as()); } } diff --git a/programs/local/LocalServer.h b/programs/local/LocalServer.h index 3149c4fc768..1bd96ef6f06 100644 --- a/programs/local/LocalServer.h +++ b/programs/local/LocalServer.h @@ -44,6 +44,8 @@ class LocalServer : public ClientBase, public Loggers const std::vector &) override; void processConfig() override; + void updateLoggerLevel(const String & logs_level) override; + private: /** Composes CREATE subquery based on passed arguments (--structure --file --table and --input-format) * This query will be executed first, before queries passed through --query argument diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index defc4da73eb..d789a3bf6fe 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -871,7 +871,7 @@ void ClientBase::onProfileEvents(Block & block) if (rows == 0) return; - /// if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS) + /// if (getName() == "local" || server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS) { const auto & array_thread_id = typeid_cast(*block.getByName("thread_id").column).getData(); const auto & names = typeid_cast(*block.getByName("name").column); @@ -1320,6 +1320,13 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin } } + if (const auto * set_query = parsed_query->as()) + { + const auto * logs_level_field = set_query->changes.tryGet(std::string_view{"send_logs_level"}); + if (logs_level_field) + updateLoggerLevel(logs_level_field->safeGet()); + } + processed_rows = 0; written_first_block = false; progress_indication.resetProgress(); @@ -1518,9 +1525,21 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText( bool ClientBase::processQueryText(const String & text) { - if (exit_strings.end() != exit_strings.find(trim(text, [](char c) { return isWhitespaceASCII(c) || c == ';'; }))) + auto trimmed_input = trim(text, [](char c) { return isWhitespaceASCII(c) || c == ';'; }); + + if (exit_strings.end() != exit_strings.find(trimmed_input)) return false; + if (trimmed_input.starts_with("\\i")) + { + size_t skip_prefix_size = std::strlen("\\i"); + auto file_name = trim( + trimmed_input.substr(skip_prefix_size, trimmed_input.size() - skip_prefix_size), + [](char c) { return isWhitespaceASCII(c); }); + + return processMultiQueryFromFile(file_name); + } + if (!is_multiquery) { assert(!query_fuzzer_runs); @@ -1689,28 +1708,27 @@ void ClientBase::runInteractive() std::cout << "Bye." << std::endl; } +bool ClientBase::processMultiQueryFromFile(const String & file_name) +{ + String queries_from_file; + + ReadBufferFromFile in(file_name); + readStringUntilEOF(queries_from_file, in); + + return executeMultiQuery(queries_from_file); +} void ClientBase::runNonInteractive() { if (!queries_files.empty()) { - auto process_multi_query_from_file = [&](const String & file) - { - String queries_from_file; - - ReadBufferFromFile in(file); - readStringUntilEOF(queries_from_file, in); - - return executeMultiQuery(queries_from_file); - }; - for (const auto & queries_file : queries_files) { for (const auto & interleave_file : interleave_queries_files) - if (!process_multi_query_from_file(interleave_file)) + if (!processMultiQueryFromFile(interleave_file)) return; - if (!process_multi_query_from_file(queries_file)) + if (!processMultiQueryFromFile(queries_file)) return; } diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 10ba0633031..6d0cb18f7cd 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -93,6 +93,7 @@ class ClientBase : public Poco::Util::Application, public IHints<2, ClientBase> std::optional external_description; }; + virtual void updateLoggerLevel(const String &) {} virtual void printHelpMessage(const OptionsDescription & options_description) = 0; virtual void addOptions(OptionsDescription & options_description) = 0; virtual void processOptions(const OptionsDescription & options_description, @@ -140,6 +141,11 @@ class ClientBase : public Poco::Util::Application, public IHints<2, ClientBase> void updateSuggest(const ASTCreateQuery & ast_create); protected: + SharedContextHolder shared_context; + ContextMutablePtr global_context; + + bool processMultiQueryFromFile(const String & file_name); + bool is_interactive = false; /// Use either interactive line editing interface or batch mode. bool is_multiquery = false; bool delayed_interactive = false; @@ -175,9 +181,6 @@ class ClientBase : public Poco::Util::Application, public IHints<2, ClientBase> /// Settings specified via command line args Settings cmd_settings; - SharedContextHolder shared_context; - ContextMutablePtr global_context; - /// thread status should be destructed before shared context because it relies on process list. std::optional thread_status; @@ -211,6 +214,7 @@ class ClientBase : public Poco::Util::Application, public IHints<2, ClientBase> ProgressIndication progress_indication; bool need_render_progress = true; + bool need_render_profile_events = true; bool written_first_block = false; size_t processed_rows = 0; /// How many rows have been read or written. @@ -245,6 +249,8 @@ class ClientBase : public Poco::Util::Application, public IHints<2, ClientBase> QueryProcessingStage::Enum query_processing_stage; bool cancelled = false; + + bool logging_initialized = false; }; } diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp index a47d049a7c8..42eac02ff57 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -6,7 +6,8 @@ #include #include #include - +#include +#include namespace DB { @@ -18,10 +19,12 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; } -LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_) +LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool send_profile_events_, const String & server_display_name_) : WithContext(context_) , session(getContext(), ClientInfo::Interface::LOCAL) , send_progress(send_progress_) + , send_profile_events(send_profile_events_) + , server_display_name(server_display_name_) { /// Authenticate and create a context to execute queries. session.authenticate("default", "", Poco::Net::SocketAddress{}); @@ -55,6 +58,11 @@ void LocalConnection::updateProgress(const Progress & value) state->progress.incrementPiecewiseAtomically(value); } +void LocalConnection::getProfileEvents(Block & block) +{ + ProfileEvents::getProfileEvents(server_display_name, state->profile_queue, block, last_sent_snapshots); +} + void LocalConnection::sendQuery( const ConnectionTimeouts &, const String & query, @@ -77,18 +85,25 @@ void LocalConnection::sendQuery( query_context->setFileProgressCallback([this](const FileProgress & value) { this->updateProgress(Progress(value)); }); } - CurrentThread::QueryScope query_scope_holder(query_context); + if (!current_database.empty()) + query_context->setCurrentDatabase(current_database); state.reset(); state.emplace(); state->query_id = query_id; state->query = query; + state->query_scope_holder = std::make_unique(query_context); state->stage = QueryProcessingStage::Enum(stage); + state->profile_queue = std::make_shared(std::numeric_limits::max()); + CurrentThread::attachInternalProfileEventsQueue(state->profile_queue); if (send_progress) state->after_send_progress.restart(); + if (send_profile_events) + state->after_send_profile_events.restart(); + next_packet_type.reset(); try @@ -245,6 +260,16 @@ bool LocalConnection::poll(size_t) return true; } + if (send_profile_events && (state->after_send_profile_events.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay)) + { + Block block; + state->after_send_profile_events.restart(); + next_packet_type = Protocol::Server::ProfileEvents; + getProfileEvents(block); + state->block.emplace(std::move(block)); + return true; + } + try { pollImpl(); @@ -443,9 +468,9 @@ void LocalConnection::getServerVersion( throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); } -void LocalConnection::setDefaultDatabase(const String &) +void LocalConnection::setDefaultDatabase(const String & database) { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); + current_database = database; } UInt64 LocalConnection::getServerRevision(const ConnectionTimeouts &) @@ -473,9 +498,14 @@ void LocalConnection::sendMergeTreeReadTaskResponse(const PartitionReadResponse throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); } -ServerConnectionPtr LocalConnection::createConnection(const ConnectionParameters &, ContextPtr current_context, bool send_progress) +ServerConnectionPtr LocalConnection::createConnection( + const ConnectionParameters &, + ContextPtr current_context, + bool send_progress, + bool send_profile_events, + const String & server_display_name) { - return std::make_unique(current_context, send_progress); + return std::make_unique(current_context, send_progress, send_profile_events, server_display_name); } diff --git a/src/Client/LocalConnection.h b/src/Client/LocalConnection.h index 5391fba4d59..40cc99088c6 100644 --- a/src/Client/LocalConnection.h +++ b/src/Client/LocalConnection.h @@ -5,6 +5,7 @@ #include #include #include +#include #include @@ -29,6 +30,7 @@ struct LocalQueryState std::unique_ptr executor; std::unique_ptr pushing_executor; std::unique_ptr pushing_async_executor; + InternalProfileEventsQueuePtr profile_queue; std::unique_ptr exception; @@ -50,19 +52,28 @@ struct LocalQueryState Progress progress; /// Time after the last check to stop the request and send the progress. Stopwatch after_send_progress; + Stopwatch after_send_profile_events; + + std::unique_ptr query_scope_holder; }; class LocalConnection : public IServerConnection, WithContext { public: - explicit LocalConnection(ContextPtr context_, bool send_progress_ = false); + explicit LocalConnection( + ContextPtr context_, bool send_progress_ = false, bool send_profile_events_ = false, const String & server_display_name_ = ""); ~LocalConnection() override; IServerConnection::Type getConnectionType() const override { return IServerConnection::Type::LOCAL; } - static ServerConnectionPtr createConnection(const ConnectionParameters & connection_parameters, ContextPtr current_context, bool send_progress = false); + static ServerConnectionPtr createConnection( + const ConnectionParameters & connection_parameters, + ContextPtr current_context, + bool send_progress = false, + bool send_profile_events = false, + const String & server_display_name = ""); void setDefaultDatabase(const String & database) override; @@ -131,17 +142,25 @@ class LocalConnection : public IServerConnection, WithContext void updateProgress(const Progress & value); + void getProfileEvents(Block & block); + bool pollImpl(); ContextMutablePtr query_context; Session session; bool send_progress; + bool send_profile_events; + String server_display_name; String description = "proton-local"; std::optional state; /// Last "server" packet. std::optional next_packet_type; + + String current_database; + + ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots; }; } diff --git a/src/Client/ReplxxLineReader.cpp b/src/Client/ReplxxLineReader.cpp index 5af0c2cacb0..7f6ee1a73b9 100644 --- a/src/Client/ReplxxLineReader.cpp +++ b/src/Client/ReplxxLineReader.cpp @@ -227,6 +227,35 @@ ReplxxLineReader::ReplxxLineReader( rx.bind_key(Replxx::KEY::control('W'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::KILL_TO_WHITESPACE_ON_LEFT, code); }); rx.bind_key(Replxx::KEY::meta('E'), [this](char32_t) { openEditor(); return Replxx::ACTION_RESULT::CONTINUE; }); + + /// readline insert-comment + auto insert_comment_action = [this](char32_t code) + { + replxx::Replxx::State state(rx.get_state()); + const char * line = state.text(); + const char * line_end = line + strlen(line); + + std::string commented_line; + if (std::find(line, line_end, '\n') != line_end) + { + /// If query has multiple lines, multiline comment is used over + /// commenting each line separately for easier uncomment (though + /// with invoking editor it is simpler to uncomment multiple lines) + /// + /// Note, that using multiline comment is OK even with nested + /// comments, since nested comments are supported. + commented_line = fmt::format("/* {} */", state.text()); + } + else + { + // In a simplest case use simple comment. + commented_line = fmt::format("-- {}", state.text()); + } + rx.set_state(replxx::Replxx::State(commented_line.c_str(), commented_line.size())); + + return rx.invoke(Replxx::ACTION::COMMIT_LINE, code); + }; + rx.bind_key(Replxx::KEY::meta('#'), insert_comment_action); } ReplxxLineReader::~ReplxxLineReader() diff --git a/src/Client/Suggest.cpp b/src/Client/Suggest.cpp index bf815ee83c3..32dfa091b8f 100644 --- a/src/Client/Suggest.cpp +++ b/src/Client/Suggest.cpp @@ -7,6 +7,7 @@ #include #include #include "Core/Protocol.h" +#include #include #include #include @@ -114,11 +115,18 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p if (e.code() == ErrorCodes::DEADLOCK_AVOIDED) continue; - std::cerr << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n"; + /// We should not use std::cerr here, because this method works concurrently with the main thread. + /// WriteBufferFromFileDescriptor will write directly to the file descriptor, avoiding data race on std::cerr. + + WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096); + out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n"; + out.next(); } catch (...) { - std::cerr << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n"; + WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096); + out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n"; + out.next(); } break; diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index febbaba2e19..576ff97cca4 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -148,9 +148,10 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, ssize_t priority, std:: jobs.emplace(std::move(job), priority); ++scheduled_jobs; - new_job_or_shutdown.notify_one(); } + new_job_or_shutdown.notify_one(); + return ReturnType(true); } diff --git a/src/Coordination/MetaStateManager.cpp b/src/Coordination/MetaStateManager.cpp index 86f4ebe4426..43877ab2aa9 100644 --- a/src/Coordination/MetaStateManager.cpp +++ b/src/Coordination/MetaStateManager.cpp @@ -98,6 +98,15 @@ void MetaStateManager::loadLogStore(uint64_t last_commited_index, uint64_t logs_ log_store->init(last_commited_index, logs_to_keep); } +void MetaStateManager::system_exit(const int /* exit_code */) +{ + /// NuRaft itself calls exit() which will call atexit handlers + /// and this may lead to an issues in multi-threaded program. + /// + /// Override this with abort(). + abort(); +} + void MetaStateManager::flushLogStore() { log_store->flush(); diff --git a/src/Coordination/MetaStateManager.h b/src/Coordination/MetaStateManager.h index 6b29473a8ed..a501514eeda 100644 --- a/src/Coordination/MetaStateManager.h +++ b/src/Coordination/MetaStateManager.h @@ -44,7 +44,7 @@ class MetaStateManager : public nuraft::state_mgr nuraft::ptr get_srv_config() const { return my_server_config; } - void system_exit(const int /* exit_code */) override {} + void system_exit(const int exit_code) override; int getPort() const { return my_port; } diff --git a/src/Interpreters/InternalTextLogsQueue.cpp b/src/Interpreters/InternalTextLogsQueue.cpp index 2172a6f4261..3b3c98a995c 100644 --- a/src/Interpreters/InternalTextLogsQueue.cpp +++ b/src/Interpreters/InternalTextLogsQueue.cpp @@ -46,24 +46,25 @@ void InternalTextLogsQueue::pushBlock(Block && log_block) LOG_WARNING(&Poco::Logger::get("InternalTextLogsQueue"), "Log block have different structure"); } -const char * InternalTextLogsQueue::getPriorityName(int priority) +std::string_view InternalTextLogsQueue::getPriorityName(int priority) { - /// See Poco::Message::Priority + using namespace std::literals; - static constexpr const char * const PRIORITIES[] = + /// See Poco::Message::Priority + static constexpr std::array PRIORITIES = { - "Unknown", - "Fatal", - "Critical", - "Error", - "Warning", - "Notice", - "Information", - "Debug", - "Trace" + "Unknown"sv, + "Fatal"sv, + "Critical"sv, + "Error"sv, + "Warning"sv, + "Notice"sv, + "Information"sv, + "Debug"sv, + "Trace"sv, + "Test"sv, }; - - return (priority >= 1 && priority <= 8) ? PRIORITIES[priority] : PRIORITIES[0]; + return (priority >= 1 && priority < static_cast(PRIORITIES.size())) ? PRIORITIES[priority] : PRIORITIES[0]; } } diff --git a/src/Interpreters/InternalTextLogsQueue.h b/src/Interpreters/InternalTextLogsQueue.h index 28841598d30..3a4e9bde6c5 100644 --- a/src/Interpreters/InternalTextLogsQueue.h +++ b/src/Interpreters/InternalTextLogsQueue.h @@ -21,7 +21,7 @@ class InternalTextLogsQueue : public ConcurrentBoundedQueue void pushBlock(Block && log_block); /// Converts priority from Poco::Message::Priority to a string - static const char * getPriorityName(int priority); + static std::string_view getPriorityName(int priority); }; using InternalTextLogsQueuePtr = std::shared_ptr; diff --git a/src/Interpreters/ProfileEventsExt.cpp b/src/Interpreters/ProfileEventsExt.cpp index 472efc109fb..4f2a40e4a9a 100644 --- a/src/Interpreters/ProfileEventsExt.cpp +++ b/src/Interpreters/ProfileEventsExt.cpp @@ -1,5 +1,7 @@ #include "ProfileEventsExt.h" #include +#include +#include #include #include #include @@ -36,7 +38,7 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, if (nonzero_only && 0 == value) continue; - const char * desc = ProfileEvents::getName(event); + const char * desc = getName(event); key_column.insertData(desc, strlen(desc)); value_column.insert(value); size++; @@ -45,4 +47,131 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, offsets.push_back(offsets.back() + size); } +/// Add records about provided non-zero ProfileEvents::Counters. +static void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name) +{ + size_t rows = 0; + auto & name_column = columns[NAME_COLUMN_INDEX]; + auto & value_column = columns[VALUE_COLUMN_INDEX]; + for (Event event = 0; event < Counters::num_counters; ++event) + { + Int64 value = snapshot.counters[event]; + + if (value == 0) + continue; + + const char * desc = getName(event); + name_column->insertData(desc, strlen(desc)); + value_column->insert(value); + rows++; + } + + // Fill the rest of the columns with data + for (size_t row = 0; row < rows; ++row) + { + size_t i = 0; + columns[i++]->insertData(host_name.data(), host_name.size()); + columns[i++]->insert(UInt64(snapshot.current_time)); + columns[i++]->insert(UInt64{snapshot.thread_id}); + columns[i++]->insert(Type::INCREMENT); + } +} + +static void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name) +{ + size_t i = 0; + columns[i++]->insertData(host_name.data(), host_name.size()); + columns[i++]->insert(UInt64(snapshot.current_time)); + columns[i++]->insert(UInt64{snapshot.thread_id}); + columns[i++]->insert(Type::GAUGE); + + columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME)); + columns[i++]->insert(snapshot.memory_usage); +} + +void getProfileEvents( + const String & server_display_name, + DB::InternalProfileEventsQueuePtr profile_queue, + DB::Block & block, + ThreadIdToCountersSnapshot & last_sent_snapshots) +{ + using namespace DB; + static const NamesAndTypesList column_names_and_types = { + {"host_name", std::make_shared()}, + {"current_time", std::make_shared()}, + {"thread_id", std::make_shared()}, + {"type", TypeEnum}, + {"name", std::make_shared()}, + {"value", std::make_shared()}, + }; + + ColumnsWithTypeAndName temp_columns; + for (auto const & name_and_type : column_names_and_types) + temp_columns.emplace_back(name_and_type.type, name_and_type.name); + + block = std::move(temp_columns); + MutableColumns columns = block.mutateColumns(); + auto thread_group = CurrentThread::getGroup(); + auto const current_thread_id = CurrentThread::get().thread_id; + std::vector snapshots; + ThreadIdToCountersSnapshot new_snapshots; + ProfileEventsSnapshot group_snapshot; + { + auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads(); + snapshots.reserve(stats.size()); + + for (auto & stat : stats) + { + auto const thread_id = stat.thread_id; + if (thread_id == current_thread_id) + continue; + auto current_time = time(nullptr); + auto previous_snapshot = last_sent_snapshots.find(thread_id); + auto increment = + previous_snapshot != last_sent_snapshots.end() + ? CountersIncrement(stat.counters, previous_snapshot->second) + : CountersIncrement(stat.counters); + snapshots.push_back(ProfileEventsSnapshot{ + thread_id, + std::move(increment), + stat.memory_usage, + current_time + }); + new_snapshots[thread_id] = std::move(stat.counters); + } + + group_snapshot.thread_id = 0; + group_snapshot.current_time = time(nullptr); + group_snapshot.memory_usage = thread_group->memory_tracker.get(); + auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot(); + auto prev_group_snapshot = last_sent_snapshots.find(0); + group_snapshot.counters = + prev_group_snapshot != last_sent_snapshots.end() + ? CountersIncrement(group_counters, prev_group_snapshot->second) + : CountersIncrement(group_counters); + new_snapshots[0] = std::move(group_counters); + } + last_sent_snapshots = std::move(new_snapshots); + + for (auto & snapshot : snapshots) + { + dumpProfileEvents(snapshot, columns, server_display_name); + dumpMemoryTracker(snapshot, columns, server_display_name); + } + dumpProfileEvents(group_snapshot, columns, server_display_name); + dumpMemoryTracker(group_snapshot, columns, server_display_name); + + Block curr_block; + + while (profile_queue->tryPop(curr_block)) + { + auto curr_columns = curr_block.getColumns(); + for (size_t j = 0; j < curr_columns.size(); ++j) + columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size()); + } + bool empty = columns[0]->empty(); + if (!empty) + block.setColumns(std::move(columns)); +} + } diff --git a/src/Interpreters/ProfileEventsExt.h b/src/Interpreters/ProfileEventsExt.h index 8a92eadec79..7d9fc512d15 100644 --- a/src/Interpreters/ProfileEventsExt.h +++ b/src/Interpreters/ProfileEventsExt.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include @@ -7,9 +8,28 @@ namespace ProfileEvents { +constexpr size_t NAME_COLUMN_INDEX = 4; +constexpr size_t VALUE_COLUMN_INDEX = 5; + +struct ProfileEventsSnapshot +{ + UInt64 thread_id; + CountersIncrement counters; + Int64 memory_usage; + time_t current_time; +}; + +using ThreadIdToCountersSnapshot = std::unordered_map; + /// Dumps profile events to columns Map(String, UInt64) void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true); +void getProfileEvents( + const String & server_display_name, + DB::InternalProfileEventsQueuePtr profile_queue, + DB::Block & block, + ThreadIdToCountersSnapshot & last_sent_snapshots); + /// This is for ProfileEvents packets. enum Type : int8_t { diff --git a/src/Loggers/Loggers.cpp b/src/Loggers/Loggers.cpp index ba6f9656746..c6b83b41a76 100644 --- a/src/Loggers/Loggers.cpp +++ b/src/Loggers/Loggers.cpp @@ -189,7 +189,6 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log Poco::AutoPtr pf = new OwnPatternFormatter(color_enabled); Poco::AutoPtr log = new DB::OwnFormattingChannel(pf, new Poco::ConsoleChannel); - logger.warning("Logging " + console_log_level_string + " to console"); log->setLevel(console_log_level); split->addChannel(log, "console"); } diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index ae2bebde3dd..4e2f79c0101 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -893,167 +892,19 @@ void TCPHandler::sendExtremes(const Block & extremes) } } - -namespace -{ - using namespace ProfileEvents; - - constexpr size_t NAME_COLUMN_INDEX = 4; - constexpr size_t VALUE_COLUMN_INDEX = 5; - - struct ProfileEventsSnapshot - { - UInt64 thread_id; - ProfileEvents::CountersIncrement counters; - Int64 memory_usage; - time_t current_time; - }; - - /* - * Add records about provided non-zero ProfileEvents::Counters. - */ - void dumpProfileEvents( - ProfileEventsSnapshot const & snapshot, - MutableColumns & columns, - String const & host_name) - { - size_t rows = 0; - auto & name_column = columns[NAME_COLUMN_INDEX]; - auto & value_column = columns[VALUE_COLUMN_INDEX]; - for (ProfileEvents::Event event = 0; event < ProfileEvents::Counters::num_counters; ++event) - { - Int64 value = snapshot.counters[event]; - - if (value == 0) - continue; - - const char * desc = ProfileEvents::getName(event); - name_column->insertData(desc, strlen(desc)); - value_column->insert(value); - rows++; - } - - // Fill the rest of the columns with data - for (size_t row = 0; row < rows; ++row) - { - size_t i = 0; - columns[i++]->insertData(host_name.data(), host_name.size()); - columns[i++]->insert(UInt64(snapshot.current_time)); - columns[i++]->insert(UInt64{snapshot.thread_id}); - columns[i++]->insert(ProfileEvents::Type::INCREMENT); - } - } - - void dumpMemoryTracker( - ProfileEventsSnapshot const & snapshot, - MutableColumns & columns, - String const & host_name) - { - { - size_t i = 0; - columns[i++]->insertData(host_name.data(), host_name.size()); - columns[i++]->insert(UInt64(snapshot.current_time)); - columns[i++]->insert(UInt64{snapshot.thread_id}); - columns[i++]->insert(ProfileEvents::Type::GAUGE); - - columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME)); - columns[i++]->insert(snapshot.memory_usage); - } - } -} - - void TCPHandler::sendProfileEvents() { if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS) return; - NamesAndTypesList column_names_and_types = { - { "host_name", std::make_shared() }, - { "current_time", std::make_shared() }, - { "thread_id", std::make_shared() }, - { "type", ProfileEvents::TypeEnum }, - { "name", std::make_shared() }, - { "value", std::make_shared() }, - }; - - ColumnsWithTypeAndName temp_columns; - for (auto const & name_and_type : column_names_and_types) - temp_columns.emplace_back(name_and_type.type, name_and_type.name); - - Block block(std::move(temp_columns)); /// NOLINT(performance-move-const-arg) - - MutableColumns columns = block.mutateColumns(); - auto thread_group = CurrentThread::getGroup(); - auto const current_thread_id = CurrentThread::get().thread_id; - std::vector snapshots; - ThreadIdToCountersSnapshot new_snapshots; - ProfileEventsSnapshot group_snapshot; - { - auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads(); - snapshots.reserve(stats.size()); - - for (auto & stat : stats) - { - auto const thread_id = stat.thread_id; - if (thread_id == current_thread_id) - continue; - auto current_time = time(nullptr); - auto previous_snapshot = last_sent_snapshots.find(thread_id); - auto increment = - previous_snapshot != last_sent_snapshots.end() - ? CountersIncrement(stat.counters, previous_snapshot->second) - : CountersIncrement(stat.counters); - snapshots.push_back(ProfileEventsSnapshot{ - thread_id, - std::move(increment), - stat.memory_usage, - current_time - }); - new_snapshots[thread_id] = std::move(stat.counters); - } - - group_snapshot.thread_id = 0; - group_snapshot.current_time = time(nullptr); - group_snapshot.memory_usage = thread_group->memory_tracker.get(); - auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot(); - auto prev_group_snapshot = last_sent_snapshots.find(0); - group_snapshot.counters = - prev_group_snapshot != last_sent_snapshots.end() - ? CountersIncrement(group_counters, prev_group_snapshot->second) - : CountersIncrement(group_counters); - new_snapshots[0] = std::move(group_counters); - } - last_sent_snapshots = std::move(new_snapshots); - - for (auto & snapshot : snapshots) - { - dumpProfileEvents(snapshot, columns, server_display_name); - dumpMemoryTracker(snapshot, columns, server_display_name); - } - dumpProfileEvents(group_snapshot, columns, server_display_name); - dumpMemoryTracker(group_snapshot, columns, server_display_name); - - MutableColumns logs_columns; - Block curr_block; - - for (; state.profile_queue->tryPop(curr_block); ) - { - auto curr_columns = curr_block.getColumns(); - for (size_t j = 0; j < curr_columns.size(); ++j) - columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size()); - } - - bool empty = columns[0]->empty(); - if (!empty) + Block block; + ProfileEvents::getProfileEvents(server_display_name, state.profile_queue, block, last_sent_snapshots); + if (block.rows() != 0) { - block.setColumns(std::move(columns)); - initProfileEventsBlockOutput(block); writeVarUInt(Protocol::Server::ProfileEvents, *out); writeStringBinary("", *out); - state.profile_events_block_out->write(block); out->next(); } diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h index a4435ba713d..76d98daee05 100644 --- a/src/Server/TCPHandler.h +++ b/src/Server/TCPHandler.h @@ -3,9 +3,10 @@ #include #include -#include "Common/ProfileEvents.h" +#include #include #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include #include #include @@ -37,6 +39,8 @@ struct Settings; class ColumnsDescription; struct ProfileInfo; class TCPServer; +class NativeWriter; +class NativeReader; /// State of query processing. struct QueryState @@ -191,9 +195,7 @@ class TCPHandler : public Poco::Net::TCPServerConnection CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection}; - using ThreadIdToCountersSnapshot = std::unordered_map; - - ThreadIdToCountersSnapshot last_sent_snapshots; + ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots; /// It is the name of the server that will be sent to the client. String server_display_name; diff --git a/utils/check-style/check-style b/utils/check-style/check-style index c3b1ec6ae03..8a4468c1339 100755 --- a/utils/check-style/check-style +++ b/utils/check-style/check-style @@ -74,6 +74,8 @@ EXTERN_TYPES_EXCLUDES=( ProfileEvents::Type ProfileEvents::TypeEnum ProfileEvents::dumpToMapColumn + ProfileEvents::getProfileEvents + ProfileEvents::ThreadIdToCountersSnapshot ProfileEvents::LOCAL_NAME ProfileEvents::CountersIncrement