Skip to content

Commit

Permalink
Merge branch 'main' into aws_signer
Browse files Browse the repository at this point in the history
  • Loading branch information
haiqi96 committed Aug 1, 2024
2 parents 15a17e9 + 9d2e94d commit a7d70ca
Show file tree
Hide file tree
Showing 45 changed files with 1,695 additions and 343 deletions.
6 changes: 2 additions & 4 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ tasks:
CHECKSUM_FILE: "{{.G_BUILD_DIR}}/{{.TASK}}.md5"
METEOR_ARCH: "{{ if eq ARCH \"arm64\" }}arm64{{ else }}x86_64{{ end }}"
METEOR_PLATFORM: "{{ if eq OS \"darwin\" }}osx{{ else }}linux{{ end }}"
METEOR_RELEASE: "2.15"
METEOR_RELEASE: "2.16"
run: "once"
preconditions:
- sh: >-
Expand Down Expand Up @@ -542,9 +542,7 @@ tasks:
internal: true
vars:
VERSIONED_PACKAGE_NAME:
sh: |
source /etc/os-release
echo "clp-{{.FLAVOUR}}-${ID}-${VERSION_CODENAME}-$(arch)-v{{.G_PACKAGE_VERSION}}"
sh: "echo clp-{{.FLAVOUR}}-$(arch)-v{{.G_PACKAGE_VERSION}}"
OUTPUT_DIR: "{{.G_BUILD_DIR}}/{{.VERSIONED_PACKAGE_NAME}}"
OUTPUT_FILE: "{{.OUTPUT_DIR}}.tar.gz"
requires:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,21 @@ def append_docker_port_settings_for_host_ips(
cmd.append(f"{ip}:{host_port}:{container_port}")


def chown_recursively(
path: pathlib.Path,
user_id: int,
group_id: int,
):
"""
Recursively changes the owner of the given path to the given user ID and group ID.
:param path:
:param user_id:
:param group_id:
"""
chown_cmd = ["chown", "--recursive", f"{user_id}:{group_id}", str(path)]
subprocess.run(chown_cmd, stdout=subprocess.DEVNULL, check=True)


def wait_for_container_cmd(container_name: str, cmd_to_run: [str], timeout: int):
container_exec_cmd = ["docker", "exec", container_name]
cmd = container_exec_cmd + cmd_to_run
Expand Down Expand Up @@ -318,6 +333,19 @@ def start_queue(instance_id: str, clp_config: CLPConfig):
DockerMount(DockerMountType.BIND, queue_logs_dir, rabbitmq_logs_dir),
]
rabbitmq_pid_file_path = pathlib.Path("/") / "tmp" / "rabbitmq.pid"

host_user_id = os.getuid()
if 0 != host_user_id:
container_user = f"{host_user_id}:{os.getgid()}"
else:
# The host user is `root` so use the container's default user and make this component's
# directories writable by that user.
# NOTE: This doesn't affect the host user's access to the directories since they're `root`.
container_user = "rabbitmq"
default_container_user_id = 999
default_container_group_id = 999
chown_recursively(queue_logs_dir, default_container_user_id, default_container_group_id)

# fmt: off
cmd = [
"docker", "run",
Expand All @@ -327,7 +355,7 @@ def start_queue(instance_id: str, clp_config: CLPConfig):
# Override RABBITMQ_LOGS since the image sets it to *only* log to stdout
"-e", f"RABBITMQ_LOGS={rabbitmq_logs_dir / log_filename}",
"-e", f"RABBITMQ_PID_FILE={rabbitmq_pid_file_path}",
"-u", f"{os.getuid()}:{os.getgid()}",
"-u", container_user
]
# fmt: on
append_docker_port_settings_for_host_ips(
Expand Down Expand Up @@ -380,13 +408,27 @@ def start_redis(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path)
),
DockerMount(DockerMountType.BIND, redis_data_dir, pathlib.Path("/") / "data"),
]

