diff --git a/base/base/defines.h b/base/base/defines.h
index e0fda8e9c71..fec59153eba 100644
--- a/base/base/defines.h
+++ b/base/base/defines.h
@@ -155,6 +155,7 @@
# define TSA_ACQUIRE_SHARED(...) __attribute__((acquire_shared_capability(__VA_ARGS__))) /// function acquires a shared capability, but does not release it
# define TSA_TRY_ACQUIRE_SHARED(...) __attribute__((try_acquire_shared_capability(__VA_ARGS__))) /// function tries to acquire a shared capability and returns a boolean value indicating success or failure
# define TSA_RELEASE_SHARED(...) __attribute__((release_shared_capability(__VA_ARGS__))) /// function releases the given shared capability
+# define TSA_SCOPED_LOCKABLE __attribute__((scoped_lockable)) /// object of a class has scoped lockable capability
/// Macros for suppressing TSA warnings for specific reads/writes (instead of suppressing it for the whole function)
/// They use a lambda function to apply function attribute to a single statement. This enable us to suppress warnings locally instead of
@@ -182,6 +183,7 @@
# define TSA_ACQUIRE_SHARED(...)
# define TSA_TRY_ACQUIRE_SHARED(...)
# define TSA_RELEASE_SHARED(...)
+# define TSA_SCOPED_LOCKABLE
# define TSA_SUPPRESS_WARNING_FOR_READ(x) (x)
# define TSA_SUPPRESS_WARNING_FOR_WRITE(x) (x)
diff --git a/src/Common/SharedLockGuard.h b/src/Common/SharedLockGuard.h
new file mode 100644
index 00000000000..9186dd27596
--- /dev/null
+++ b/src/Common/SharedLockGuard.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+
+/** SharedLockGuard provide RAII-style locking mechanism for acquiring shared ownership of the implementation
+ * of the SharedLockable concept (for example std::shared_mutex) supplied as the constructor argument.
+ * On construction it acquires shared ownership using `lock_shared` method.
+ * On desruction shared ownership is released using `unlock_shared` method.
+ */
+template
+class TSA_SCOPED_LOCKABLE SharedLockGuard
+{
+public:
+ explicit SharedLockGuard(Mutex & mutex_) TSA_ACQUIRE_SHARED(mutex_) : mutex(mutex_) { mutex_.lock_shared(); }
+
+ ~SharedLockGuard() TSA_RELEASE() { mutex.unlock_shared(); }
+
+private:
+ Mutex & mutex;
+};
+
+}
diff --git a/src/Common/SharedMutexHelper.h b/src/Common/SharedMutexHelper.h
new file mode 100644
index 00000000000..8dddaab6c78
--- /dev/null
+++ b/src/Common/SharedMutexHelper.h
@@ -0,0 +1,112 @@
+#pragma once
+
+#include
+#include
+#include
+
+namespace DB
+{
+
+/** SharedMutexHelper class allows to inject specific logic when underlying shared mutex is acquired
+ * and released.
+ *
+ * Example:
+ *
+ * class ProfileSharedMutex : public SharedMutexHelper
+ * {
+ * public:
+ * size_t getLockCount() const { return lock_count; }
+ *
+ * size_t getSharedLockCount() const { return shared_lock_count; }
+ *
+ * private:
+ * using Base = SharedMutexHelper;
+ * friend class SharedMutexHelper;
+ *
+ * void lockImpl()
+ * {
+ * ++lock_count;
+ * Base::lockImpl();
+ * }
+ *
+ * void lockSharedImpl()
+ * {
+ * ++shared_lock_count;
+ * Base::lockSharedImpl();
+ * }
+ *
+ * std::atomic lock_count = 0;
+ * std::atomic shared_lock_count = 0;
+ * };
+ */
+template
+class TSA_CAPABILITY("SharedMutexHelper") SharedMutexHelper
+{
+public:
+ // Exclusive ownership
+ void lock() TSA_ACQUIRE() /// NOLINT
+ {
+ static_cast(this)->lockImpl();
+ }
+
+ bool try_lock() TSA_TRY_ACQUIRE(true) /// NOLINT
+ {
+ static_cast(this)->tryLockImpl();
+ }
+
+ void unlock() TSA_RELEASE() /// NOLINT
+ {
+ static_cast(this)->unlockImpl();
+ }
+
+ // Shared ownership
+ void lock_shared() TSA_ACQUIRE_SHARED() /// NOLINT
+ {
+ static_cast(this)->lockSharedImpl();
+ }
+
+ bool try_lock_shared() TSA_TRY_ACQUIRE_SHARED(true) /// NOLINT
+ {
+ static_cast(this)->tryLockSharedImpl();
+ }
+
+ void unlock_shared() TSA_RELEASE_SHARED() /// NOLINT
+ {
+ static_cast(this)->unlockSharedImpl();
+ }
+
+protected:
+ void lockImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
+ {
+ mutex.lock();
+ }
+
+ void tryLockImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
+ {
+ mutex.try_lock();
+ }
+
+ void unlockImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
+ {
+ mutex.unlock();
+ }
+
+ void lockSharedImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
+ {
+ mutex.lock_shared();
+ }
+
+ void tryLockSharedImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
+ {
+ mutex.try_lock_shared();
+ }
+
+ void unlockSharedImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
+ {
+ mutex.unlock_shared();
+ }
+
+ MutexType mutex;
+};
+
+}
diff --git a/src/Common/callOnce.h b/src/Common/callOnce.h
new file mode 100644
index 00000000000..402bb7365a1
--- /dev/null
+++ b/src/Common/callOnce.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+
+using OnceFlag = std::once_flag;
+
+template
+void callOnce(OnceFlag & flag, Callable && func, Args&&... args)
+{
+ std::call_once(flag, std::forward(func), std::forward(args)...);
+}
+
+}
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index e579963b5e6..547e46d267c 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -12,6 +12,8 @@
#include
#include
#include
+#include
+#include
#include
#include
#include
@@ -153,8 +155,8 @@ struct ContextSharedPart : boost::noncopyable
{
Poco::Logger * log = &Poco::Logger::get("Context");
- /// For access of most of shared objects. Recursive mutex.
- mutable std::recursive_mutex mutex;
+ /// For access of most of shared objects.
+ mutable ContextSharedMutex mutex;
/// Separate mutex for access of dictionaries. Separate mutex to avoid locks when server doing request to itself.
mutable std::mutex embedded_dictionaries_mutex;
mutable std::mutex external_dictionaries_mutex;
@@ -167,91 +169,106 @@ struct ContextSharedPart : boost::noncopyable
/// Separate mutex for re-initialization of zookeeper session. This operation could take a long time and must not interfere with another operations.
mutable std::mutex zookeeper_mutex;
- mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper.
- ConfigurationPtr zookeeper_config; /// Stores zookeeper configs
+ mutable zkutil::ZooKeeperPtr zookeeper TSA_GUARDED_BY(zookeeper_mutex); /// Client for ZooKeeper.
+ ConfigurationPtr zookeeper_config TSA_GUARDED_BY(zookeeper_mutex); /// Stores zookeeper configs
#if USE_NURAFT
mutable std::mutex keeper_dispatcher_mutex;
- mutable std::shared_ptr keeper_dispatcher;
+ mutable std::shared_ptr keeper_dispatcher TSA_GUARDED_BY(keeper_dispatcher_mutex);
/// proton: starts.
mutable std::mutex metastore_dispatcher_mutex;
- mutable std::shared_ptr metastore_dispatcher;
+ mutable std::shared_ptr metastore_dispatcher TSA_GUARDED_BY(metastore_dispatcher_mutex);
/// proton: ends.
#endif
mutable std::mutex auxiliary_zookeepers_mutex;
- mutable std::map auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
+ mutable std::map auxiliary_zookeepers TSA_GUARDED_BY(auxiliary_zookeepers_mutex); /// Map for auxiliary ZooKeeper clients.
ConfigurationPtr auxiliary_zookeepers_config; /// Stores auxiliary zookeepers configs
+ /// No lock required for interserver_io_host, interserver_io_port, interserver_scheme modified only during initialization
String interserver_io_host; /// The host name by which this server is available for other servers.
UInt16 interserver_io_port = 0; /// and port.
String interserver_scheme; /// http or https
MultiVersion interserver_io_credentials;
- String path; /// Path to the data directory, with a slash at the end.
- String flags_path; /// Path to the directory with some control flags for server maintenance.
- String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function.
- String dictionaries_lib_path; /// Path to the directory with user provided binaries and libraries for external dictionaries.
- String user_scripts_path; /// Path to the directory with user provided scripts.
- ConfigurationPtr config; /// Global configuration settings.
-
- String tmp_path; /// Path to the temporary files that occur when processing the request.
- mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
-
+ String path TSA_GUARDED_BY(mutex); /// Path to the data directory, with a slash at the end.
+ String flags_path TSA_GUARDED_BY(mutex); /// Path to the directory with some control flags for server maintenance.
+ String user_files_path TSA_GUARDED_BY(mutex); /// Path to the directory with user provided files, usable by 'file' table function.
+ String dictionaries_lib_path TSA_GUARDED_BY(mutex); /// Path to the directory with user provided binaries and libraries for external dictionaries.
+ String user_scripts_path TSA_GUARDED_BY(mutex); /// Path to the directory with user provided scripts.
+ ConfigurationPtr config TSA_GUARDED_BY(mutex); /// Global configuration settings.
+ String tmp_path TSA_GUARDED_BY(mutex); /// Path to the temporary files that occur when processing the request.
+ mutable VolumePtr tmp_volume TSA_GUARDED_BY(mutex); /// Volume for the the temporary files that occur when processing the request.
+
+ /// FIXME(yokofly): This value is currently unused and will be removed in the commit referenced below on May 3, 2022.
+ /// See: https://github.com/ClickHouse/ClickHouse/commit/5257ce31f86cd3852a8a30343596ad029c56b083#diff-c7c4cea868f661341c1e9866836dc34c1c88723f9f33b4e09db530c2ea074036L2843-R2854
+ /// Therefore, I did not add TSA_GUARDED_BY.
mutable VolumePtr backups_volume; /// Volume for all the backups.
- mutable std::unique_ptr embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
- mutable std::unique_ptr external_dictionaries_loader;
+ mutable std::unique_ptr embedded_dictionaries TSA_GUARDED_BY(embedded_dictionaries_mutex); /// Metrica's dictionaries. Have lazy initialization.
+ mutable std::unique_ptr external_dictionaries_loader TSA_GUARDED_BY(external_dictionaries_mutex);
/// proton: starts
- /// mutable std::unique_ptr external_user_defined_executable_functions_loader;
+ /// mutable std::unique_ptr external_user_defined_executable_functions_loader TSA_GUARDED_BY(external_user_defined_executable_functions_mutex);
/// proton: ends
- mutable std::unique_ptr external_models_loader;
+ mutable std::unique_ptr external_models_loader TSA_GUARDED_BY(external_models_mutex);
- ExternalLoaderXMLConfigRepository * external_models_config_repository = nullptr;
- scope_guard models_repository_guard;
+ ExternalLoaderXMLConfigRepository * external_models_config_repository TSA_GUARDED_BY(external_models_mutex) = nullptr;
+ scope_guard models_repository_guard TSA_GUARDED_BY(external_models_mutex);
- ExternalLoaderXMLConfigRepository * external_dictionaries_config_repository = nullptr;
- scope_guard dictionaries_xmls;
+ ExternalLoaderXMLConfigRepository * external_dictionaries_config_repository TSA_GUARDED_BY(external_dictionaries_mutex) = nullptr;
+ scope_guard dictionaries_xmls TSA_GUARDED_BY(external_dictionaries_mutex);
/// proton: starts
- Streaming::MetaStoreJSONConfigRepository * user_defined_executable_functions_config_repository = nullptr;
+ Streaming::MetaStoreJSONConfigRepository * user_defined_executable_functions_config_repository TSA_GUARDED_BY(external_user_defined_executable_functions_mutex) = nullptr;
/// proton: ends
- scope_guard user_defined_executable_functions_xmls;
+ scope_guard user_defined_executable_functions_xmls TSA_GUARDED_BY(external_user_defined_executable_functions_mutex);
+ mutable OnceFlag user_defined_sql_objects_loader_initialized;
mutable std::unique_ptr user_defined_sql_objects_loader;
#if USE_NLP
+ mutable OnceFlag synonyms_extensions_initialized;
mutable std::optional synonyms_extensions;
+
+ mutable OnceFlag lemmatizers_initialized;
mutable std::optional lemmatizers;
#endif
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
- std::unique_ptr access_control;
- mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
- mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
+ std::unique_ptr access_control TSA_GUARDED_BY(mutex);
+ mutable UncompressedCachePtr uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks.
+ mutable MarkCachePtr mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files.
+ mutable OnceFlag load_marks_threadpool_initialized;
mutable std::unique_ptr load_marks_threadpool; /// Threadpool for loading marks cache.
- mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
- mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
- mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
+ mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices.
+ mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
+ mutable MMappedFileCachePtr mmap_cache TSA_GUARDED_BY(mutex); /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
ProcessList process_list; /// Executing queries at the moment.
GlobalOvercommitTracker global_overcommit_tracker;
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
- ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
+ ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
+ OnceFlag part_commit_pool_initialized;
mutable std::unique_ptr part_commit_pool; /// proton: A thread pool that can build part and commit in background (used for Stream table engine)
+ OnceFlag buffer_flush_schedule_pool_initialized;
mutable std::unique_ptr buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
+ OnceFlag schedule_pool_initialized;
mutable std::unique_ptr schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
+ OnceFlag distributed_schedule_pool_initialized;
mutable std::unique_ptr distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
+ OnceFlag message_broker_schedule_pool_initialized;
mutable std::unique_ptr message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
+ mutable OnceFlag readers_initialized;
mutable std::unique_ptr asynchronous_remote_fs_reader;
mutable std::unique_ptr asynchronous_local_fs_reader;
mutable std::unique_ptr synchronous_local_fs_reader;
+ mutable OnceFlag threadpool_writer_initialized;
mutable std::unique_ptr threadpool_writer;
mutable ThrottlerPtr remote_read_throttler; /// A server-wide throttler for remote IO reads
@@ -259,55 +276,63 @@ struct ContextSharedPart : boost::noncopyable
MultiVersion macros; /// Substitutions extracted from config.
/// Rules for selecting the compression settings, depending on the size of the part.
- mutable std::unique_ptr compression_codec_selector;
+ mutable std::unique_ptr compression_codec_selector TSA_GUARDED_BY(mutex);
/// Storage disk chooser for MergeTree engines
- mutable std::shared_ptr merge_tree_disk_selector;
+ mutable std::shared_ptr merge_tree_disk_selector TSA_GUARDED_BY(storage_policies_mutex);
/// Storage policy chooser for MergeTree engines
- mutable std::shared_ptr merge_tree_storage_policy_selector;
+ mutable std::shared_ptr merge_tree_storage_policy_selector TSA_GUARDED_BY(storage_policies_mutex);
/// proton: starts. remove `replicated` and add `stream`
- std::optional stream_settings; /// Settings of Stream* engines.
+ std::optional stream_settings TSA_GUARDED_BY(mutex); /// Settings of Stream* engines.
/// proton: ends.
std::atomic_size_t max_stream_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
std::atomic_size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default)
+ /// No lock required for format_schema_path modified only during initialization
String format_schema_path; /// Path to a directory that contains schema files used by input formats.
+ mutable OnceFlag action_locks_manager_initialized;
ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers
- std::unique_ptr system_logs; /// Used to log queries and operations on parts
- std::optional storage_s3_settings; /// Settings of S3 storage
- std::vector warnings; /// Store warning messages about server configuration.
+ OnceFlag system_logs_initialized;
+ std::unique_ptr system_logs TSA_GUARDED_BY(mutex); /// Used to log queries and operations on parts
+ std::optional storage_s3_settings TSA_GUARDED_BY(mutex); /// Settings of S3 storage
+ std::vector warnings TSA_GUARDED_BY(mutex); /// Store warning messages about server configuration.
/// Background executors for *MergeTree tables
- MergeMutateBackgroundExecutorPtr merge_mutate_executor;
- OrdinaryBackgroundExecutorPtr moves_executor;
- OrdinaryBackgroundExecutorPtr fetch_executor;
- OrdinaryBackgroundExecutorPtr common_executor;
+ /// Has background executors for MergeTree tables been initialized?
+ mutable ContextSharedMutex background_executors_mutex;
+ bool is_background_executors_initialized TSA_GUARDED_BY(background_executors_mutex) = false;
+ MergeMutateBackgroundExecutorPtr merge_mutate_executor TSA_GUARDED_BY(background_executors_mutex);
+ OrdinaryBackgroundExecutorPtr moves_executor TSA_GUARDED_BY(background_executors_mutex);
+ OrdinaryBackgroundExecutorPtr fetch_executor TSA_GUARDED_BY(background_executors_mutex);
+ OrdinaryBackgroundExecutorPtr common_executor TSA_GUARDED_BY(background_executors_mutex);
- RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
+ RemoteHostFilter remote_host_filter TSA_GUARDED_BY(mutex); /// Allowed URL from config.xml
+ /// No lock required for trace_collector modified only during initialization
std::optional trace_collector; /// Thread collecting traces from threads executing queries
/// Clusters for distributed tables
/// Initialized on demand (on distributed storages initialization) since Settings should be initialized
- std::shared_ptr clusters;
- ConfigurationPtr clusters_config; /// Stores updated configs
mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config
+ std::shared_ptr clusters TSA_GUARDED_BY(clusters_mutex);
+ ConfigurationPtr clusters_config TSA_GUARDED_BY(clusters_mutex); /// Stores updated configs
+ /// No lock required for async_insert_queue modified only during initialization
std::shared_ptr async_insert_queue;
+
std::map server_ports;
- bool shutdown_called = false;
+ std::atomic shutdown_called = false;
- /// Has background executors for MergeTree tables been initialized?
- bool is_background_executors_initialized = false;
-
- Stopwatch uptime_watch;
+ Stopwatch uptime_watch TSA_GUARDED_BY(mutex);
+ /// No lock required for application_type modified only during initialization
Context::ApplicationType application_type = Context::ApplicationType::SERVER;
+ /// No lock required for config_reload_callback modified only during initialization
Context::ConfigReloadCallback config_reload_callback;
- bool is_server_completely_started = false;
+ bool is_server_completely_started TSA_GUARDED_BY(mutex) = false;
#if USE_ROCKSDB
/// Global merge tree metadata cache, stored in rocksdb.
@@ -414,14 +439,34 @@ struct ContextSharedPart : boost::noncopyable
}
}
+ void setConfig(const ConfigurationPtr & config_value)
+ {
+ if (!config_value)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Set nullptr config is invalid");
+
+ std::lock_guard lock(mutex);
+ config = config_value;
+ access_control->setExternalAuthenticatorsConfig(*config_value);
+ }
+
+ const Poco::Util::AbstractConfiguration & getConfigRefWithLock(const std::lock_guard &) const TSA_REQUIRES(this->mutex)
+ {
+ return config ? *config : Poco::Util::Application::instance().config();
+ }
+
+ const Poco::Util::AbstractConfiguration & getConfigRef() const
+ {
+ SharedLockGuard lock(mutex);
+ return config ? *config : Poco::Util::Application::instance().config();
+ }
/** Perform a complex job of destroying objects in advance.
*/
- void shutdown()
+ void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
{
- if (shutdown_called)
+ bool is_shutdown_called = shutdown_called.exchange(true);
+ if (is_shutdown_called)
return;
- shutdown_called = true;
/// Stop periodic reloading of the configuration files.
/// This must be done first because otherwise the reloading may pass a changed config
@@ -471,7 +516,7 @@ struct ContextSharedPart : boost::noncopyable
std::unique_ptr delete_access_control;
{
- auto lock = std::lock_guard(mutex);
+ std::lock_guard lock(mutex);
/** Compiled expressions stored in cache need to be destroyed before destruction of static objects.
* Because CHJIT instance can be static object.
@@ -561,7 +606,7 @@ struct ContextSharedPart : boost::noncopyable
trace_collector.emplace(std::move(trace_log));
}
- void addWarningMessage(const String & message)
+ void addWarningMessage(const String & message) TSA_REQUIRES(mutex)
{
/// A warning goes both: into server's log; stored to be placed in `system.warnings` table.
log->warning(message);
@@ -569,10 +614,29 @@ struct ContextSharedPart : boost::noncopyable
}
};
+void ContextSharedMutex::lockImpl()
+{
+ ProfileEvents::increment(ProfileEvents::ContextLock);
+ CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
+ Stopwatch watch;
+ Base::lockImpl();
+ ProfileEvents::increment(ProfileEvents::ContextLockWaitMicroseconds, watch.elapsedMicroseconds());
+}
+
+void ContextSharedMutex::lockSharedImpl()
+{
+ ProfileEvents::increment(ProfileEvents::ContextLock);
+ CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
+ Stopwatch watch;
+ Base::lockSharedImpl();
+ ProfileEvents::increment(ProfileEvents::ContextLockWaitMicroseconds, watch.elapsedMicroseconds());
+}
+
+ContextData::ContextData() = default;
+ContextData::ContextData(const ContextData &) = default;
Context::Context() = default;
-Context::Context(const Context &) = default;
-Context & Context::operator=(const Context &) = default;
+Context::Context(const Context & rhs) : ContextData(rhs), std::enable_shared_from_this(rhs) {}
SharedContextHolder::SharedContextHolder(SharedContextHolder &&) noexcept = default;
SharedContextHolder & SharedContextHolder::operator=(SharedContextHolder &&) noexcept = default;
@@ -583,10 +647,10 @@ SharedContextHolder::SharedContextHolder(std::unique_ptr shar
void SharedContextHolder::reset() { shared.reset(); }
-ContextMutablePtr Context::createGlobal(ContextSharedPart * shared)
+ContextMutablePtr Context::createGlobal(ContextSharedPart * shared_part)
{
auto res = std::shared_ptr(new Context);
- res->shared = shared;
+ res->shared = shared_part;
return res;
}
@@ -607,7 +671,7 @@ ContextMutablePtr Context::createCopy(const ContextPtr & other)
/// Ported the PR 'fix race in context::createcopy' from https://github.com/ClickHouse/ClickHouse/pull/49663/files
/// Tests associated with this PR were not ported. The related tests require additional functions which are not utilized in proton now.
- auto lock = other->getLock();
+ SharedLockGuard lock(other->mutex);
return std::shared_ptr(new Context(*other));
}
@@ -628,16 +692,6 @@ Context::~Context() = default;
InterserverIOHandler & Context::getInterserverIOHandler() { return shared->interserver_io_handler; }
-std::unique_lock Context::getLock() const
-{
- ProfileEvents::increment(ProfileEvents::ContextLock);
- CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
- Stopwatch watch;
- auto lock = std::unique_lock(shared->mutex);
- ProfileEvents::increment(ProfileEvents::ContextLockWaitMicroseconds, watch.elapsedMicroseconds());
- return lock;
-}
-
ProcessList & Context::getProcessList() { return shared->process_list; }
const ProcessList & Context::getProcessList() const { return shared->process_list; }
OvercommitTracker * Context::getGlobalOvercommitTracker() const { return &shared->global_overcommit_tracker; }
@@ -654,31 +708,31 @@ String Context::resolveDatabase(const String & database_name) const
String Context::getPath() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->path;
}
String Context::getFlagsPath() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->flags_path;
}
String Context::getUserFilesPath() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->user_files_path;
}
String Context::getDictionariesLibPath() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->dictionaries_lib_path;
}
String Context::getUserScriptsPath() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->user_scripts_path;
}
@@ -686,7 +740,7 @@ Strings Context::getWarnings() const
{
Strings common_warnings;
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
common_warnings = shared->warnings;
}
for (const auto & setting : settings)
@@ -703,13 +757,13 @@ Strings Context::getWarnings() const
VolumePtr Context::getTemporaryVolume() const
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
return shared->tmp_volume;
}
void Context::setPath(const String & path)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
shared->path = path;
@@ -731,26 +785,32 @@ void Context::setPath(const String & path)
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name)
{
- std::lock_guard lock(shared->storage_policies_mutex);
-
+ VolumePtr volume{};
if (policy_name.empty())
{
+ std::lock_guard lock(shared->mutex);
+
shared->tmp_path = path;
if (!shared->tmp_path.ends_with('/'))
shared->tmp_path += '/';
auto disk = std::make_shared("_tmp_default", shared->tmp_path, 0);
- shared->tmp_volume = std::make_shared("_tmp_default", disk, 0);
+ volume = std::make_shared("_tmp_default", disk, 0);
}
else
{
- StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name);
+ std::lock_guard storage_policies_lock(shared->storage_policies_mutex);
+
+ StoragePolicyPtr tmp_policy = getStoragePolicySelector(storage_policies_lock)->get(policy_name);
if (tmp_policy->getVolumes().size() != 1)
throw Exception("Policy " + policy_name + " is used temporary files, such policy should have exactly one volume",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
- shared->tmp_volume = tmp_policy->getVolume(0);
+ volume = tmp_policy->getVolume(0);
}
+ std::lock_guard lock(shared->mutex);
+
+ shared->tmp_volume = std::move(volume);
if (shared->tmp_volume->getDisks().empty())
throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
@@ -759,86 +819,85 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic
void Context::setFlagsPath(const String & path)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
shared->flags_path = path;
}
void Context::setUserFilesPath(const String & path)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
shared->user_files_path = path;
}
void Context::setDictionariesLibPath(const String & path)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
shared->dictionaries_lib_path = path;
}
void Context::setUserScriptsPath(const String & path)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
shared->user_scripts_path = path;
}
void Context::addWarningMessage(const String & msg)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
shared->addWarningMessage(msg);
}
void Context::setConfig(const ConfigurationPtr & config)
{
- auto lock = getLock();
- shared->config = config;
- shared->access_control->setExternalAuthenticatorsConfig(*shared->config);
+ shared->setConfig(config);
}
const Poco::Util::AbstractConfiguration & Context::getConfigRef() const
{
- auto lock = getLock();
- return shared->config ? *shared->config : Poco::Util::Application::instance().config();
+ return shared->getConfigRef();
}
AccessControl & Context::getAccessControl()
{
+ SharedLockGuard lock(shared->mutex);
return *shared->access_control;
}
const AccessControl & Context::getAccessControl() const
{
+ SharedLockGuard lock(shared->mutex);
return *shared->access_control;
}
void Context::setExternalAuthenticatorsConfig(const Poco::Util::AbstractConfiguration & config)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
shared->access_control->setExternalAuthenticatorsConfig(config);
}
std::unique_ptr Context::makeGSSAcceptorContext() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return std::make_unique(shared->access_control->getExternalAuthenticators().getKerberosParams());
}
void Context::setUsersConfig(const ConfigurationPtr & config)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
shared->users_config = config;
shared->access_control->setUsersConfig(*shared->users_config);
}
ConfigurationPtr Context::getUsersConfig()
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->users_config;
}
void Context::setUser(const UUID & user_id_)
{
- auto lock = getLock();
+ std::lock_guard lock(mutex);
user_id = user_id_;
@@ -850,10 +909,10 @@ void Context::setUser(const UUID & user_id_)
auto default_profile_info = access->getDefaultProfileInfo();
settings_constraints_and_current_profiles = default_profile_info->getConstraintsAndProfileIDs();
- applySettingsChanges(default_profile_info->settings);
+ applySettingsChangesWithLock(default_profile_info->settings, lock);
if (!user->default_database.empty())
- setCurrentDatabase(user->default_database);
+ setCurrentDatabaseWithLock(user->default_database, lock);
}
std::shared_ptr Context::getUser() const
@@ -868,7 +927,7 @@ String Context::getUserName() const
std::optional Context::getUserID() const
{
- auto lock = getLock();
+ SharedLockGuard lock(mutex);
return user_id;
}
@@ -913,18 +972,23 @@ void Context::setUserByName(const String & user_name)
void Context::setQuotaKey(String quota_key_)
{
- auto lock = getLock();
+ std::lock_guard lock(mutex);
client_info.quota_key = std::move(quota_key_);
}
-void Context::setCurrentRoles(const std::vector & current_roles_)
+void Context::setCurrentRolesWithLock(const std::vector & current_roles_, const std::lock_guard & lock)
{
- auto lock = getLock();
if (current_roles ? (*current_roles == current_roles_) : current_roles_.empty())
return;
current_roles = std::make_shared>(current_roles_);
- calculateAccessRights();
+ calculateAccessRightsWithLock(lock);
+}
+
+void Context::setCurrentRoles(const std::vector & current_roles_)
+{
+ std::lock_guard lock(mutex);
+ setCurrentRolesWithLock(current_roles_, lock);
}
void Context::setCurrentRolesDefault()
@@ -948,10 +1012,8 @@ std::shared_ptr Context::getRolesInfo() const
return getAccess()->getRolesInfo();
}
-
-void Context::calculateAccessRights()
+void Context::calculateAccessRightsWithLock(const std::lock_guard & lock)
{
- auto lock = getLock();
if (user_id)
access = getAccessControl().getContextAccess(
*user_id,
@@ -959,7 +1021,7 @@ void Context::calculateAccessRights()
/* use_default_roles = */ false,
settings,
current_database,
- client_info);
+ client_info);
}
@@ -985,20 +1047,20 @@ void Context::checkAccess(const AccessRightsElements & elements) const { return
std::shared_ptr Context::getAccess() const
{
- auto lock = getLock();
+ SharedLockGuard lock(mutex);
return access ? access : ContextAccess::getFullAccess();
}
ASTPtr Context::getRowPolicyFilter(const String & database, const String & table_name, RowPolicyFilterType filter_type) const
{
- auto lock = getLock();
+ SharedLockGuard lock(mutex);
auto row_filter_of_initial_user = row_policies_of_initial_user ? row_policies_of_initial_user->getFilter(database, table_name, filter_type) : nullptr;
return getAccess()->getRowPolicyFilter(database, table_name, filter_type, row_filter_of_initial_user);
}
void Context::enableRowPoliciesOfInitialUser()
{
- auto lock = getLock();
+ std::lock_guard lock(mutex);
row_policies_of_initial_user = nullptr;
if (client_info.initial_user == client_info.current_user)
return;
@@ -1021,13 +1083,12 @@ std::optional Context::getQuotaUsage() const
}
-void Context::setCurrentProfile(const String & profile_name)
+void Context::setCurrentProfileWithLock(const String & profile_name, const std::lock_guard & lock)
{
- auto lock = getLock();
try
{
UUID profile_id = getAccessControl().getID(profile_name);
- setCurrentProfile(profile_id);
+ setCurrentProfileWithLock(profile_id, lock);
}
catch (Exception & e)
{
@@ -1036,25 +1097,35 @@ void Context::setCurrentProfile(const String & profile_name)
}
}
-void Context::setCurrentProfile(const UUID & profile_id)
+void Context::setCurrentProfileWithLock(const UUID & profile_id, const std::lock_guard & lock)
{
- auto lock = getLock();
auto profile_info = getAccessControl().getSettingsProfileInfo(profile_id);
- checkSettingsConstraints(profile_info->settings);
- applySettingsChanges(profile_info->settings);
+ checkSettingsConstraintsWithLock(profile_info->settings);
+ applySettingsChangesWithLock(profile_info->settings, lock);
settings_constraints_and_current_profiles = profile_info->getConstraintsAndProfileIDs(settings_constraints_and_current_profiles);
}
+void Context::setCurrentProfile(const String & profile_name)
+{
+ std::lock_guard lock(mutex);
+ setCurrentProfileWithLock(profile_name, lock);
+}
+
+void Context::setCurrentProfile(const UUID & profile_id)
+{
+ std::lock_guard lock(mutex);
+ setCurrentProfileWithLock(profile_id, lock);
+}
std::vector Context::getCurrentProfiles() const
{
- auto lock = getLock();
+ SharedLockGuard lock(mutex);
return settings_constraints_and_current_profiles->current_profiles;
}
std::vector Context::getEnabledProfiles() const
{
- auto lock = getLock();
+ SharedLockGuard lock(mutex);
return settings_constraints_and_current_profiles->enabled_profiles;
}
@@ -1090,7 +1161,7 @@ Tables Context::getExternalTables() const
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
- auto lock = getLock();
+ SharedLockGuard lock(mutex);
Tables res;
for (const auto & table : external_tables_mapping)
@@ -1117,7 +1188,7 @@ void Context::addExternalTable(const String & table_name, TemporaryTableHolder &
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
- auto lock = getLock();
+ std::lock_guard lock(mutex);
if (external_tables_mapping.end() != external_tables_mapping.find(table_name))
throw Exception("Temporary stream " + backQuoteIfNeed(table_name) + " already exists.", ErrorCodes::STREAM_ALREADY_EXISTS);
external_tables_mapping.emplace(table_name, std::make_shared(std::move(temporary_table)));
@@ -1131,7 +1202,7 @@ std::shared_ptr Context::removeExternalTable(const String
std::shared_ptr holder;
{
- auto lock = getLock();
+ std::lock_guard lock(mutex);
auto iter = external_tables_mapping.find(table_name);
if (iter == external_tables_mapping.end())
return {};
@@ -1275,14 +1346,14 @@ StoragePtr Context::getViewSource() const
Settings Context::getSettings() const
{
- auto lock = getLock();
+ SharedLockGuard lock(mutex);
return settings;
}
void Context::setSettings(const Settings & settings_)
{
- auto lock = getLock();
+ std::lock_guard lock(mutex);
auto old_readonly = settings.readonly;
auto old_allow_ddl = settings.allow_ddl;
auto old_allow_introspection_functions = settings.allow_introspection_functions;
@@ -1290,39 +1361,70 @@ void Context::setSettings(const Settings & settings_)
settings = settings_;
if ((settings.readonly != old_readonly) || (settings.allow_ddl != old_allow_ddl) || (settings.allow_introspection_functions != old_allow_introspection_functions))
- calculateAccessRights();
+ calculateAccessRightsWithLock(lock);
}
-void Context::setSetting(std::string_view name, const String & value)
+void Context::setSettingWithLock(std::string_view name, const String & value, const std::lock_guard & lock)
{
- auto lock = getLock();
if (name == "profile")
{
- setCurrentProfile(value);
+ setCurrentProfileWithLock(value, lock);
return;
}
settings.set(name, value);
if (name == "readonly" || name == "allow_ddl" || name == "allow_introspection_functions")
- calculateAccessRights();
+ calculateAccessRightsWithLock(lock);
}
-void Context::setSetting(std::string_view name, const Field & value)
+void Context::setSettingWithLock(std::string_view name, const Field & value, const std::lock_guard & lock)
{
- auto lock = getLock();
if (name == "profile")
{
- setCurrentProfile(value.safeGet());
+ setCurrentProfileWithLock(value.safeGet(), lock);
return;
}
settings.set(name, value);
if (name == "readonly" || name == "allow_ddl" || name == "allow_introspection_functions")
- calculateAccessRights();
+ calculateAccessRightsWithLock(lock);
}
+void Context::applySettingChangeWithLock(const SettingChange & change, const std::lock_guard & lock)
+{
+ try
+ {
+ setSettingWithLock(change.name, change.value, lock);
+ }
+ catch (Exception & e)
+ {
+ e.addMessage(fmt::format(
+ "in attempt to set the value of setting '{}' to {}",
+ change.name, applyVisitor(FieldVisitorToString(), change.value)));
+ throw;
+ }
+}
+
+void Context::applySettingsChangesWithLock(const SettingsChanges & changes, const std::lock_guard & lock)
+{
+ for (const SettingChange & change : changes)
+ applySettingChangeWithLock(change, lock);
+ applySettingsQuirks(settings);
+}
+
+void Context::setSetting(std::string_view name, const String & value)
+{
+ std::lock_guard lock(mutex);
+ setSettingWithLock(name, value, lock);
+}
+
+void Context::setSetting(std::string_view name, const Field & value)
+{
+ std::lock_guard lock(mutex);
+ setSettingWithLock(name, value, lock);
+}
void Context::applySettingChange(const SettingChange & change)
{
@@ -1341,55 +1443,79 @@ void Context::applySettingChange(const SettingChange & change)
void Context::applySettingsChanges(const SettingsChanges & changes)
{
- auto lock = getLock();
- for (const SettingChange & change : changes)
- applySettingChange(change);
- applySettingsQuirks(settings);
+ std::lock_guard lock(mutex);
+ applySettingsChangesWithLock(changes, lock);
+}
+
+void Context::checkSettingsConstraintsWithLock(const SettingChange & change) const
+{
+ getSettingsConstraintsAndCurrentProfilesWithLock()->constraints.check(settings, change);
+}
+
+void Context::checkSettingsConstraintsWithLock(const SettingsChanges & changes) const
+{
+ getSettingsConstraintsAndCurrentProfilesWithLock()->constraints.check(settings, changes);
+}
+
+void Context::checkSettingsConstraintsWithLock(SettingsChanges & changes) const
+{
+ getSettingsConstraintsAndCurrentProfilesWithLock()->constraints.check(settings, changes);
+}
+
+void Context::clampToSettingsConstraintsWithLock(SettingsChanges & changes) const
+{
+ getSettingsConstraintsAndCurrentProfilesWithLock()->constraints.clamp(settings, changes);
}
void Context::checkSettingsConstraints(const SettingChange & change) const
{
- getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, change);
+ SharedLockGuard lock(mutex);
+ checkSettingsConstraintsWithLock(change);
}
void Context::checkSettingsConstraints(const SettingsChanges & changes) const
{
+ SharedLockGuard lock(mutex);
getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, changes);
}
void Context::checkSettingsConstraints(SettingsChanges & changes) const
{
- getSettingsConstraintsAndCurrentProfiles()->constraints.check(settings, changes);
+ SharedLockGuard lock(mutex);
+ checkSettingsConstraintsWithLock(changes);
}
void Context::clampToSettingsConstraints(SettingsChanges & changes) const
{
- getSettingsConstraintsAndCurrentProfiles()->constraints.clamp(settings, changes);
+ SharedLockGuard lock(mutex);
+ clampToSettingsConstraintsWithLock(changes);
}
void Context::resetSettingsToDefaultValue(const std::vector & names)
{
- auto lock = getLock();
+ std::lock_guard lock(mutex);
for (const String & name: names)
- {
settings.setDefaultValue(name);
- }
}
-std::shared_ptr Context::getSettingsConstraintsAndCurrentProfiles() const
+std::shared_ptr Context::getSettingsConstraintsAndCurrentProfilesWithLock() const
{
- auto lock = getLock();
if (settings_constraints_and_current_profiles)
return settings_constraints_and_current_profiles;
static auto no_constraints_or_profiles = std::make_shared(getAccessControl());
return no_constraints_or_profiles;
}
+std::shared_ptr Context::getSettingsConstraintsAndCurrentProfiles() const
+{
+ SharedLockGuard lock(mutex);
+ return getSettingsConstraintsAndCurrentProfilesWithLock();
+}
String Context::getCurrentDatabase() const
{
- auto lock = getLock();
+ SharedLockGuard lock(mutex);
return current_database;
}
@@ -1405,7 +1531,7 @@ void Context::setCurrentDatabaseNameInGlobalContext(const String & name)
if (!isGlobalContext())
throw Exception("Cannot set current database for non global context, this method should be used during server initialization",
ErrorCodes::LOGICAL_ERROR);
- auto lock = getLock();
+ std::lock_guard lock(mutex);
if (!current_database.empty())
throw Exception("Default database name cannot be changed in global context without server restart",
@@ -1414,12 +1540,17 @@ void Context::setCurrentDatabaseNameInGlobalContext(const String & name)
current_database = name;
}
-void Context::setCurrentDatabase(const String & name)
+void Context::setCurrentDatabaseWithLock(const String & name, const std::lock_guard & lock)
{
DatabaseCatalog::instance().assertDatabaseExists(name);
- auto lock = getLock();
current_database = name;
- calculateAccessRights();
+ calculateAccessRightsWithLock(lock);
+}
+
+void Context::setCurrentDatabase(const String & name)
+{
+ std::lock_guard lock(mutex);
+ setCurrentDatabaseWithLock(name, lock);
}
void Context::setCurrentQueryId(const String & query_id)
@@ -1567,10 +1698,10 @@ const ExternalDictionariesLoader & Context::getExternalDictionariesLoader() cons
ExternalDictionariesLoader & Context::getExternalDictionariesLoader()
{
std::lock_guard lock(shared->external_dictionaries_mutex);
- return getExternalDictionariesLoaderUnlocked();
+ return getExternalDictionariesLoaderWithLock(lock);
}
-ExternalDictionariesLoader & Context::getExternalDictionariesLoaderUnlocked()
+ExternalDictionariesLoader & Context::getExternalDictionariesLoaderWithLock(const std::lock_guard &) TSA_REQUIRES(shared->external_dictionaries_mutex)
{
if (!shared->external_dictionaries_loader)
shared->external_dictionaries_loader =
@@ -1586,10 +1717,10 @@ const ExternalUserDefinedFunctionsLoader & Context::getExternalUserDefinedExecut
ExternalUserDefinedFunctionsLoader & Context::getExternalUserDefinedExecutableFunctionsLoader()
{
std::lock_guard lock(shared->external_user_defined_executable_functions_mutex);
- return getExternalUserDefinedExecutableFunctionsLoaderUnlocked();
+ return getExternalUserDefinedExecutableFunctionsLoaderWithLock(lock);
}
-ExternalUserDefinedFunctionsLoader & Context::getExternalUserDefinedExecutableFunctionsLoaderUnlocked()
+ExternalUserDefinedFunctionsLoader & Context::getExternalUserDefinedExecutableFunctionsLoaderWithLock(const std::lock_guard &) TSA_REQUIRES(shared->external_user_defined_executable_functions_mutex)
{
return ExternalUserDefinedFunctionsLoader::instance(getGlobalContextInstance());
@@ -1607,10 +1738,10 @@ const ExternalModelsLoader & Context::getExternalModelsLoader() const
ExternalModelsLoader & Context::getExternalModelsLoader()
{
std::lock_guard lock(shared->external_models_mutex);
- return getExternalModelsLoaderUnlocked();
+ return getExternalModelsLoaderWithLock(lock);
}
-ExternalModelsLoader & Context::getExternalModelsLoaderUnlocked()
+ExternalModelsLoader & Context::getExternalModelsLoaderWithLock(const std::lock_guard &) TSA_REQUIRES(shared->external_models_mutex)
{
if (!shared->external_models_loader)
shared->external_models_loader =
@@ -1625,7 +1756,7 @@ void Context::loadOrReloadModels(const Poco::Util::AbstractConfiguration & confi
std::lock_guard lock(shared->external_models_mutex);
- auto & external_models_loader = getExternalModelsLoaderUnlocked();
+ auto & external_models_loader = getExternalModelsLoaderWithLock(lock);
if (shared->external_models_config_repository)
{
@@ -1673,7 +1804,7 @@ void Context::loadOrReloadDictionaries(const Poco::Util::AbstractConfiguration &
std::lock_guard lock(shared->external_dictionaries_mutex);
- auto & external_dictionaries_loader = getExternalDictionariesLoaderUnlocked();
+ auto & external_dictionaries_loader = getExternalDictionariesLoaderWithLock(lock);
external_dictionaries_loader.enableAlwaysLoadEverything(!dictionaries_lazy_load);
if (shared->external_dictionaries_config_repository)
@@ -1694,7 +1825,7 @@ void Context::loadOrReloadDictionaries(const Poco::Util::AbstractConfiguration &
void Context::loadOrReloadUserDefinedExecutableFunctions()
{
std::lock_guard lock(shared->external_user_defined_executable_functions_mutex);
- auto & external_user_defined_executable_functions_loader_ = getExternalUserDefinedExecutableFunctionsLoaderUnlocked();
+ auto & external_user_defined_executable_functions_loader_ = getExternalUserDefinedExecutableFunctionsLoaderWithLock(lock);
if (shared->user_defined_executable_functions_config_repository)
{
external_user_defined_executable_functions_loader_.reloadConfig(
@@ -1709,7 +1840,7 @@ void Context::loadOrReloadUserDefinedExecutableFunctions()
Streaming::MetaStoreJSONConfigRepository * Context::getMetaStoreJSONConfigRepository() const
{
- std::lock_guard lock(shared->metastore_dispatcher_mutex);
+ std::lock_guard lock(shared->external_user_defined_executable_functions_mutex);
if (!shared->user_defined_executable_functions_config_repository)
throw Exception(ErrorCodes::LOGICAL_ERROR, "MetaStoreJSONConfigRepository must be created first");
@@ -1719,17 +1850,21 @@ Streaming::MetaStoreJSONConfigRepository * Context::getMetaStoreJSONConfigReposi
const IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader() const
{
- auto lock = getLock();
- if (!shared->user_defined_sql_objects_loader)
+ callOnce(shared->user_defined_sql_objects_loader_initialized, [&] {
shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext());
+ });
+
+ SharedLockGuard lock(shared->mutex);
return *shared->user_defined_sql_objects_loader;
}
IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader()
{
- auto lock = getLock();
- if (!shared->user_defined_sql_objects_loader)
+ callOnce(shared->user_defined_sql_objects_loader_initialized, [&] {
shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext());
+ });
+
+ SharedLockGuard lock(shared->mutex);
return *shared->user_defined_sql_objects_loader;
}
@@ -1737,20 +1872,18 @@ IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader()
SynonymsExtensions & Context::getSynonymsExtensions() const
{
- auto lock = getLock();
-
- if (!shared->synonyms_extensions)
+ callOnce(shared->synonyms_extensions_initialized, [&] {
shared->synonyms_extensions.emplace(getConfigRef());
+ });
return *shared->synonyms_extensions;
}
Lemmatizers & Context::getLemmatizers() const
{
- auto lock = getLock();
-
- if (!shared->lemmatizers)
+ callOnce(shared->lemmatizers_initialized, [&] {
shared->lemmatizers.emplace(getConfigRef());
+ });
return *shared->lemmatizers;
}
@@ -1782,7 +1915,7 @@ ProcessList::Element * Context::getProcessListElement() const
void Context::setUncompressedCache(size_t max_size_in_bytes)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (shared->uncompressed_cache)
throw Exception("Uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
@@ -1793,14 +1926,14 @@ void Context::setUncompressedCache(size_t max_size_in_bytes)
UncompressedCachePtr Context::getUncompressedCache() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->uncompressed_cache;
}
void Context::dropUncompressedCache() const
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (shared->uncompressed_cache)
shared->uncompressed_cache->reset();
}
@@ -1808,7 +1941,7 @@ void Context::dropUncompressedCache() const
void Context::setMarkCache(size_t cache_size_in_bytes)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (shared->mark_cache)
throw Exception("Mark cache has been already created.", ErrorCodes::LOGICAL_ERROR);
@@ -1818,34 +1951,32 @@ void Context::setMarkCache(size_t cache_size_in_bytes)
MarkCachePtr Context::getMarkCache() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->mark_cache;
}
void Context::dropMarkCache() const
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (shared->mark_cache)
shared->mark_cache->reset();
}
ThreadPool & Context::getLoadMarksThreadpool() const
{
- const auto & config = getConfigRef();
-
- auto lock = getLock();
- if (!shared->load_marks_threadpool)
- {
+ callOnce(shared->load_marks_threadpool_initialized, [&] {
+ const auto & config = getConfigRef();
auto pool_size = config.getUInt(".load_marks_threadpool_pool_size", 50);
auto queue_size = config.getUInt(".load_marks_threadpool_queue_size", 1000000);
shared->load_marks_threadpool = std::make_unique(pool_size, pool_size, queue_size);
- }
+ });
+
return *shared->load_marks_threadpool;
}
void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (shared->index_uncompressed_cache)
throw Exception("Index uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
@@ -1856,14 +1987,14 @@ void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
UncompressedCachePtr Context::getIndexUncompressedCache() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->index_uncompressed_cache;
}
void Context::dropIndexUncompressedCache() const
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (shared->index_uncompressed_cache)
shared->index_uncompressed_cache->reset();
}
@@ -1871,7 +2002,7 @@ void Context::dropIndexUncompressedCache() const
void Context::setIndexMarkCache(size_t cache_size_in_bytes)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (shared->index_mark_cache)
throw Exception("Index mark cache has been already created.", ErrorCodes::LOGICAL_ERROR);
@@ -1881,13 +2012,13 @@ void Context::setIndexMarkCache(size_t cache_size_in_bytes)
MarkCachePtr Context::getIndexMarkCache() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->index_mark_cache;
}
void Context::dropIndexMarkCache() const
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
}
@@ -1895,7 +2026,7 @@ void Context::dropIndexMarkCache() const
void Context::setMMappedFileCache(size_t cache_size_in_num_entries)
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (shared->mmap_cache)
throw Exception("Mapped file cache has been already created.", ErrorCodes::LOGICAL_ERROR);
@@ -1905,13 +2036,13 @@ void Context::setMMappedFileCache(size_t cache_size_in_num_entries)
MMappedFileCachePtr Context::getMMappedFileCache() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->mmap_cache;
}
void Context::dropMMappedFileCache() const
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (shared->mmap_cache)
shared->mmap_cache->reset();
}
@@ -1919,7 +2050,7 @@ void Context::dropMMappedFileCache() const
void Context::dropCaches() const
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (shared->uncompressed_cache)
shared->uncompressed_cache->reset();
@@ -1939,12 +2070,12 @@ void Context::dropCaches() const
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
{
- auto lock = getLock();
- if (!shared->buffer_flush_schedule_pool)
+ callOnce(shared->buffer_flush_schedule_pool_initialized, [&] {
shared->buffer_flush_schedule_pool = std::make_unique(
settings.background_buffer_flush_schedule_pool_size,
CurrentMetrics::BackgroundBufferFlushSchedulePoolTask,
"BgBufSchPool");
+ });
return *shared->buffer_flush_schedule_pool;
}
@@ -1981,40 +2112,40 @@ BackgroundTaskSchedulingSettings Context::getBackgroundMoveTaskSchedulingSetting
BackgroundSchedulePool & Context::getSchedulePool() const
{
- auto lock = getLock();
- if (!shared->schedule_pool)
+ callOnce(shared->schedule_pool_initialized, [&] {
shared->schedule_pool = std::make_unique(
settings.background_schedule_pool_size,
CurrentMetrics::BackgroundSchedulePoolTask,
"BgSchPool");
+ });
return *shared->schedule_pool;
}
BackgroundSchedulePool & Context::getDistributedSchedulePool() const
{
- auto lock = getLock();
- if (!shared->distributed_schedule_pool)
+ callOnce(shared->distributed_schedule_pool_initialized, [&] {
shared->distributed_schedule_pool = std::make_unique(
settings.background_distributed_schedule_pool_size,
CurrentMetrics::BackgroundDistributedSchedulePoolTask,
"BgDistSchPool");
+ });
return *shared->distributed_schedule_pool;
}
BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
{
- auto lock = getLock();
- if (!shared->message_broker_schedule_pool)
+ callOnce(shared->message_broker_schedule_pool_initialized, [&] {
shared->message_broker_schedule_pool = std::make_unique(
settings.background_message_broker_schedule_pool_size,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
"BgMBSchPool");
+ });
return *shared->message_broker_schedule_pool;
}
ThrottlerPtr Context::getRemoteReadThrottler() const
{
- auto lock = getLock();
+ std::lock_guard lock(mutex);
if (!shared->remote_read_throttler)
shared->remote_read_throttler = std::make_shared(
settings.max_remote_read_network_bandwidth_for_server);
@@ -2024,7 +2155,7 @@ ThrottlerPtr Context::getRemoteReadThrottler() const
ThrottlerPtr Context::getRemoteWriteThrottler() const
{
- auto lock = getLock();
+ std::lock_guard lock(mutex);
if (!shared->remote_write_throttler)
shared->remote_write_throttler = std::make_shared(
settings.max_remote_write_network_bandwidth_for_server);
@@ -2198,26 +2329,24 @@ String Context::getInterserverScheme() const
void Context::setRemoteHostFilter(const Poco::Util::AbstractConfiguration & config)
{
+ std::lock_guard lock(shared->mutex);
shared->remote_host_filter.setValuesFromConfig(config);
}
const RemoteHostFilter & Context::getRemoteHostFilter() const
{
+ SharedLockGuard lock(shared->mutex);
return shared->remote_host_filter;
}
UInt16 Context::getTCPPort() const
{
- auto lock = getLock();
-
const auto & config = getConfigRef();
return config.getInt("tcp_port", DBMS_DEFAULT_PORT);
}
std::optional Context::getTCPPortSecure() const
{
- auto lock = getLock();
-
const auto & config = getConfigRef();
if (config.has("tcp_port_secure"))
return config.getInt("tcp_port_secure");
@@ -2323,8 +2452,11 @@ void Context::setCluster(const String & cluster_name, const std::shared_ptrsystem_logs = std::make_unique(getGlobalContext(), getConfigRef());
+ callOnce(shared->system_logs_initialized, [&] {
+ auto system_logs = std::make_unique(getGlobalContext(), getConfigRef());
+ std::lock_guard lock(shared->mutex);
+ shared->system_logs = std::move(system_logs);
+ });
}
void Context::initializeTraceCollector()
@@ -2347,7 +2479,7 @@ bool Context::hasTraceCollector() const
std::shared_ptr Context::getQueryLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2357,7 +2489,7 @@ std::shared_ptr Context::getQueryLog() const
std::shared_ptr Context::getQueryThreadLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2367,7 +2499,7 @@ std::shared_ptr Context::getQueryThreadLog() const
std::shared_ptr Context::getQueryViewsLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2377,7 +2509,7 @@ std::shared_ptr Context::getQueryViewsLog() const
std::shared_ptr Context::getPartLog(const String & part_database) const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
/// No part log or system logs are shutting down.
if (!shared->system_logs)
@@ -2395,7 +2527,7 @@ std::shared_ptr Context::getPartLog(const String & part_database) const
std::shared_ptr Context::getTraceLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2406,7 +2538,7 @@ std::shared_ptr Context::getTraceLog() const
std::shared_ptr Context::getTextLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2417,7 +2549,7 @@ std::shared_ptr Context::getTextLog() const
std::shared_ptr Context::getMetricLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2428,7 +2560,7 @@ std::shared_ptr Context::getMetricLog() const
std::shared_ptr Context::getAsynchronousMetricLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2439,7 +2571,7 @@ std::shared_ptr Context::getAsynchronousMetricLog() const
/// proton: starts.
std::shared_ptr Context::getPipelineMetricLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2450,7 +2582,7 @@ std::shared_ptr Context::getPipelineMetricLog() const
std::shared_ptr Context::getOpenTelemetrySpanLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2460,7 +2592,7 @@ std::shared_ptr Context::getOpenTelemetrySpanLog() const
std::shared_ptr Context::getSessionLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2471,7 +2603,7 @@ std::shared_ptr Context::getSessionLog() const
std::shared_ptr Context::getZooKeeperLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2482,7 +2614,7 @@ std::shared_ptr Context::getZooKeeperLog() const
std::shared_ptr Context::getProcessorsProfileLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2492,7 +2624,7 @@ std::shared_ptr Context::getProcessorsProfileLog() const
std::shared_ptr Context::getTransactionsInfoLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2502,7 +2634,7 @@ std::shared_ptr Context::getTransactionsInfoLog() const
std::shared_ptr Context::getFilesystemCacheLog() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
@@ -2511,12 +2643,12 @@ std::shared_ptr Context::getFilesystemCacheLog() const
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (!shared->compression_codec_selector)
{
constexpr auto config_name = "compression";
- const auto & config = getConfigRef();
+ const auto & config = shared->getConfigRefWithLock(lock);
if (config.has(config_name))
shared->compression_codec_selector = std::make_unique(config, "compression");
@@ -2550,6 +2682,11 @@ StoragePolicyPtr Context::getStoragePolicy(const String & name) const
DisksMap Context::getDisksMap() const
{
std::lock_guard lock(shared->storage_policies_mutex);
+ return getDisksMap(lock);
+}
+
+DisksMap Context::getDisksMap(std::lock_guard & lock) const
+{
return getDiskSelector(lock)->getDisksMap();
}
@@ -2559,7 +2696,7 @@ StoragePoliciesMap Context::getPoliciesMap() const
return getStoragePolicySelector(lock)->getPoliciesMap();
}
-DiskSelectorPtr Context::getDiskSelector(std::lock_guard & /* lock */) const
+DiskSelectorPtr Context::getDiskSelector(std::lock_guard & /* lock */) const TSA_REQUIRES(shared->storage_policies_mutex)
{
if (!shared->merge_tree_disk_selector)
{
@@ -2570,10 +2707,11 @@ DiskSelectorPtr Context::getDiskSelector(std::lock_guard & /* lock *
disk_selector->initialize(config, config_name, shared_from_this());
shared->merge_tree_disk_selector = disk_selector;
}
+
return shared->merge_tree_disk_selector;
}
-StoragePolicySelectorPtr Context::getStoragePolicySelector(std::lock_guard & lock) const
+StoragePolicySelectorPtr Context::getStoragePolicySelector(std::lock_guard & lock) const TSA_REQUIRES(shared->storage_policies_mutex)
{
if (!shared->merge_tree_storage_policy_selector)
{
@@ -2582,35 +2720,39 @@ StoragePolicySelectorPtr Context::getStoragePolicySelector(std::lock_guardmerge_tree_storage_policy_selector = std::make_shared(config, config_name, getDiskSelector(lock));
}
+
return shared->merge_tree_storage_policy_selector;
}
void Context::updateStorageConfiguration(const Poco::Util::AbstractConfiguration & config)
{
- std::lock_guard lock(shared->storage_policies_mutex);
+ {
+ std::lock_guard lock(shared->storage_policies_mutex);
- if (shared->merge_tree_disk_selector)
- shared->merge_tree_disk_selector
- = shared->merge_tree_disk_selector->updateFromConfig(config, "storage_configuration.disks", shared_from_this());
+ if (shared->merge_tree_disk_selector)
+ shared->merge_tree_disk_selector
+ = shared->merge_tree_disk_selector->updateFromConfig(config, "storage_configuration.disks", shared_from_this());
- if (shared->merge_tree_storage_policy_selector)
- {
- try
- {
- shared->merge_tree_storage_policy_selector = shared->merge_tree_storage_policy_selector->updateFromConfig(
- config, "storage_configuration.policies", shared->merge_tree_disk_selector);
- }
- catch (Exception & e)
+ if (shared->merge_tree_storage_policy_selector)
{
- LOG_ERROR(
- shared->log, "An error has occurred while reloading storage policies, storage policies were not applied: {}", e.message());
+ try
+ {
+ shared->merge_tree_storage_policy_selector = shared->merge_tree_storage_policy_selector->updateFromConfig(
+ config, "storage_configuration.policies", shared->merge_tree_disk_selector);
+ }
+ catch (Exception & e)
+ {
+ LOG_ERROR(
+ shared->log, "An error has occurred while reloading storage policies, storage policies were not applied: {}", e.message());
+ }
}
}
- if (shared->storage_s3_settings)
{
- shared->storage_s3_settings->loadFromConfig("s3", config, getSettingsRef());
+ std::lock_guard lock(shared->mutex);
+ if (shared->storage_s3_settings)
+ shared->storage_s3_settings->loadFromConfig("s3", config, getSettingsRef());
}
}
@@ -2623,13 +2765,14 @@ const MergeTreeSettings & Context::getMergeTreeSettings() const
const StreamSettings & Context::getStreamSettings() const
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (!shared->stream_settings)
{
+ const auto & config = shared->getConfigRefWithLock(lock);
StreamSettings settings;
/// Apply configured stream settings.
- settings.applyChanges(loadSettingChangesFromConfig("settings.stream", getConfigRef()));
+ settings.applyChanges(loadSettingChangesFromConfig("settings.stream", config));
shared->stream_settings.emplace(settings);
}
@@ -2638,17 +2781,19 @@ const StreamSettings & Context::getStreamSettings() const
void Context::applyGlobalSettingsFromConfig()
{
- settings.applyChanges(loadSettingChangesFromConfig("settings.global", getConfigRef()));
+ std::lock_guard lock(shared->mutex);
+ const auto & config = shared->getConfigRefWithLock(lock);
+ settings.applyChanges(loadSettingChangesFromConfig("settings.global", config));
}
/// proton: ends.
const StorageS3Settings & Context::getStorageS3Settings() const
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
if (!shared->storage_s3_settings)
{
- const auto & config = getConfigRef();
+ const auto & config = shared->getConfigRefWithLock(lock);
shared->storage_s3_settings.emplace().loadFromConfig("s3", config, getSettingsRef());
}
@@ -2743,7 +2888,7 @@ OutputFormatPtr Context::getOutputFormatParallelIfPossible(const String & name,
double Context::getUptimeSeconds() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
return shared->uptime_watch.elapsedSeconds();
}
@@ -2764,7 +2909,7 @@ void Context::reloadConfig() const
}
-void Context::shutdown()
+void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
{
// Disk selector might not be initialized if there was some error during
// its initialization. Don't try to initialize it again on shutdown.
@@ -2885,10 +3030,9 @@ const IHostContextPtr & Context::getHostContext() const
std::shared_ptr Context::getActionLocksManager()
{
- auto lock = getLock();
-
- if (!shared->action_locks_manager)
+ callOnce(shared->action_locks_manager_initialized, [&] {
shared->action_locks_manager = std::make_shared(shared_from_this());
+ });
return shared->action_locks_manager;
}
@@ -2966,7 +3110,7 @@ StorageID Context::resolveStorageID(StorageID storage_id, StorageNamespace where
StorageID resolved = StorageID::createEmpty();
std::optional exc;
{
- auto lock = getLock();
+ SharedLockGuard lock(mutex);
resolved = resolveStorageIDImpl(std::move(storage_id), where, &exc);
}
if (exc)
@@ -2983,7 +3127,7 @@ StorageID Context::tryResolveStorageID(StorageID storage_id, StorageNamespace wh
StorageID resolved = StorageID::createEmpty();
{
- auto lock = getLock();
+ SharedLockGuard lock(mutex);
resolved = resolveStorageIDImpl(std::move(storage_id), where, nullptr);
}
if (resolved && !resolved.hasUUID() && resolved.database_name != DatabaseCatalog::TEMPORARY_DATABASE)
@@ -3112,14 +3256,14 @@ MergeTreeTransactionPtr Context::getCurrentTransaction() const
bool Context::isServerCompletelyStarted() const
{
- auto lock = getLock();
+ SharedLockGuard lock(shared->mutex);
assert(getApplicationType() == ApplicationType::SERVER);
return shared->is_server_completely_started;
}
void Context::setServerCompletelyStarted()
{
- auto lock = getLock();
+ std::lock_guard lock(shared->mutex);
assert(global_context.lock().get() == this);
assert(!shared->is_server_completely_started);
assert(getApplicationType() == ApplicationType::SERVER);
@@ -3128,7 +3272,8 @@ void Context::setServerCompletelyStarted()
PartUUIDsPtr Context::getPartUUIDs() const
{
- auto lock = getLock();
+ std::lock_guard lock(mutex);
+
if (!part_uuids)
/// For context itself, only this initialization is not const.
/// We could have done in constructor.
@@ -3168,7 +3313,7 @@ void Context::setMergeTreeReadTaskCallback(MergeTreeReadTaskCallback && callback
PartUUIDsPtr Context::getIgnoredPartUUIDs() const
{
- auto lock = getLock();
+ std::lock_guard lock(mutex);
if (!ignored_part_uuids)
const_cast(ignored_part_uuids) = std::make_shared();
@@ -3192,7 +3337,8 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptrbackground_executors_mutex);
+
if (shared->is_background_executors_initialized)
return;
@@ -3249,21 +3395,25 @@ void Context::initializeBackgroundExecutorsIfNeeded()
MergeMutateBackgroundExecutorPtr Context::getMergeMutateExecutor() const
{
+ SharedLockGuard lock(shared->background_executors_mutex);
return shared->merge_mutate_executor;
}
OrdinaryBackgroundExecutorPtr Context::getMovesExecutor() const
{
+ SharedLockGuard lock(shared->background_executors_mutex);
return shared->moves_executor;
}
OrdinaryBackgroundExecutorPtr Context::getFetchesExecutor() const
{
+ SharedLockGuard lock(shared->background_executors_mutex);
return shared->fetch_executor;
}
OrdinaryBackgroundExecutorPtr Context::getCommonExecutor() const
{
+ SharedLockGuard lock(shared->background_executors_mutex);
return shared->common_executor;
}
@@ -3294,59 +3444,38 @@ size_t Context::getThreadPoolReaderSize(FilesystemReaderType type) const
IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) const
{
- const auto & config = getConfigRef();
+ callOnce(shared->readers_initialized, [&] {
+ const auto & config = getConfigRef();
+ auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
+ auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
+ shared->asynchronous_remote_fs_reader = std::make_unique(pool_size, queue_size);
+
+ queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
+ shared->asynchronous_local_fs_reader = std::make_unique(pool_size, queue_size);
- auto lock = getLock();
+ shared->synchronous_local_fs_reader = std::make_unique();
+ });
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
- {
- if (!shared->asynchronous_remote_fs_reader)
- {
- auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
- auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
- shared->asynchronous_remote_fs_reader = std::make_unique(pool_size, queue_size);
- }
-
return *shared->asynchronous_remote_fs_reader;
- }
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
- {
- if (!shared->asynchronous_local_fs_reader)
- {
- auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
- auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
- shared->asynchronous_local_fs_reader = std::make_unique(pool_size, queue_size);
- }
-
return *shared->asynchronous_local_fs_reader;
- }
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
- {
- if (!shared->synchronous_local_fs_reader)
- {
- shared->synchronous_local_fs_reader = std::make_unique();
- }
-
return *shared->synchronous_local_fs_reader;
- }
}
}
ThreadPool & Context::getThreadPoolWriter() const
{
- const auto & config = getConfigRef();
-
- auto lock = getLock();
-
- if (!shared->threadpool_writer)
- {
+ callOnce(shared->threadpool_writer_initialized, [&] {
+ const auto & config = getConfigRef();
auto pool_size = config.getUInt(".threadpool_writer_pool_size", 100);
auto queue_size = config.getUInt(".threadpool_writer_queue_size", 1000000);
shared->threadpool_writer = std::make_unique(pool_size, pool_size, queue_size);
- }
+ });
return *shared->threadpool_writer;
}
@@ -3433,10 +3562,10 @@ bool Context::isDistributedEnv() const
ThreadPool & Context::getPartCommitPool() const
{
- auto lock = getLock();
- if (!shared->part_commit_pool)
+ callOnce(shared->part_commit_pool_initialized, [&] {
/// FIXME, queue size may matter
shared->part_commit_pool = std::make_unique(settings.part_commit_pool_size);
+ });
return *shared->part_commit_pool;
}
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h
index f5ed4a0f728..409cdd1b792 100644
--- a/src/Interpreters/Context.h
+++ b/src/Interpreters/Context.h
@@ -16,6 +16,8 @@
#include
#include
#include
+#include
+#include
#include
#include
@@ -212,15 +214,20 @@ struct SharedContextHolder
std::unique_ptr shared;
};
-/** A set of known objects that can be used in the query.
- * Consists of a shared part (always common to all sessions and queries)
- * and copied part (which can be its own for each session or query).
- *
- * Everything is encapsulated for all sorts of checks and locks.
- */
-class Context: public std::enable_shared_from_this
+class ContextSharedMutex : public SharedMutexHelper
{
private:
+ using Base = SharedMutexHelper;
+ friend class SharedMutexHelper;
+
+ void lockImpl();
+
+ void lockSharedImpl();
+};
+
+class ContextData
+{
+protected:
ContextSharedPart * shared;
ClientInfo client_info;
@@ -405,7 +412,7 @@ class Context: public std::enable_shared_from_this
// Top-level OpenTelemetry trace context for the query. Makes sense only for a query context.
OpenTelemetryTraceContext query_trace_context;
-private:
+protected:
using SampleBlockCache = std::unordered_map;
mutable SampleBlockCache sample_block_cache;
@@ -438,13 +445,29 @@ class Context: public std::enable_shared_from_this
mutable DataStreamSemanticCache data_stream_semantic_cache;
/// proton: end.
+ /// Use copy constructor or createGlobal() instead
+ ContextData();
+ ContextData(const ContextData &);
+};
+
+/** A set of known objects that can be used in the query.
+ * Consists of a shared part (always common to all sessions and queries)
+ * and copied part (which can be its own for each session or query).
+ *
+ * Everything is encapsulated for all sorts of checks and locks.
+ */
+class Context: public ContextData, public std::enable_shared_from_this
+{
+private:
+ /// ContextData mutex
+ mutable ContextSharedMutex mutex;
+
Context();
Context(const Context &);
- Context & operator=(const Context &);
public:
/// Create initial Context with ContextShared and etc.
- static ContextMutablePtr createGlobal(ContextSharedPart * shared);
+ static ContextMutablePtr createGlobal(ContextSharedPart * shared_part);
static ContextMutablePtr createCopy(const ContextWeakPtr & other);
static ContextMutablePtr createCopy(const ContextMutablePtr & other);
static ContextMutablePtr createCopy(const ContextPtr & other);
@@ -719,12 +742,10 @@ class Context: public std::enable_shared_from_this
const ExternalModelsLoader & getExternalModelsLoader() const;
ExternalModelsLoader & getExternalModelsLoader();
- ExternalModelsLoader & getExternalModelsLoaderUnlocked();
void loadOrReloadModels(const Poco::Util::AbstractConfiguration & config);
const ExternalDictionariesLoader & getExternalDictionariesLoader() const;
ExternalDictionariesLoader & getExternalDictionariesLoader();
- ExternalDictionariesLoader & getExternalDictionariesLoaderUnlocked();
const EmbeddedDictionaries & getEmbeddedDictionaries() const;
EmbeddedDictionaries & getEmbeddedDictionaries();
void tryCreateEmbeddedDictionaries(const Poco::Util::AbstractConfiguration & config) const;
@@ -732,7 +753,6 @@ class Context: public std::enable_shared_from_this
const ExternalUserDefinedFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader() const;
ExternalUserDefinedFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader();
- ExternalUserDefinedFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoaderUnlocked();
const IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader() const;
IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader();
/// proton: starts
@@ -1090,12 +1110,42 @@ class Context: public std::enable_shared_from_this
WriteSettings getWriteSettings() const;
private:
- std::unique_lock getLock() const;
+ std::shared_ptr getSettingsConstraintsAndCurrentProfilesWithLock() const;
+
+ void setCurrentProfileWithLock(const String & profile_name, const std::lock_guard & lock);
+
+ void setCurrentProfileWithLock(const UUID & profile_id, const std::lock_guard & lock);
+
+ void setCurrentRolesWithLock(const std::vector & current_roles_, const std::lock_guard & lock);
+
+ void setSettingWithLock(std::string_view name, const String & value, const std::lock_guard & lock);
+
+ void setSettingWithLock(std::string_view name, const Field & value, const std::lock_guard & lock);
+
+ void applySettingChangeWithLock(const SettingChange & change, const std::lock_guard & lock);
+
+ void applySettingsChangesWithLock(const SettingsChanges & changes, const std::lock_guard & lock);
+
+ void setCurrentDatabaseWithLock(const String & name, const std::lock_guard & lock);
+
+ void checkSettingsConstraintsWithLock(const SettingChange & change) const;
+
+ void checkSettingsConstraintsWithLock(const SettingsChanges & changes) const;
+
+ void checkSettingsConstraintsWithLock(SettingsChanges & changes) const;
+
+ void clampToSettingsConstraintsWithLock(SettingsChanges & changes) const;
+
+ ExternalDictionariesLoader & getExternalDictionariesLoaderWithLock(const std::lock_guard & lock);
+
+ ExternalUserDefinedFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoaderWithLock(const std::lock_guard & lock);
+
+ ExternalModelsLoader & getExternalModelsLoaderWithLock(const std::lock_guard & lock);
void initGlobal();
/// Compute and set actual user settings, client_info.current_user should be set
- void calculateAccessRights();
+ void calculateAccessRightsWithLock(const std::lock_guard & lock);
template
void checkAccessImpl(const Args &... args) const;
@@ -1107,6 +1157,8 @@ class Context: public std::enable_shared_from_this
StoragePolicySelectorPtr getStoragePolicySelector(std::lock_guard & lock) const;
DiskSelectorPtr getDiskSelector(std::lock_guard & /* lock */) const;
+
+ DisksMap getDisksMap(std::lock_guard & lock) const;
};
}