Skip to content

Commit

Permalink
Bugfix/issue 214 revisit proton local (#218)
Browse files Browse the repository at this point in the history
  • Loading branch information
yokofly authored Oct 31, 2023
1 parent e2427df commit d00b5a3
Show file tree
Hide file tree
Showing 21 changed files with 396 additions and 224 deletions.
15 changes: 15 additions & 0 deletions programs/bash-completion/completions/proton-bootstrap
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ shopt -s extglob

export _CLICKHOUSE_COMPLETION_LOADED=1

CLICKHOUSE_logs_level=(
none
fatal
error
warning
information
debug
trace
test
)

CLICKHOUSE_QueryProcessingStage=(
complete
fetch_columns
Expand Down Expand Up @@ -113,6 +124,10 @@ function _complete_for_proton_generic_bin_impl()
COMPREPLY=( $(compgen -W "${CLICKHOUSE_QueryProcessingStage[*]}" -- "$cur") )
return 1
;;
--send_logs_level)
COMPREPLY=( $(compgen -W "${CLICKHOUSE_logs_level[*]}" -- "$cur") )
return 1
;;
--format|--input-format|--output-format)
COMPREPLY=( $(compgen -W "${CLICKHOUSE_Format[*]}" -- "$cur") )
return 1
Expand Down
1 change: 1 addition & 0 deletions programs/client/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,7 @@ void Client::processConfig()
global_context->setCurrentQueryId(query_id);
}
print_stack_trace = config().getBool("stacktrace", false);
logging_initialized = true;

if (config().has("multiquery"))
is_multiquery = true;
Expand Down
72 changes: 51 additions & 21 deletions programs/local/LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ void LocalServer::tryInitPath()
if (path.back() != '/')
path += '/';


fs::create_directories(fs::path(path) / "user_defined/");
fs::create_directories(fs::path(path) / "data/");
fs::create_directories(fs::path(path) / "metadata/");
fs::create_directories(fs::path(path) / "metadata_dropped/");

global_context->setPath(path);

global_context->setTemporaryStorage(path + "tmp");
Expand Down Expand Up @@ -400,13 +406,28 @@ void LocalServer::setupUsers()
auto & access_control = global_context->getAccessControl();
access_control.setNoPasswordAllowed(config().getBool("allow_no_password", true));
access_control.setPlaintextPasswordAllowed(config().getBool("allow_plaintext_password", true));

if (config().has("users_config") || config().has("config-file") || fs::exists("config.xml"))
if (config().has("config-file") || fs::exists("config.xml"))
{
const auto users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
ConfigProcessor config_processor(users_config_path);
const auto loaded_config = config_processor.loadConfig();
users_config = loaded_config.configuration;
String config_path = config().getString("config-file", "");
bool has_user_directories = config().has("user_directories");
const auto config_dir = fs::path{config_path}.remove_filename().string();
String users_config_path = config().getString("users_config", "");

if (users_config_path.empty() && has_user_directories)
{
users_config_path = config().getString("user_directories.users_xml.path");
if (fs::path(users_config_path).is_relative() && fs::exists(fs::path(config_dir) / users_config_path))
users_config_path = fs::path(config_dir) / users_config_path;
}

if (users_config_path.empty())
users_config = getConfigurationFromXMLString(minimal_default_user_xml);
else
{
ConfigProcessor config_processor(users_config_path);
const auto loaded_config = config_processor.loadConfig();
users_config = loaded_config.configuration;
}
}
else
users_config = getConfigurationFromXMLString(minimal_default_user_xml);
Expand All @@ -416,11 +437,11 @@ void LocalServer::setupUsers()
throw Exception("Can't load config for users", ErrorCodes::CANNOT_LOAD_CONFIG);
}


void LocalServer::connect()
{
connection_parameters = ConnectionParameters(config());
connection = LocalConnection::createConnection(connection_parameters, global_context, need_render_progress);
connection = LocalConnection::createConnection(
connection_parameters, global_context, need_render_progress, need_render_profile_events, server_display_name);
}


