Skip to content

Commit

Permalink
Merge branch 'main' into message_pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
sitaowang1998 committed Nov 30, 2024
2 parents 6fb2f0b + 326e2f9 commit 872616f
Show file tree
Hide file tree
Showing 22 changed files with 657 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/spider/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ IncludeCategories:
# Ex:
# - Regex: "<(fmt|spdlog)"
# Priority: 3
- Regex: "^<(|absl|boost|catch2|fmt|mariadb|msgpack|spdlog)"
- Regex: "^<(absl|boost|catch2|fmt|mariadb|msgpack|spdlog)"
Priority: 3
# C system headers
- Regex: "^<.+\\.h>"
Expand Down
40 changes: 40 additions & 0 deletions src/spider/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,43 @@ target_link_libraries(
Boost::system
spdlog::spdlog
)

set(SPIDER_CLIENT_SHARED_SOURCES CACHE INTERNAL "spider client shared source files")

set(SPIDER_CLIENT_SHARED_HEADERS
client/Data.hpp
client/Driver.hpp
client/task.hpp
client/TaskContext.hpp
client/TaskGraph.hpp
client/type_utils.hpp
client/Exception.hpp
CACHE INTERNAL
"spider client shared header files"
)

add_library(spider_client_lib)
target_sources(spider_client_lib PRIVATE ${SPIDER_CLIENT_SHARED_SOURCES})
target_sources(spider_client_lib PUBLIC ${SPIDER_CLIENT_SHARED_HEADERS})
target_link_libraries(
spider_client_lib
PUBLIC
Boost::boost
absl::flat_hash_map
)

set(SPIDER_CLIENT_SOURCES CACHE INTERNAL "spider client source files")

set(SPIDER_CLIENT_HEADERS
client/spider.hpp
client/Job.hpp
CACHE INTERNAL
"spider client header files"
)

add_library(spider_client)
target_sources(spider_client PRIVATE ${SPIDER_CLIENT_SOURCES})
target_sources(spider_client PUBLIC ${SPIDER_CLIENT_HEADERS})
target_link_libraries(spider_client PRIVATE spider_core)
target_link_libraries(spider_client PUBLIC spider_client_lib)
add_library(spider::spider ALIAS spider_client)
82 changes: 82 additions & 0 deletions src/spider/client/Data.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#ifndef SPIDER_CLIENT_DATA_HPP
#define SPIDER_CLIENT_DATA_HPP

#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "../core/Serializer.hpp"

namespace spider {
class DataImpl;

/**
* A representation of data stored on external storage. This class allows the user to define:
* - how the data should be cleaned up (garbage collected) once it is no longer referenced.
* - the locality of the data.
*
* Example:
* @code{.cpp}
* auto disk_file_data = spider::Data<std::string>::Builder()
* .set_locality({"node_address"}, true)
* .set_cleanup_func([](std::string const& path) { std::filesystem::remove(path); })
* .build("/path/of/file");
* @endcode
*
* @tparam T Type of the value.
*/
template <Serializable T>
class Data {
public:
/**
* @return The stored value.
*/
auto get() -> T;

/**
* Sets the data's locality, indicated by the nodes that contain the data.
*
* @param nodes
* @param hard Whether the data is only accessible from the given nodes (i.e., the locality is a
* hard requirement).
*/
void set_locality(std::vector<std::string> const& nodes, bool hard);

class Builder {
public:
/**
* Sets the data's locality, indicated by the nodes that contain the data.
*
* @param nodes
* @param hard Whether the data is only accessible from the given nodes (i.e., the locality
* is a hard requirement.
* @return self
*/
auto set_locality(std::vector<std::string> const& nodes, bool hard) -> Builder&;

/**
* Sets the cleanup function for the data. This function will be called when the data is no
* longer referenced.
*
* @param f
* @return self
*/
auto set_cleanup_func(std::function<void(T const&)> const& f) -> Builder&;

/**
* Builds the data object.
*
* @param t Value of the data
* @return The built object.
* @throw spider::ConnectionException
*/
auto build(T const& t) -> Data;
};

private:
std::unique_ptr<DataImpl> m_impl;
};
} // namespace spider

#endif // SPIDER_CLIENT_DATA_HPP
139 changes: 139 additions & 0 deletions src/spider/client/Driver.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#ifndef SPIDER_CLIENT_DRIVER_HPP
#define SPIDER_CLIENT_DRIVER_HPP

#include <optional>
#include <string>
#include <vector>

#include <boost/uuid/uuid.hpp>

#include "../worker/FunctionManager.hpp"
#include "Job.hpp"
#include "task.hpp"
#include "TaskGraph.hpp"

/**
* Registers a Task function with Spider
* @param func
*/
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define SPIDER_REGISTER_TASK(func) SPIDER_WORKER_REGISTER_TASK(func)

/**
* Registers a timed Task function with Spider
* @param func
* @param timeout The time after which the task is considered a straggler, triggering Spider to
* replicate the task. TODO: Use the timeout.
*/
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define SPIDER_REGISTER_TASK_TIMEOUT(func, timeout) SPIDER_WORKER_REGISTER_TASK(func)