host_user_id = os.getuid()
if 0 != host_user_id:
container_user = f"{host_user_id}:{os.getgid()}"
else:
# The host user is `root` so use the container's default user and make this component's
# directories writable by that user.
# NOTE: This doesn't affect the host user's access to the directories since they're `root`.
container_user = "redis"
default_container_user_id = 999
default_container_group_id = 999
chown_recursively(redis_data_dir, default_container_user_id, default_container_group_id)
chown_recursively(redis_logs_dir, default_container_user_id, default_container_group_id)

# fmt: off
cmd = [
"docker", "run",
"-d",
"--name", container_name,
"--log-driver", "local",
"-u", f"{os.getuid()}:{os.getgid()}",
"-u", container_user,
]
# fmt: on
for mount in mounts:
Expand All @@ -400,6 +442,19 @@ def start_redis(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path)
cmd.append(str(config_file_path))
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

# fmt: off
redis_ping_cmd = [
"redis-cli",
"-h", "127.0.0.1",
"-p", "6379",
"-a", clp_config.redis.password,
"PING"
]
# fmt: on

if not wait_for_container_cmd(container_name, redis_ping_cmd, 30):
raise EnvironmentError(f"{component_name} did not initialize in time")

logger.info(f"Started {component_name}.")


Expand All @@ -426,13 +481,27 @@ def start_results_cache(instance_id: str, clp_config: CLPConfig, conf_dir: pathl
DockerMount(DockerMountType.BIND, data_dir, pathlib.Path("/") / "data" / "db"),
DockerMount(DockerMountType.BIND, logs_dir, pathlib.Path("/") / "var" / "log" / "mongodb"),
]

host_user_id = os.getuid()
if 0 != host_user_id:
container_user = f"{host_user_id}:{os.getgid()}"
else:
# The host user is `root` so use the container's default user and make this component's
# directories writable by that user.
# NOTE: This doesn't affect the host user's access to the directories since they're `root`.
container_user = "mongodb"
default_container_user_id = 999
default_container_group_id = 999
chown_recursively(data_dir, default_container_user_id, default_container_group_id)
chown_recursively(logs_dir, default_container_user_id, default_container_group_id)