Expand Down Expand Up @@ -523,6 +544,14 @@ catch (...)
return getCurrentExceptionCode();
}

void LocalServer::updateLoggerLevel(const String & logs_level)
{
if (!logging_initialized)
return;

config().setString("logger.level", logs_level);
updateLevels(config(), logger());
}

void LocalServer::processConfig()
{
Expand All @@ -549,30 +578,31 @@ void LocalServer::processConfig()
auto logging = (config().has("logger.console")
|| config().has("logger.level")
|| config().has("log-level")
|| config().has("send_logs_level")
|| config().has("logger.log"));

auto file_logging = config().has("server_logs_file");
if (is_interactive && logging && !file_logging)
throw Exception("For interactive mode logging is allowed only with --server_logs_file option",
ErrorCodes::BAD_ARGUMENTS);
auto level = config().getString("log-level", "trace");

if (file_logging)
if (config().has("server_logs_file"))
{
auto level = Poco::Logger::parseLevel(config().getString("log-level", "trace"));
Poco::Logger::root().setLevel(level);
auto poco_logs_level = Poco::Logger::parseLevel(level);
Poco::Logger::root().setLevel(poco_logs_level);
Poco::Logger::root().setChannel(Poco::AutoPtr<Poco::SimpleFileChannel>(new Poco::SimpleFileChannel(server_logs_file)));
logging_initialized = true;
}
else if (logging)
else if (logging || is_interactive)
{
// force enable logging
config().setString("logger", "logger");
// sensitive data rules are not used here
buildLoggers(config(), logger(), "proton-local");
auto log_level_default = is_interactive && !logging ? "none" : level;
config().setString("logger.level", config().getString("log-level", config().getString("send_logs_level", log_level_default)));
buildLoggers(config(), logger(), "clickhouse-local");
logging_initialized = true;
}
else
{
Poco::Logger::root().setLevel("none");
Poco::Logger::root().setChannel(Poco::AutoPtr<Poco::NullChannel>(new Poco::NullChannel()));
logging_initialized = false;
}

shared_context = Context::createShared();
Expand Down Expand Up @@ -663,8 +693,6 @@ void LocalServer::processConfig()
status.emplace(fs::path(path) / "status", StatusFile::write_full_info);

LOG_DEBUG(log, "Loading metadata from {}", path);
fs::create_directories(fs::path(path) / "data/");
fs::create_directories(fs::path(path) / "metadata/");

loadMetadataSystem(global_context);
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
Expand Down Expand Up @@ -802,6 +830,8 @@ void LocalServer::processOptions(const OptionsDescription &, const CommandLineOp
config().setString("logger.log", options["logger.log"].as<std::string>());
if (options.count("logger.level"))
config().setString("logger.level", options["logger.level"].as<std::string>());
if (options.count("send_logs_level"))
config().setString("send_logs_level", options["send_logs_level"].as<std::string>());
}

}
Expand Down
2 changes: 2 additions & 0 deletions programs/local/LocalServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class LocalServer : public ClientBase, public Loggers
const std::vector<Arguments> &) override;
void processConfig() override;

void updateLoggerLevel(const String & logs_level) override;