namespace spider {

/**
* An interface for a client to interact with Spider and create jobs, access the kv-store, etc.
*/
class Driver {
public:
/**
* @param storage_url
* @throw spider::ConnectionException
*/
explicit Driver(std::string const& storage_url);

/**
* @param storage_url
* @param id A caller-specified ID to associate with this driver. All jobs created by this
* driver will be associated with this ID. This may be useful if, for instance, the caller
* fails and then needs to reconnect and retrieve all previously created jobs. NOTE: It is
* undefined behaviour for two clients to concurrently use the same ID.
* @throw spider::ConnectionException
* @throw spider::DriverIdInUseException
*/
Driver(std::string const& storage_url, boost::uuids::uuid id);

/**
* Inserts the given key-value pair into the key-value store, overwriting any existing value.
*
* @param key
* @param value
* @throw spider::ConnectionException
*/
auto kv_store_insert(std::string const& key, std::string const& value);

/**
* Gets the value corresponding to the given key.
*
* NOTE: Callers cannot get values created by other clients, but they can get values created by
* previous `Driver` instances with the same client ID.
*
* @param key
* @return An optional containing the value if the given key exists, or `std::nullopt`
* otherwise.
* @throw spider::ConnectionException
*/
auto kv_store_get(std::string const& key) -> std::optional<std::string>;

/**
* Binds inputs to a task. Inputs can be:
* - the outputs of a task or task graph, forming dependencies between tasks.
* - any value that satisfies the `TaskIo` concept.
*
* @tparam ReturnType Return type for both the task and the resulting `TaskGraph`.
* @tparam TaskParams
* @tparam Inputs
* @tparam GraphParams
* @param task
* @param inputs Inputs to bind to `task`. If an input is a `Task` or `TaskGraph`, their
* outputs will be bound to the inputs of `task`.
* @return A `TaskGraph` of the inputs bound to `task`.
*/
template <
TaskIo ReturnType,
TaskIo... TaskParams,
RunnableOrTaskIo... Inputs,
TaskIo... GraphParams>
auto bind(TaskFunction<ReturnType, TaskParams...> const& task, Inputs&&... inputs)
-> TaskGraph<ReturnType(GraphParams...)>;

/**
* Starts running a task with the given inputs on Spider.
*
* @tparam ReturnType
* @tparam Params
* @param task
* @param inputs
* @return A job representing the running task.
* @throw spider::ConnectionException
*/
template <TaskIo ReturnType, TaskIo... Params>
auto
start(TaskFunction<ReturnType, Params...> const& task, Params&&... inputs) -> Job<ReturnType>;

/**
* Starts running a task graph with the given inputs on Spider.
*
* @tparam ReturnType
* @tparam Params
* @param graph
* @param inputs
* @return A job representing the running task graph.
* @throw spider::ConnectionException
*/
template <TaskIo ReturnType, TaskIo... Params>
auto
start(TaskGraph<ReturnType(Params...)> const& graph, Params&&... inputs) -> Job<ReturnType>;

/**
* Gets all scheduled and running jobs started by drivers with the current client's ID.
*
* NOTE: This method will not return jobs that have finished.
*
* @return IDs of the jobs.
* @throw spider::ConnectionException
*/
auto get_jobs() -> std::vector<boost::uuids::uuid>;
};
} // namespace spider

#endif // SPIDER_CLIENT_DRIVER_HPP
37 changes: 37 additions & 0 deletions src/spider/client/Exception.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#ifndef SPIDER_CLIENT_EXCEPTION_HPP
#define SPIDER_CLIENT_EXCEPTION_HPP

#include <exception>
#include <string>

#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <fmt/format.h>

namespace spider {
class ConnectionException final : public std::exception {
public:
explicit ConnectionException(std::string const& addr)
: m_message(fmt::format("Cannot connect to storage {}.", addr)) {}

[[nodiscard]] auto what() const noexcept -> char const* override { return m_message.c_str(); }

private:
std::string m_message;
};

class DriverIdInUseException final : public std::exception {
public:
explicit DriverIdInUseException(boost::uuids::uuid id)
: m_message(
fmt::format("Driver ID {} is currently in use.", boost::uuids::to_string(id))
) {}

[[nodiscard]] auto what() const noexcept -> char const* override { return m_message.c_str(); }

private:
std::string m_message;
};
} // namespace spider

#endif // SPIDER_CLIENT_EXCEPTION_HPP
72 changes: 72 additions & 0 deletions src/spider/client/Job.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#ifndef SPIDER_CLIENT_JOB_HPP
#define SPIDER_CLIENT_JOB_HPP

#include <cstdint>
#include <string>
#include <utility>

#include "task.hpp"

namespace spider {
// TODO: Use std::expected or Boost's outcome so that the user can get the result of the job in one
// call rather than the current error-prone approach which requires that the user check the job's
// status and then call the relevant method.

enum class JobStatus : uint8_t {
Running,
Succeeded,
Failed,
Cancelled,
};

/**
* A running task graph.
*
* @tparam ReturnType
*/
template <TaskIo ReturnType>
class Job {
public:
/**
* Waits for the job to complete.
*
* @throw spider::ConnectionException
*/
auto wait_complete();

/**
* Cancels the job and waits for the job's tasks to be cancelled.
*
* @throw spider::ConnectionException
*/
auto cancel();

/**
* @return Status of the job.
* @throw spider::ConnectionException
*/
auto get_status() -> JobStatus;

/**
* NOTE: It is undefined behavior to call this method for a job that is not in the `Succeeded`
* state.
*
* @return Result of the job.
* @throw spider::ConnectionException
*/
auto get_result() -> ReturnType;

/**
* NOTE: It is undefined behavior to call this method for a job that is not in the `Failed`
* state.
*
* @return A pair:
* - the name of the task function that failed.
* - the error message sent from the task through `TaskContext::abort` or from Spider.
* @throw spider::ConnectionException
*/
auto get_error() -> std::pair<std::string, std::string>;
};
} // namespace spider

#endif // SPIDER_CLIENT_JOB_HPP
Loading

0 comments on commit 872616f

Please sign in to comment.