# fmt: off
cmd = [
"docker", "run",
"-d",
"--name", container_name,
"--log-driver", "local",
"-u", f"{os.getuid()}:{os.getgid()}",
"-u", container_user,
]
# fmt: on
for mount in mounts:
Expand Down Expand Up @@ -786,6 +855,9 @@ def start_webui(instance_id: str, clp_config: CLPConfig, mounts: CLPDockerMounts
},
"public": {
"ClpStorageEngine": clp_config.package.storage_engine,
"LogViewerWebuiUrl": (
f"http://{clp_config.log_viewer_webui.host}:{clp_config.log_viewer_webui.port}",
),
},
}
meteor_settings = read_and_update_settings_json(settings_json_path, meteor_settings_updates)
Expand Down
3 changes: 3 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ set(SOURCE_FILES_unitTest
src/clp/CurlDownloadHandler.cpp
src/clp/CurlDownloadHandler.hpp
src/clp/CurlEasyHandle.hpp
src/clp/CurlGlobalInstance.cpp
src/clp/CurlGlobalInstance.hpp
src/clp/CurlOperationFailed.hpp
src/clp/CurlStringList.hpp
src/clp/database_utils.cpp
Expand Down Expand Up @@ -355,6 +357,7 @@ set(SOURCE_FILES_unitTest
src/clp/hash_utils.cpp
src/clp/hash_utils.hpp
src/clp/ir/constants.hpp
src/clp/ir/EncodedTextAst.hpp
src/clp/ir/LogEvent.hpp
src/clp/ir/LogEventDeserializer.cpp
src/clp/ir/LogEventDeserializer.hpp
Expand Down
45 changes: 45 additions & 0 deletions components/core/src/clp/CurlGlobalInstance.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include "CurlGlobalInstance.hpp"

#include <mutex>

#include <curl/curl.h>

#include "CurlOperationFailed.hpp"
#include "ErrorCode.hpp"

namespace clp {
CurlGlobalInstance::CurlGlobalInstance() {
std::scoped_lock const global_lock{m_ref_count_mutex};
if (0 == m_ref_count) {
if (auto const err{curl_global_init(CURL_GLOBAL_ALL)}; 0 != err) {
throw CurlOperationFailed(
ErrorCode_Failure,
__FILE__,
__LINE__,
err,
"`curl_global_init` failed."
);
}
}
++m_ref_count;
}

CurlGlobalInstance::~CurlGlobalInstance() {
std::scoped_lock const global_lock{m_ref_count_mutex};
--m_ref_count;
if (0 == m_ref_count) {
#if defined(__APPLE__)
// NOTE: On macOS, calling `curl_global_init` after `curl_global_cleanup` will fail with
// CURLE_SSL_CONNECT_ERROR. Thus, for now, we skip `deinit` on macOS. Luckily, it is safe to
// call `curl_global_init` multiple times without calling `curl_global_cleanup`. Related
// issues:
// - https://github.com/curl/curl/issues/12525
// - https://github.com/curl/curl/issues/13805
// TODO: Remove this conditional logic when the issues are resolved.
return;
#else
curl_global_cleanup();
#endif
}
}
} // namespace clp
35 changes: 35 additions & 0 deletions components/core/src/clp/CurlGlobalInstance.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#ifndef CLP_CURLGLOBALINSTANCE_HPP
#define CLP_CURLGLOBALINSTANCE_HPP

#include <cstddef>
#include <mutex>

namespace clp {
/**
* Class to wrap `libcurl`'s global initialization/de-initialization calls using RAII. Before using
* any `libcurl` functionalities, an instance of this class must be created. Although unnecessasry,
* it can be safely instantiated multiple times; it maintains a static reference count to all
* existing instances and only de-initializes `libcurl`'s global resources when the reference count
* reaches 0.
*/
class CurlGlobalInstance {
public:
// Constructors
CurlGlobalInstance();

// Disable copy/move constructors and assignment operators
CurlGlobalInstance(CurlGlobalInstance const&) = delete;
CurlGlobalInstance(CurlGlobalInstance&&) = delete;
auto operator=(CurlGlobalInstance const&) -> CurlGlobalInstance& = delete;
auto operator=(CurlGlobalInstance&&) -> CurlGlobalInstance& = delete;

// Destructor
~CurlGlobalInstance();

private:
static inline std::mutex m_ref_count_mutex;
static inline size_t m_ref_count{0};
};
} // namespace clp

#endif // CLP_CURLGLOBALINSTANCE_HPP
9 changes: 5 additions & 4 deletions components/core/src/clp/EncodedVariableInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary(
size_t& raw_num_bytes
) {
logtype_dict_entry.clear();
logtype_dict_entry.reserve_constant_length(log_event.get_logtype().length());
auto const& log_message = log_event.get_message();
logtype_dict_entry.reserve_constant_length(log_message.get_logtype().length());

raw_num_bytes = 0;

Expand Down Expand Up @@ -284,9 +285,9 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary(
};

ffi::ir_stream::generic_decode_message<false>(
log_event.get_logtype(),
log_event.get_encoded_vars(),
log_event.get_dict_vars(),
log_message.get_logtype(),
log_message.get_encoded_vars(),
log_message.get_dict_vars(),
constant_handler,
encoded_int_handler,
encoded_float_handler,
Expand Down
30 changes: 0 additions & 30 deletions components/core/src/clp/NetworkReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,33 +110,6 @@ curl_write_callback(char* ptr, size_t size, size_t nmemb, void* reader_ptr) -> s
}
} // namespace

bool NetworkReader::m_static_init_complete{false};

auto NetworkReader::init() -> ErrorCode {
if (m_static_init_complete) {
return ErrorCode_Success;
}
if (0 != curl_global_init(CURL_GLOBAL_ALL)) {
return ErrorCode_Failure;
}
m_static_init_complete = true;
return ErrorCode_Success;
}

auto NetworkReader::deinit() -> void {
#if defined(__APPLE__)
// NOTE: On macOS, calling `curl_global_init` after `curl_global_cleanup` will fail with
// CURLE_SSL_CONNECT_ERROR. Thus, for now, we skip `deinit` on macOS. Related issues:
// - https://github.com/curl/curl/issues/12525
// - https://github.com/curl/curl/issues/13805
// TODO: Remove this conditional logic when the issues are resolved.
return;
#else
curl_global_cleanup();
m_static_init_complete = false;
#endif
}

NetworkReader::NetworkReader(
std::string_view src_url,
size_t offset,
Expand All @@ -157,9 +130,6 @@ NetworkReader::NetworkReader(
// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays)
m_buffer_pool.emplace_back(std::make_unique<char[]>(m_buffer_size));
}
if (false == m_static_init_complete) {
throw OperationFailed(ErrorCode_NotReady, __FILE__, __LINE__);
}
m_downloader_thread = std::make_unique<DownloaderThread>(*this, offset, disable_caching);
m_downloader_thread->start();
}
Expand Down
33 changes: 15 additions & 18 deletions components/core/src/clp/NetworkReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <curl/curl.h>

#include "CurlDownloadHandler.hpp"
#include "CurlGlobalInstance.hpp"
#include "ErrorCode.hpp"
#include "ReaderInterface.hpp"
#include "Thread.hpp"
Expand Down Expand Up @@ -69,24 +70,20 @@ class NetworkReader : public ReaderInterface {
static constexpr size_t cMinBufferPoolSize{2};
static constexpr size_t cMinBufferSize{512};

/**
* Initializes static resources for this class. This must be called before using the class.
* @return ErrorCode_Success on success.
* @return ErrorCode_Failure if libcurl initialization failed.
*/
[[nodiscard]] static auto init() -> ErrorCode;

/**
* De-initializes any static resources.
*/
static auto deinit() -> void;

/**
* Constructs a reader to stream data from the given URL, starting at the given offset.
* TODO: the current implementation doesn't handle the case when the given offset is out of
* range. The file_pos will be set to an invalid state if this happens, which can be
* problematic if the other part of the program depends on this position. It can be fixed by
* capturing the error code 416 in the response header.
* NOTE: This class depends on `libcurl`, so an instance of `clp::CurlGlobalInstance` must
* remain alive for the entire lifespan of any instance of this class.
*
* This class maintains an instance of `CurlGlobalInstance` in case the user forgets to
* instantiate one, but it is better for performance if the user instantiates one. For instance,
* if the user doesn't instantiate a `CurlGlobalInstance` and only ever creates one
* `NetworkReader` at a time, then every construction and destruction of `NetworkReader` would
* cause `libcurl` to init and deinit. In contrast, if the user instantiates a
* `CurlGlobalInstance` before instantiating any `NetworkReader`s, then the `CurlGlobalInstance`
* maintained by this class will simply be a reference to the existing one rather than
* initializing and deinitializing `libcurl`.
*
* @param src_url
* @param offset Index of the byte at which to start the download
* @param disable_caching Whether to disable the caching.
Expand Down Expand Up @@ -246,8 +243,6 @@ class NetworkReader : public ReaderInterface {
bool m_disable_caching{false};
};

static bool m_static_init_complete;

/**
* Submits a request to abort the ongoing curl download session.
*/
Expand Down Expand Up @@ -308,6 +303,8 @@ class NetworkReader : public ReaderInterface {
return m_at_least_one_byte_downloaded.load();
}

CurlGlobalInstance m_curl_global_instance;

std::string m_src_url;

size_t m_offset{0};
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp/clg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ set(
../GlobalSQLiteMetadataDB.hpp
../Grep.cpp
../Grep.hpp
../ir/EncodedTextAst.hpp
../ir/LogEvent.hpp
../ir/parsing.cpp
../ir/parsing.hpp
Expand Down
Loading

0 comments on commit a7d70ca

Please sign in to comment.