private:
/** Composes CREATE subquery based on passed arguments (--structure --file --table and --input-format)
* This query will be executed first, before queries passed through --query argument
Expand Down
46 changes: 32 additions & 14 deletions src/Client/ClientBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ void ClientBase::onProfileEvents(Block & block)
if (rows == 0)
return;

/// if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
/// if (getName() == "local" || server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
{
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
Expand Down Expand Up @@ -1320,6 +1320,13 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
}
}

if (const auto * set_query = parsed_query->as<ASTSetQuery>())
{
const auto * logs_level_field = set_query->changes.tryGet(std::string_view{"send_logs_level"});
if (logs_level_field)
updateLoggerLevel(logs_level_field->safeGet<String>());
}

processed_rows = 0;
written_first_block = false;
progress_indication.resetProgress();
Expand Down Expand Up @@ -1518,9 +1525,21 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(

bool ClientBase::processQueryText(const String & text)
{
if (exit_strings.end() != exit_strings.find(trim(text, [](char c) { return isWhitespaceASCII(c) || c == ';'; })))
auto trimmed_input = trim(text, [](char c) { return isWhitespaceASCII(c) || c == ';'; });

if (exit_strings.end() != exit_strings.find(trimmed_input))
return false;

if (trimmed_input.starts_with("\\i"))
{
size_t skip_prefix_size = std::strlen("\\i");
auto file_name = trim(
trimmed_input.substr(skip_prefix_size, trimmed_input.size() - skip_prefix_size),
[](char c) { return isWhitespaceASCII(c); });

return processMultiQueryFromFile(file_name);
}

if (!is_multiquery)
{
assert(!query_fuzzer_runs);
Expand Down Expand Up @@ -1689,28 +1708,27 @@ void ClientBase::runInteractive()
std::cout << "Bye." << std::endl;
}

bool ClientBase::processMultiQueryFromFile(const String & file_name)
{
String queries_from_file;

ReadBufferFromFile in(file_name);
readStringUntilEOF(queries_from_file, in);

return executeMultiQuery(queries_from_file);
}

void ClientBase::runNonInteractive()
{
if (!queries_files.empty())
{
auto process_multi_query_from_file = [&](const String & file)
{
String queries_from_file;

ReadBufferFromFile in(file);
readStringUntilEOF(queries_from_file, in);

return executeMultiQuery(queries_from_file);
};

for (const auto & queries_file : queries_files)
{
for (const auto & interleave_file : interleave_queries_files)
if (!process_multi_query_from_file(interleave_file))
if (!processMultiQueryFromFile(interleave_file))
return;

if (!process_multi_query_from_file(queries_file))
if (!processMultiQueryFromFile(queries_file))
return;
}

Expand Down
12 changes: 9 additions & 3 deletions src/Client/ClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ClientBase : public Poco::Util::Application, public IHints<2, ClientBase>
std::optional<ProgramOptionsDescription> external_description;
};

virtual void updateLoggerLevel(const String &) {}
virtual void printHelpMessage(const OptionsDescription & options_description) = 0;
virtual void addOptions(OptionsDescription & options_description) = 0;
virtual void processOptions(const OptionsDescription & options_description,
Expand Down Expand Up @@ -140,6 +141,11 @@ class ClientBase : public Poco::Util::Application, public IHints<2, ClientBase>
void updateSuggest(const ASTCreateQuery & ast_create);

protected:
SharedContextHolder shared_context;
ContextMutablePtr global_context;

bool processMultiQueryFromFile(const String & file_name);

bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
bool is_multiquery = false;
bool delayed_interactive = false;
Expand Down Expand Up @@ -175,9 +181,6 @@ class ClientBase : public Poco::Util::Application, public IHints<2, ClientBase>
/// Settings specified via command line args
Settings cmd_settings;

SharedContextHolder shared_context;
ContextMutablePtr global_context;

/// thread status should be destructed before shared context because it relies on process list.
std::optional<ThreadStatus> thread_status;

Expand Down Expand Up @@ -211,6 +214,7 @@ class ClientBase : public Poco::Util::Application, public IHints<2, ClientBase>

ProgressIndication progress_indication;
bool need_render_progress = true;
bool need_render_profile_events = true;
bool written_first_block = false;
size_t processed_rows = 0; /// How many rows have been read or written.

Expand Down Expand Up @@ -245,6 +249,8 @@ class ClientBase : public Poco::Util::Application, public IHints<2, ClientBase>
QueryProcessingStage::Enum query_processing_stage;

bool cancelled = false;

bool logging_initialized = false;
};

}
Loading

0 comments on commit d00b5a3

Please sign in to comment.