diff --git a/doc/casync.rst b/doc/casync.rst index 00498f50..d923240f 100644 --- a/doc/casync.rst +++ b/doc/casync.rst @@ -164,6 +164,9 @@ General options: --cache= Directory to use as encoder cache --cache-auto, -c Pick encoder cache directory automatically --rate-limit-bps= Maximum bandwidth in bytes/s for remote communication +--max-active-chunks= Maximum number of simultaneously active chunks for remote communication +--max-host-connections= Maximum number of connections to a single host for remote communication +--ssl-trust-peer Trust the peer's SSL certificate --exclude-nodump=no Don't exclude files with chattr(1)'s +d **nodump** flag when creating archive --exclude-submounts=yes Exclude submounts when creating archive --exclude-file=no Don't respect .caexclude files in the file tree diff --git a/src/caremote.c b/src/caremote.c index 8ece8f76..f9fabc64 100644 --- a/src/caremote.c +++ b/src/caremote.c @@ -58,7 +58,11 @@ struct CaRemote { int input_fd; int output_fd; + int log_level; uint64_t rate_limit_bps; + unsigned max_active_chunks; + unsigned max_host_connections; + bool ssl_trust_peer; ReallocBuffer input_buffer; ReallocBuffer output_buffer; @@ -112,6 +116,7 @@ CaRemote* ca_remote_new(void) { rr->local_feature_flags = UINT64_MAX; rr->remote_feature_flags = UINT64_MAX; + rr->log_level = -1; rr->rate_limit_bps = UINT64_MAX; rr->digest_type = _CA_DIGEST_TYPE_INVALID; @@ -228,6 +233,15 @@ CaRemote* ca_remote_unref(CaRemote *rr) { return mfree(rr); } +int ca_remote_set_log_level(CaRemote *rr, int log_level) { + if (!rr) + return -EINVAL; + + rr->log_level = log_level; + + return 0; +} + int ca_remote_set_rate_limit_bps(CaRemote *rr, uint64_t rate_limit_bps) { if (!rr) return -EINVAL; @@ -237,6 +251,33 @@ int ca_remote_set_rate_limit_bps(CaRemote *rr, uint64_t rate_limit_bps) { return 0; } +int ca_remote_set_max_active_chunks(CaRemote *rr, unsigned max_active_chunks) { + if (!rr) + return -EINVAL; + + rr->max_active_chunks = max_active_chunks; + + return 0; +} 
+ +int ca_remote_set_max_host_connections(CaRemote *rr, unsigned max_host_connections) { + if (!rr) + return -EINVAL; + + rr->max_host_connections = max_host_connections; + + return 0; +} + +int ca_remote_set_ssl_trust_peer(CaRemote *rr, bool ssl_trust_peer) { + if (!rr) + return -EINVAL; + + rr->ssl_trust_peer = ssl_trust_peer; + + return 0; +} + int ca_remote_set_local_feature_flags(CaRemote *rr, uint64_t flags) { if (!rr) return -EINVAL; @@ -984,9 +1025,21 @@ static int ca_remote_start(CaRemote *rr) { argc = (rr->callout ? 1 : 3) + 5 + strv_length(rr->rstore_urls); + if (rr->log_level != -1) + argc++; + if (rr->rate_limit_bps != UINT64_MAX) argc++; + if (rr->max_active_chunks) + argc++; + + if (rr->max_host_connections) + argc++; + + if (rr->ssl_trust_peer) + argc++; + args = newa(char*, argc + 1); if (rr->callout) { @@ -1016,6 +1069,14 @@ static int ca_remote_start(CaRemote *rr) { args[i++] = (char*) remote_casync; } + if (rr->log_level != -1) { + r = asprintf(args + i, "--log-level=%i", rr->log_level); + if (r < 0) + return log_oom(); + + i++; + } + if (rr->rate_limit_bps != UINT64_MAX) { r = asprintf(args + i, "--rate-limit-bps=%" PRIu64, rr->rate_limit_bps); if (r < 0) @@ -1024,6 +1085,25 @@ static int ca_remote_start(CaRemote *rr) { i++; } + if (rr->max_active_chunks) { + r = asprintf(args + i, "--max-active-chunks=%u", rr->max_active_chunks); + if (r < 0) + return log_oom(); + + i++; + } + + if (rr->max_host_connections) { + r = asprintf(args + i, "--max-host-connections=%u", rr->max_host_connections); + if (r < 0) + return log_oom(); + + i++; + } + + if (rr->ssl_trust_peer) + args[i++] = (char*) "--ssl-trust-peer"; + args[i + CA_REMOTE_ARG_OPERATION] = (char*) ((rr->local_feature_flags & (CA_PROTOCOL_PUSH_CHUNKS|CA_PROTOCOL_PUSH_INDEX|CA_PROTOCOL_PUSH_ARCHIVE)) ? "push" : "pull"); args[i + CA_REMOTE_ARG_BASE_URL] = /* rr->base_url ? rr->base_url + skip :*/ (char*) "-"; args[i + CA_REMOTE_ARG_ARCHIVE_URL] = rr->archive_url ? 
rr->archive_url + skip : (char*) "-"; diff --git a/src/caremote.h b/src/caremote.h index c5b1633f..0e0d2d39 100644 --- a/src/caremote.h +++ b/src/caremote.h @@ -50,7 +50,11 @@ int ca_remote_get_remote_feature_flags(CaRemote *rr, uint64_t* flags); int ca_remote_set_digest_type(CaRemote *rr, CaDigestType type); int ca_remote_get_digest_type(CaRemote *rr, CaDigestType *ret); +int ca_remote_set_log_level(CaRemote *rr, int log_level); int ca_remote_set_rate_limit_bps(CaRemote *rr, uint64_t rate_limit_bps); +int ca_remote_set_max_active_chunks(CaRemote *rr, unsigned max_active_chunks); +int ca_remote_set_max_host_connections(CaRemote *rr, unsigned max_host_connections); +int ca_remote_set_ssl_trust_peer(CaRemote *rr, bool ssl_trust_peer); int ca_remote_set_io_fds(CaRemote *rr, int input_fd, int output_fd); int ca_remote_get_io_fds(CaRemote *rr, int *ret_input_fd, int *ret_output_fd); diff --git a/src/casync-http.c b/src/casync-http.c index 43809c3a..75843a00 100644 --- a/src/casync-http.c +++ b/src/casync-http.c @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -10,19 +11,62 @@ #include "cautil.h" #include "realloc-buffer.h" #include "util.h" +#include "list.h" + +/* The maximum number of active chunks is defined as the sum of: + * - number of chunks added to curl multi for download + * - number of chunks downloaded, and waiting to be sent to remote + * + * In situations where the server is local and super fast (ie. we receive chunks + * faster than we can send them to the remote), around 95% of the active chunks + * are chunks waiting to be sent to remote, hence this number can be seen as + * "maximum number of chunks sitting in ram". + * + * In situations where the server is away, around 95% of the active chunks are + * chunks added to curl multi. It doesn't mean "being downloaded" though, it's more + * a "maximum limit for concurrent downloads".
The real number of running downloads + might be lower, because: + - if we're doing HTTP/1 and parallel connections, the hard limit is actually + defined by `MAX_HOST_CONNECTIONS`. + - if we're doing HTTP/2 over a multiplexed connection, the number of parallel + streams is negotiated between client and server. + * + * In effect, *I think* it's best to make this number quite large, because we + don't want to underfeed libcurl and underperform. I think it's better to feed + too many handles to the curl multi, and let libcurl decide internally what's + best to do with it. Libcurl knows every detail about the HTTP connection and + will handle (parallel/multiplex/whatever) downloads better than us. + */ +#define MAX_ACTIVE_CHUNKS 64 + +/* This is the maximum number of parallel connections per host. This should have + * no effect in case we're doing HTTP/2 with one connection and multiplexing. + * However, if we're doing HTTP/1, curl will open parallel connections, as HTTP/1 + * pipelining is no longer supported since libcurl 7.62. + * + * We want to make sure that we don't open too many parallel connections per host. + * It seems that the norm for web browsers ranges from 6 to 8.
+ */ +#define MAX_HOST_CONNECTIONS 8 static volatile sig_atomic_t quit = false; +static int arg_log_level = -1; static bool arg_verbose = false; static curl_off_t arg_rate_limit_bps = 0; +static unsigned arg_max_active_chunks = MAX_ACTIVE_CHUNKS; +static unsigned arg_max_host_connections = MAX_HOST_CONNECTIONS; +static bool arg_ssl_trust_peer = false; -static enum { - ARG_PROTOCOL_HTTP, - ARG_PROTOCOL_FTP, - ARG_PROTOCOL_HTTPS, - ARG_PROTOCOL_SFTP, - _ARG_PROTOCOL_INVALID = -1, -} arg_protocol = _ARG_PROTOCOL_INVALID; +typedef enum Protocol { + PROTOCOL_HTTP, + PROTOCOL_FTP, + PROTOCOL_HTTPS, + PROTOCOL_SFTP, + _PROTOCOL_INVALID = -1, +} Protocol; + +static Protocol arg_protocol = _PROTOCOL_INVALID; typedef enum ProcessUntil { PROCESS_UNTIL_WRITTEN, @@ -33,45 +77,833 @@ typedef enum ProcessUntil { PROCESS_UNTIL_FINISHED, } ProcessUntil; -static CURLcode robust_curl_easy_perform(CURL *curl) { - uint64_t sleep_base_usec = 100 * 1000; - unsigned trial = 1; - unsigned limit = 10; +/* + * protocol helpers + */ + +static const char *protocol_str(Protocol protocol) { + switch (protocol) { + case PROTOCOL_HTTP: + return "HTTP"; + case PROTOCOL_FTP: + return "FTP"; + case PROTOCOL_HTTPS: + return "HTTPS"; + case PROTOCOL_SFTP: + return "SFTP"; + default: + assert_not_reached("Unknown protocol"); + } +} + +static bool protocol_status_ok(Protocol protocol, long protocol_status) { + switch (protocol) { + case PROTOCOL_HTTP: + case PROTOCOL_HTTPS: + if (protocol_status == 200) + return true; + break; + case PROTOCOL_FTP: + if (protocol_status >= 200 && protocol_status <= 299) + return true; + break; + case PROTOCOL_SFTP: + if (protocol_status == 0) + return true; + break; + default: + assert_not_reached("Unknown protocol"); + break; + } + return false; +} + +/* + * curl helpers + */ + +DEFINE_TRIVIAL_CLEANUP_FUNC(CURL*, curl_easy_cleanup); +DEFINE_TRIVIAL_CLEANUP_FUNC(CURLM*, curl_multi_cleanup); + +#define log_error_curle(code, fmt, ...) 
\ + log_error_errno(-EIO, fmt ": %s", ##__VA_ARGS__, curl_easy_strerror(code)) + +#define log_error_curlm(code, fmt, ...) \ + log_error_errno(-EIO, fmt ": %s", ##__VA_ARGS__, curl_multi_strerror(code)) + +#define CURL_SETOPT_EASY(handle, option, value) \ + ({ \ + CURLcode _c; \ + _c = curl_easy_setopt(handle, option, (value)); \ + if (_c != CURLE_OK) \ + return log_error_curle(_c, "Failed to set " #option); \ + }) + +#define CURL_SETOPT_EASY_CANFAIL(handle, option, value) \ + ({ \ + CURLcode _c; \ + _c = curl_easy_setopt(handle, option, (value)); \ + if (_c != CURLE_OK) \ + log_error_curle(_c, "Failed to set " #option); \ + }) + +#define CURL_SETOPT_MULTI(handle, option, value) \ + ({ \ + CURLMcode _c; \ + _c = curl_multi_setopt(handle, option, (value)); \ + if (_c != CURLM_OK) \ + return log_error_curlm(_c, "Failed to set " #option); \ + }) + +#define CURL_SETOPT_MULTI_CANFAIL(handle, option, value) \ + ({ \ + CURLMcode _c; \ + _c = curl_multi_setopt(handle, option, (value)); \ + if (_c != CURLM_OK) \ + log_error_curlm(_c, "Failed to set " #option); \ + }) + +static inline const char *get_curl_effective_url(CURL *handle) { CURLcode c; + char *effective_url; + + c = curl_easy_getinfo(handle, CURLINFO_EFFECTIVE_URL, &effective_url); + if (c != CURLE_OK) { + log_error_curle(c, "Failed to get CURLINFO_EFFECTIVE_URL"); + return NULL; + } - assert(curl); + return effective_url; +} - while (trial < limit) { +static inline void *get_curl_private(CURL *handle) { + CURLcode c; + void *private; - c = curl_easy_perform(curl); + c = curl_easy_getinfo(handle, CURLINFO_PRIVATE, &private); + if (c != CURLE_OK) { + log_error_curle(c, "Failed to get CURLINFO_PRIVATE"); + return NULL; + } - switch (c) { + return private; +} + +static int configure_curl_easy_handle(CURL *handle, const char *url) { + assert(handle); + assert(url); + + CURL_SETOPT_EASY(handle, CURLOPT_URL, url); + + return 0; +} + +typedef size_t (*ca_curl_write_callback_t)(const void *, size_t, size_t, void *); + 
+static int make_curl_easy_handle(CURL **ret, + ca_curl_write_callback_t write_callback, + void *write_data, void *private) { + _cleanup_(curl_easy_cleanupp) CURL *h = NULL; + + assert(ret); + assert(write_callback); + assert(write_data); + /* private is optional and can be null */ + + h = curl_easy_init(); + if (!h) + return log_oom(); + + CURL_SETOPT_EASY(h, CURLOPT_FOLLOWLOCATION, 1L); + CURL_SETOPT_EASY(h, CURLOPT_PROTOCOLS, + arg_protocol == PROTOCOL_FTP ? CURLPROTO_FTP : + arg_protocol == PROTOCOL_SFTP ? CURLPROTO_SFTP : + CURLPROTO_HTTP | CURLPROTO_HTTPS); + + if (arg_protocol == PROTOCOL_SFTP) + CURL_SETOPT_EASY_CANFAIL(h, CURLOPT_SSH_AUTH_TYPES, CURLSSH_AUTH_AGENT); + + if (IN_SET(arg_protocol, PROTOCOL_HTTP, PROTOCOL_HTTPS)) { + /* Default since libcurl 7.62.0 */ + CURL_SETOPT_EASY_CANFAIL(h, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_0); + CURL_SETOPT_EASY_CANFAIL(h, CURLOPT_PIPEWAIT, 1l); + } + + if (arg_rate_limit_bps > 0) { + CURL_SETOPT_EASY(h, CURLOPT_MAX_SEND_SPEED_LARGE, arg_rate_limit_bps); + CURL_SETOPT_EASY(h, CURLOPT_MAX_RECV_SPEED_LARGE, arg_rate_limit_bps); + } + + CURL_SETOPT_EASY(h, CURLOPT_WRITEFUNCTION, write_callback); + CURL_SETOPT_EASY(h, CURLOPT_WRITEDATA, write_data); + + if (private) + CURL_SETOPT_EASY(h, CURLOPT_PRIVATE, private); + + if (arg_ssl_trust_peer) + CURL_SETOPT_EASY(h, CURLOPT_SSL_VERIFYPEER, false); + + CURL_SETOPT_EASY(h, CURLOPT_VERBOSE, arg_log_level > 4); + + *ret = TAKE_PTR(h); + return 0; +} + +static int make_curl_multi_handle(CURLM **ret) { + _cleanup_(curl_multi_cleanupp) CURLM *h = NULL; + + assert(ret); + + h = curl_multi_init(); + if (!h) + return log_oom(); + + CURL_SETOPT_MULTI(h, CURLMOPT_MAX_HOST_CONNECTIONS, arg_max_host_connections); + + /* Default since libcurl 7.62.0 */ + CURL_SETOPT_MULTI_CANFAIL(h, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX); + + *ret = TAKE_PTR(h); + return 0; +} + +/* + * chunks data + */ + +typedef struct ChunkData ChunkData; + +struct ChunkData { + size_t current_store; /* set to
SIZE_MAX if chunk is missing */ + CaChunkID id; + ReallocBuffer buffer; +}; + +static void chunk_data_reset(ChunkData *cd, CaChunkID *id) { + assert(cd); + + cd->id = *id; + realloc_buffer_empty(&cd->buffer); +} + +static void chunk_data_free(ChunkData *cd) { + if (!cd) + return; + + realloc_buffer_free(&cd->buffer); + free(cd); +} + +static ChunkData *chunk_data_new(void) { + ChunkData *cd = NULL; + + cd = new0(ChunkData, 1); + + return cd; +} + +/* + * simple queue implementation + */ + +typedef struct QueueItem QueueItem; + +struct QueueItem { + void *data; + LIST_FIELDS(QueueItem, list); +}; - case CURLE_COULDNT_CONNECT: { - uint64_t sleep_usec; +typedef struct Queue { + LIST_HEAD(QueueItem, head); +} Queue; - /* Although this is not considered as a transient error by curl, - * this error can happen momentarily while casync is retrieving - * all the chunks from a remote. In this case we want to give - * a break to the server and retry later. - */ +static int queue_push(Queue *q, void *data) { + int r; + QueueItem *qi; + + assert(q); + assert(data); + + qi = new0(QueueItem, 1); + if (!qi) { + r = log_oom(); + return r; + } + + qi->data = data; + LIST_INIT(list, qi); + LIST_APPEND(list, q->head, qi); + + return 0; +} + +static void *queue_pop(Queue *q) { + QueueItem *qi; + void *data; + + assert(q); + + qi = q->head; + if (!qi) + return NULL; + + LIST_REMOVE(list, q->head, q->head); + data = qi->data; + free(qi); + + return data; +} + +static void *queue_remove(Queue *q, void *data) { + QueueItem *i, *n; - sleep_usec = sleep_base_usec * trial; - log_info("Could not connect, retrying in %" PRIu64 " ms", sleep_usec / 1000); - usleep(sleep_usec); - trial++; + assert(q); + + LIST_FOREACH_SAFE(list, i, n, q->head) { + if (i->data == data) break; - } + } + + if (!i) + return NULL; + + LIST_REMOVE(list, q->head, i); + free(i); + + return data; +} + +static bool queue_is_empty(Queue *q) { + assert(q); + + return LIST_IS_EMPTY(q->head); +} + +static void queue_free(Queue 
*q) { + QueueItem *i, *n; + + if (q == NULL) + return; + + LIST_FOREACH_SAFE(list, i, n, q->head) { + free(i); + } + + free(q); +} + +static Queue *queue_new(void) { + Queue *q; + + q = new0(Queue, 1); + if (!q) + return NULL; + + LIST_HEAD_INIT(q->head); + return q; +} + +/* + * Chunk Downloader + * + * We re-use things as much as possible, which means that: + * - CURL handles are allocated once at the beginning, then re-used all along. + * - ChunkData objects (ie. ReallocBuffer) as well. + * + * During operations, our CURL handles move from one queue to another, ie: + * ready -> inprogress -> completed -> ready ... + */ + +typedef struct CaChunkDownloader CaChunkDownloader; + +struct CaChunkDownloader { + CaRemote *remote; + CURLM *multi; + Queue *ready; /* CURL handles waiting to be used */ + Queue *inprogress; /* CURL handles in use (ie. added to curl multi) */ + Queue *completed; /* CURL handles completed (ie. chunks waiting to be put to remote */ + + char *store_url; +}; + +enum { + CA_CHUNK_DOWNLOADER_FINISHED, + CA_CHUNK_DOWNLOADER_POLL +}; + +static char *chunk_url(const char *store_url, const CaChunkID *id) { + char ids[CA_CHUNK_ID_FORMAT_MAX], *buffer; + const char *suffix; + size_t n; + + /* Chop off URL arguments and multiple trailing dashes, then append the chunk ID and ".cacnk" */ + + suffix = ca_compressed_chunk_suffix(); + + n = strcspn(store_url, "?;"); + while (n > 0 && store_url[n-1] == '/') + n--; + + buffer = new(char, n + 1 + 4 + 1 + CA_CHUNK_ID_FORMAT_MAX-1 + strlen(suffix) + 1); + if (!buffer) + return NULL; + + ca_chunk_id_format(id, ids); + + strcpy(mempcpy(mempcpy(mempcpy(mempcpy(mempcpy(buffer, store_url, n), "/", 1), ids, 4), "/", 1), ids, CA_CHUNK_ID_FORMAT_MAX-1), suffix); + + return buffer; +} + +static size_t write_chunk(const void *buffer, size_t size, size_t nmemb, void *userdata) { + ReallocBuffer *chunk_buffer = userdata; + size_t product, z; + + product = size * nmemb; + + z = realloc_buffer_size(chunk_buffer) + product; + if 
(z < realloc_buffer_size(chunk_buffer)) { + log_error("Overflow"); + return 0; + } + + if (z > CA_PROTOCOL_SIZE_MAX - offsetof(CaProtocolChunk, data)) { + log_error("Chunk too large"); + return 0; + } + + if (!realloc_buffer_append(chunk_buffer, buffer, product)) { + log_oom(); + return 0; + } + + return product; +} + +static void ca_chunk_downloader_free(CaChunkDownloader *dl) { + CURL *handle; + + if (dl == NULL) + return; + + while ((handle = queue_pop(dl->inprogress))) { + CURLMcode c; + + c = curl_multi_remove_handle(dl->multi, handle); + if (c != CURLM_OK) + log_error_curlm(c, "Failed to remove handle"); + + chunk_data_free(get_curl_private(handle)); + curl_easy_cleanup(handle); + } + + while ((handle = queue_pop(dl->ready))) { + chunk_data_free(get_curl_private(handle)); + curl_easy_cleanup(handle); + } + + while ((handle = queue_pop(dl->completed))) { + chunk_data_free(get_curl_private(handle)); + curl_easy_cleanup(handle); + } + + free(dl->store_url); + queue_free(dl->ready); + queue_free(dl->inprogress); + queue_free(dl->completed); + curl_multi_cleanup(dl->multi); + ca_remote_unref(dl->remote); + + free(dl); +} + +DEFINE_TRIVIAL_CLEANUP_FUNC(CaChunkDownloader*, ca_chunk_downloader_free); + +static CaChunkDownloader *ca_chunk_downloader_new(CaRemote *rr, const char *store_url) { + CaChunkDownloader *dl = NULL; + uint64_t i; + int r; + + dl = new0(CaChunkDownloader, 1); + if (!dl) + goto fail; + + dl->remote = ca_remote_ref(rr); + + r = make_curl_multi_handle(&dl->multi); + if (r < 0) + goto fail; + + dl->ready = queue_new(); + if (!dl->ready) + goto fail; + + dl->inprogress = queue_new(); + if (!dl->inprogress) + goto fail; + + dl->completed = queue_new(); + if (!dl->completed) + goto fail; + + for (i = 0; i < arg_max_active_chunks; i++) { + CURL *handle = NULL; + ChunkData *cd = NULL; + + cd = chunk_data_new(); + if (!cd) + goto fail; + + r = make_curl_easy_handle(&handle, write_chunk, &cd->buffer, cd); + if (r < 0) + goto fail; + + queue_push(dl->ready, 
handle); + } + + dl->store_url = strdup(store_url); + if (!dl->store_url) + goto fail; + + return dl; + +fail: + ca_chunk_downloader_free(dl); + return NULL; +} + +static int configure_handle_for_chunk(CURL *handle, const char *store_url, CaChunkID *id) { + int r; + ChunkData *cd = NULL; + _cleanup_free_ char *url_buffer = NULL; + + cd = get_curl_private(handle); + if (!cd) + return -EIO; + + chunk_data_reset(cd, id); + url_buffer = chunk_url(store_url, id); + if (!url_buffer) + return log_oom(); + + r = configure_curl_easy_handle(handle, url_buffer); + if (r < 0) + return r; + + return 0; +} + +/* Get chunk requests from remote, configure curl handles accordingly, + * add to curl multi, and return the number of chunk requests handled. */ +static int ca_chunk_downloader_fetch_chunk_requests(CaChunkDownloader *dl) { + QueueItem *i, *n; + int num = 0; + + LIST_FOREACH_SAFE(list, i, n, dl->ready->head) { + int r; + CURLMcode c; + CaChunkID id; + CURL *handle; + + r = ca_remote_has_pending_requests(dl->remote); + if (r < 0) + return log_error_errno(r, "Failed to query pending requests: %m"); + if (r == 0) + break; + + r = ca_remote_next_request(dl->remote, &id); + /* Even though we just ensured that there is a pending request, + * it's possible that next_requests() returns -ENODATA */ + if (r == -ENODATA) + return 0; + if (r == -EPIPE) + return r; + if (r < 0) + return log_error_errno(r, "Failed to query next request: %m"); + + handle = queue_pop(dl->ready); + assert(handle); + + r = configure_handle_for_chunk(handle, dl->store_url, &id); + if (r < 0) + return log_error_errno(r, "Failed to configure handle: %m"); + + log_debug("Acquiring chunk %s", get_curl_effective_url(handle)); + + c = curl_multi_add_handle(dl->multi, handle); + if (c != CURLM_OK) + return log_error_curlm(c, "Failed to add to multi handle"); + + queue_push(dl->inprogress, handle); + num++; + } + + return num; +} + +/* Do the communication with the remote, return a status code */ +static int 
ca_chunk_downloader_remote_step(CaChunkDownloader *dl) { + for (;;) { + int r; + + r = ca_remote_step(dl->remote); + if (r == -EPIPE) + return r; + if (r < 0) + return log_error_errno(r, "Failed to process remoting engine: %m"); + + switch (r) { + case CA_REMOTE_POLL: + return CA_CHUNK_DOWNLOADER_POLL; + case CA_REMOTE_FINISHED: + return CA_CHUNK_DOWNLOADER_FINISHED; + case CA_REMOTE_STEP: + case CA_REMOTE_REQUEST: + continue; default: - return c; + assert_not_reached("Unexpected step returned by remote_step()"); + break; + } + } + + assert_not_reached("Should have returned"); +} + +/* Put chunk requests to the remote, return the number of chunks put */ +static int ca_chunk_downloader_put_chunks(CaChunkDownloader *dl) { + int i; + + for (i = 0; ; i++) { + int r; + CURL *handle; + ChunkData *cd = NULL; + + if (queue_is_empty(dl->completed)) + break; + + r = ca_remote_can_put_chunk(dl->remote); + if (r == 0) + break; + if (r == -EPIPE) + return r; + if (r < 0) + return log_error_errno(r, "Failed to query can put chunk: %m"); + + handle = queue_pop(dl->completed); + assert(handle); + + cd = get_curl_private(handle); + if (!cd) + return -EIO; + + if (cd->current_store == SIZE_MAX) { + r = ca_remote_put_missing(dl->remote, &cd->id); + if (r < 0) + return log_error_errno(r, "Failed to write missing message: %m"); + } else { + r = ca_remote_put_chunk(dl->remote, &cd->id, CA_CHUNK_COMPRESSED, + realloc_buffer_data(&cd->buffer), + realloc_buffer_size(&cd->buffer)); + if (r < 0) + return log_error_errno(r, "Failed to write chunk: %m"); + } + + /* At this point, handle and chunk data are left "unconfigured" + * in the ready queue. They'll be reconfigured when re-used. 
*/ + queue_push(dl->ready, handle); + } + + return i; +} + +/* Process chunks that were downloaded by curl, return the number of chunks handled */ +static int ca_chunk_downloader_process_curl_multi(CaChunkDownloader *dl) { + int i, n; + CURLMcode cm; + + cm = curl_multi_perform(dl->multi, &n); + if (cm != CURLM_OK) + return log_error_curlm(cm, "Failed to perform curl multi"); + + for (i = 0; ; i++) { + CURLcode c; + CURLMsg *msg; + CURL *handle; + long protocol_status; + const char *effective_url; + ChunkData *cd; + + msg = curl_multi_info_read(dl->multi, &n); + if (!msg) break; + + if (msg->msg != CURLMSG_DONE) { + log_error("Unexpected CURL message: %d", msg->msg); + return -EIO; + } + + if (msg->data.result != CURLE_OK) + return log_error_curle(msg->data.result, "Failed to acquire chunk"); + + handle = msg->easy_handle; + + effective_url = get_curl_effective_url(handle); + if (!effective_url) + return -EIO; + + cd = get_curl_private(handle); + if (!cd) + return -EIO; + + c = curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &protocol_status); + if (c != CURLE_OK) + return log_error_curle(c, "Failed to query response code"); + + if (!protocol_status_ok(arg_protocol, protocol_status)) { + log_error("%s server failure %ld while requesting %s", + protocol_str(arg_protocol), protocol_status, + effective_url); + + /* No more stores? Set current_store to a special value + * to indicate failure. 
*/ + cd->current_store = SIZE_MAX; } + + cm = curl_multi_remove_handle(dl->multi, handle); + if (cm != CURLM_OK) + return log_error_curlm(cm, "Failed to remove curl handle"); + + queue_remove(dl->inprogress, handle); + queue_push(dl->completed, handle); } - return c; + return i; +} + +static int ca_chunk_downloader_step(CaChunkDownloader *dl) { + int r; + + /* Handle curl activity */ + r = ca_chunk_downloader_process_curl_multi(dl); + if (r < 0) + return log_error_errno(r, "Failed while processing curl multi: %m"); + if (r > 0) + log_trace("Processed %d curl messages", r); + + /* Step around */ + r = ca_chunk_downloader_remote_step(dl); + if (r == -EPIPE) + return r; + if (r < 0) + return log_error_errno(r, "Failed while processing remote engine: %m"); + if (r != CA_CHUNK_DOWNLOADER_POLL) + return r; + + /* Put as many downloaded chunks as we can */ + r = ca_chunk_downloader_put_chunks(dl); + if (r == -EPIPE) + return r; + if (r < 0) + return log_error_errno(r, "Failed while putting chunks to remote: %m"); + if (r > 0) + log_trace("Put %d chunks to remote", r); + + /* Get as many chunk requests as we can */ + r = ca_chunk_downloader_fetch_chunk_requests(dl); + if (r == -EPIPE) + return r; + if (r < 0) + return log_error_errno(r, "Failed while querying remote for chunk requests: %m"); + if (r > 0) + log_trace("Fetched %d chunk requests from remote", r); + + return CA_CHUNK_DOWNLOADER_POLL; +} + +static int get_remote_io_as_curl_waitfds(CaRemote *rr, struct curl_waitfd *ret_input, + struct curl_waitfd *ret_output) { + int r; + int input_fd, output_fd; + short input_poll_events, output_poll_events; + short input_curl_events, output_curl_events; + + assert(rr); + assert(ret_input); + assert(ret_output); + + r = ca_remote_get_io_fds(rr, &input_fd, &output_fd); + if (r < 0) + return r; + + r = ca_remote_get_io_events(rr, &input_poll_events, &output_poll_events); + if (r < 0) + return r; + + input_curl_events = input_poll_events & POLLIN ? 
CURL_WAIT_POLLIN : 0; + output_curl_events = output_poll_events & POLLOUT ? CURL_WAIT_POLLOUT : 0; + + *ret_input = (struct curl_waitfd) { + .fd = input_fd, + .events = input_curl_events, + }; + + *ret_output = (struct curl_waitfd) { + .fd = output_fd, + .events = output_curl_events, + + }; + + return 0; +} + +static int ca_chunk_downloader_wait(CaChunkDownloader *dl) { + int n, r; + CURLMcode c; + int curl_timeout_ms = INT_MAX; + struct curl_waitfd waitfds[2] = {}; + + r = get_remote_io_as_curl_waitfds(dl->remote, &waitfds[0], &waitfds[1]); + if (r < 0) + return log_error_errno(r, "Failed to get remote io: %m"); + + log_trace("SLEEP - handles: added=%" PRIu64 ", rem=%" PRIu64 " - chunks: put=%" PRIu64, + dl->inprogress->n_added, dl->inprogress->n_removed, dl->completed->n_removed); + + c = curl_multi_wait(dl->multi, waitfds, ELEMENTSOF(waitfds), curl_timeout_ms, &n); + if (c != CURLM_OK) + return log_error_curlm(c, "Failed to wait with curl multi"); + + log_trace("AWAKEN - %d event(s)", n); + + return 0; +} + +static int download_chunks(CaChunkDownloader *dl) { + for (;;) { + int r; + + if (quit) { + log_info("Got exit signal, quitting"); + return 0; + } + + r = ca_chunk_downloader_step(dl); + if (r < 0) + return r; + if (r == CA_CHUNK_DOWNLOADER_FINISHED) + return 0; + + r = ca_chunk_downloader_wait(dl); + if (r < 0) + return r; + } } +/* + * archive/index download + */ + static int process_remote(CaRemote *rr, ProcessUntil until) { int r; @@ -135,7 +967,7 @@ static int process_remote(CaRemote *rr, ProcessUntil until) { return r; if (r < 0) return log_error_errno(r, "Failed to determine whether there's more data to write."); - if (r > 0) + if (r == 0) return 0; break; @@ -180,10 +1012,14 @@ static size_t write_index(const void *buffer, size_t size, size_t nmemb, void *u r = ca_remote_put_index(rr, buffer, product); if (r < 0) { - log_error("Failed to put index: %m"); + log_error_errno(r, "Failed to put index: %m"); return 0; } + r = process_remote(rr, 
PROCESS_UNTIL_WRITTEN); + if (r < 0) + return r; + return product; } @@ -200,6 +1036,10 @@ static int write_index_eof(CaRemote *rr) { if (r < 0) return log_error_errno(r, "Failed to put index EOF: %m"); + r = process_remote(rr, PROCESS_UNTIL_WRITTEN); + if (r < 0) + return r; + return 0; } @@ -216,10 +1056,14 @@ static size_t write_archive(const void *buffer, size_t size, size_t nmemb, void r = ca_remote_put_archive(rr, buffer, product); if (r < 0) { - log_error("Failed to put archive: %m"); + log_error_errno(r, "Failed to put archive: %m"); return 0; } + r = process_remote(rr, PROCESS_UNTIL_WRITTEN); + if (r < 0) + return r; + return product; } @@ -236,133 +1080,49 @@ static int write_archive_eof(CaRemote *rr) { if (r < 0) return log_error_errno(r, "Failed to put archive EOF: %m"); - return 0; -} - -static size_t write_chunk(const void *buffer, size_t size, size_t nmemb, void *userdata) { - ReallocBuffer *chunk_buffer = userdata; - size_t product, z; - - product = size * nmemb; - - z = realloc_buffer_size(chunk_buffer) + product; - if (z < realloc_buffer_size(chunk_buffer)) { - log_error("Overflow"); - return 0; - } - - if (z > (CA_PROTOCOL_SIZE_MAX - offsetof(CaProtocolChunk, data))) { - log_error("Chunk too large"); - return 0; - } - - if (!realloc_buffer_append(chunk_buffer, buffer, product)) { - log_oom(); - return 0; - } - - return product; -} - -static char *chunk_url(const char *store_url, const CaChunkID *id) { - char ids[CA_CHUNK_ID_FORMAT_MAX], *buffer; - const char *suffix; - size_t n; - - /* Chop off URL arguments and multiple trailing dashes, then append the chunk ID and ".cacnk" */ - - suffix = ca_compressed_chunk_suffix(); - - n = strcspn(store_url, "?;"); - while (n > 0 && store_url[n-1] == '/') - n--; - - buffer = new(char, n + 1 + 4 + 1 + CA_CHUNK_ID_FORMAT_MAX-1 + strlen(suffix) + 1); - if (!buffer) - return NULL; - - ca_chunk_id_format(id, ids); - - strcpy(mempcpy(mempcpy(mempcpy(mempcpy(mempcpy(buffer, store_url, n), "/", 1), ids, 4), "/", 1), 
ids, CA_CHUNK_ID_FORMAT_MAX-1), suffix); + r = process_remote(rr, PROCESS_UNTIL_WRITTEN); + if (r < 0) + return r; - return buffer; + return 0; } -static int acquire_file(CaRemote *rr, - CURL *curl, - const char *url, - size_t (*callback)(const void *p, size_t size, size_t nmemb, void *userdata)) { - +static int acquire_file(CaRemote *rr, CURL *handle) { + CURLcode c; long protocol_status; + const char *url; - assert(curl); + url = get_curl_effective_url(handle); assert(url); - assert(callback); - - if (curl_easy_setopt(curl, CURLOPT_URL, url) != CURLE_OK) { - log_error("Failed to set CURL URL to: %s", url); - return -EIO; - } - - if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback) != CURLE_OK) { - log_error("Failed to set CURL callback function."); - return -EIO; - } - - if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, rr) != CURLE_OK) { - log_error("Failed to set CURL private data."); - return -EIO; - } log_debug("Acquiring %s...", url); - if (robust_curl_easy_perform(curl) != CURLE_OK) { - log_error("Failed to acquire %s", url); - return -EIO; - } - - if (curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &protocol_status) != CURLE_OK) { - log_error("Failed to query response code"); - return -EIO; - } - - if (IN_SET(arg_protocol, ARG_PROTOCOL_HTTP, ARG_PROTOCOL_HTTPS) && protocol_status != 200) { - char *m; - - if (arg_verbose) - log_error("HTTP server failure %li while requesting %s.", protocol_status, url); - - if (asprintf(&m, "HTTP request on %s failed with status %li", url, protocol_status) < 0) - return log_oom(); - - (void) ca_remote_abort(rr, protocol_status == 404 ? 
ENOMEDIUM : EBADR, m); - free(m); + c = curl_easy_perform(handle); + if (c != CURLE_OK) + return log_error_curle(c, "Failed to acquire %s", url); - return 0; + c = curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &protocol_status); + if (c != CURLE_OK) + return log_error_curle(c, "Failed to query response code"); - } else if (arg_protocol == ARG_PROTOCOL_FTP && (protocol_status < 200 || protocol_status > 299)) { - char *m; + if (!protocol_status_ok(arg_protocol, protocol_status)) { + _cleanup_free_ char *m = NULL; + int abort_code; if (arg_verbose) - log_error("FTP server failure %li while requesting %s.", protocol_status, url); + log_error("%s server failure %li while requesting %s", + protocol_str(arg_protocol), protocol_status, url); - if (asprintf(&m, "FTP request on %s failed with status %li", url, protocol_status) < 0) + if (asprintf(&m, "%s request on %s failed with status %li", + protocol_str(arg_protocol), url, protocol_status) < 0) return log_oom(); - (void) ca_remote_abort(rr, EBADR, m); - free(m); - return 0; - } else if (arg_protocol == ARG_PROTOCOL_SFTP && (protocol_status != 0)) { - char *m; - - if (arg_verbose) - log_error("SFTP server failure %li while requesting %s.", protocol_status, url); - - if (asprintf(&m, "SFTP request on %s failed with status %li", url, protocol_status) < 0) - return log_oom(); + if (IN_SET(arg_protocol, PROTOCOL_HTTP, PROTOCOL_HTTPS) && protocol_status == 404) + abort_code = ENOMEDIUM; + else + abort_code = EBADR; - (void) ca_remote_abort(rr, EBADR, m); - free(m); + (void) ca_remote_abort(rr, abort_code, m); return 0; } @@ -372,11 +1132,7 @@ static int acquire_file(CaRemote *rr, static int run(int argc, char *argv[]) { const char *base_url, *archive_url, *index_url, *wstore_url; size_t n_stores = 0, current_store = 0; - CURL *curl = NULL; _cleanup_(ca_remote_unrefp) CaRemote *rr = NULL; - _cleanup_(realloc_buffer_free) ReallocBuffer chunk_buffer = {}; - _cleanup_free_ char *url_buffer = NULL; - long protocol_status; int 
r; if (argc < _CA_REMOTE_ARG_MAX) { @@ -404,120 +1160,67 @@ static int run(int argc, char *argv[]) { } rr = ca_remote_new(); - if (!rr) { - r = log_oom(); - goto finish; - } + if (!rr) + return log_oom(); r = ca_remote_set_local_feature_flags(rr, (n_stores > 0 ? CA_PROTOCOL_READABLE_STORE : 0) | (index_url ? CA_PROTOCOL_READABLE_INDEX : 0) | (archive_url ? CA_PROTOCOL_READABLE_ARCHIVE : 0)); - if (r < 0) { - log_error("Failed to set feature flags: %m"); - goto finish; - } + if (r < 0) + return log_error_errno(r, "Failed to set feature flags: %m"); r = ca_remote_set_io_fds(rr, STDIN_FILENO, STDOUT_FILENO); - if (r < 0) { - log_error("Failed to set I/O file descriptors: %m"); - goto finish; - } - - curl = curl_easy_init(); - if (!curl) { - r = log_oom(); - goto finish; - } - - if (curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L) != CURLE_OK) { - log_error("Failed to turn on location following."); - r = -EIO; - goto finish; - } - - if (curl_easy_setopt(curl, CURLOPT_PROTOCOLS, arg_protocol == ARG_PROTOCOL_FTP ? CURLPROTO_FTP : - arg_protocol == ARG_PROTOCOL_SFTP? CURLPROTO_SFTP: CURLPROTO_HTTP|CURLPROTO_HTTPS) != CURLE_OK) { - log_error("Failed to limit protocols to HTTP/HTTPS/FTP/SFTP."); - r = -EIO; - goto finish; - } - - if (arg_protocol == ARG_PROTOCOL_SFTP) { - /* activate the ssh agent. 
For this to work you need - to have ssh-agent running (type set | grep SSH_AGENT to check) */ - if (curl_easy_setopt(curl, CURLOPT_SSH_AUTH_TYPES, CURLSSH_AUTH_AGENT) != CURLE_OK) - log_error("Failed to turn on ssh agent support, ignoring."); - } + if (r < 0) + return log_error_errno(r, "Failed to set I/O file descriptors: %m"); - if (arg_rate_limit_bps > 0) { - if (curl_easy_setopt(curl, CURLOPT_MAX_SEND_SPEED_LARGE, arg_rate_limit_bps) != CURLE_OK) { - log_error("Failed to set CURL send speed limit."); - r = -EIO; - goto finish; - } + if (archive_url) { + _cleanup_(curl_easy_cleanupp) CURL *handle = NULL; - if (curl_easy_setopt(curl, CURLOPT_MAX_RECV_SPEED_LARGE, arg_rate_limit_bps) != CURLE_OK) { - log_error("Failed to set CURL receive speed limit."); - r = -EIO; - goto finish; - } - } + r = make_curl_easy_handle(&handle, write_archive, rr, NULL); + if (r < 0) + return r; - /* (void) curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); */ + r = configure_curl_easy_handle(handle, archive_url); + if (r < 0) + return r; - if (archive_url) { - r = acquire_file(rr, curl, archive_url, write_archive); + r = acquire_file(rr, handle); if (r < 0) - goto finish; + return r; if (r == 0) goto flush; r = write_archive_eof(rr); if (r < 0) - goto finish; + return r; } if (index_url) { - r = acquire_file(rr, curl, index_url, write_index); + _cleanup_(curl_easy_cleanupp) CURL *handle = NULL; + + r = make_curl_easy_handle(&handle, write_index, rr, NULL); + if (r < 0) + return r; + + r = configure_curl_easy_handle(handle, index_url); + if (r < 0) + return r; + + r = acquire_file(rr, handle); if (r < 0) - goto finish; + return r; if (r == 0) goto flush; r = write_index_eof(rr); if (r < 0) - goto finish; + return r; } - for (;;) { + if (n_stores > 0) { + _cleanup_(ca_chunk_downloader_freep) CaChunkDownloader *dl = NULL; const char *store_url; - CaChunkID id; - - if (quit) { - log_info("Got exit signal, quitting."); - r = 0; - goto finish; - } - - if (n_stores == 0) /* No stores? 
Then we did all we could do */ - break; - - r = process_remote(rr, PROCESS_UNTIL_HAVE_REQUEST); - if (r == -EPIPE) { - r = 0; - goto finish; - } - if (r < 0) - goto finish; - - r = ca_remote_next_request(rr, &id); - if (r == -ENODATA) - continue; - if (r < 0) { - log_error_errno(r, "Failed to determine next chunk to get: %m"); - goto finish; - } current_store = current_store % n_stores; if (wstore_url) @@ -526,106 +1229,20 @@ static int run(int argc, char *argv[]) { store_url = argv[current_store + _CA_REMOTE_ARG_MAX]; /* current_store++; */ - free(url_buffer); - url_buffer = chunk_url(store_url, &id); - if (!url_buffer) { - r = log_oom(); - goto finish; - } - - if (curl_easy_setopt(curl, CURLOPT_URL, url_buffer) != CURLE_OK) { - log_error("Failed to set CURL URL to: %s", index_url); - r = -EIO; - goto finish; - } - - if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_chunk) != CURLE_OK) { - log_error("Failed to set CURL callback function."); - r = -EIO; - goto finish; - } - - if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, &chunk_buffer) != CURLE_OK) { - log_error("Failed to set CURL private data."); - r = -EIO; - goto finish; - } - - if (arg_rate_limit_bps > 0) { - if (curl_easy_setopt(curl, CURLOPT_MAX_SEND_SPEED_LARGE, arg_rate_limit_bps) != CURLE_OK) { - log_error("Failed to set CURL send speed limit."); - r = -EIO; - goto finish; - } - - if (curl_easy_setopt(curl, CURLOPT_MAX_RECV_SPEED_LARGE, arg_rate_limit_bps) != CURLE_OK) { - log_error("Failed to set CURL receive speed limit."); - r = -EIO; - goto finish; - } - } - - log_debug("Acquiring %s...", url_buffer); - - if (robust_curl_easy_perform(curl) != CURLE_OK) { - log_error("Failed to acquire %s", url_buffer); - r = -EIO; - goto finish; - } - - if (curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &protocol_status) != CURLE_OK) { - log_error("Failed to query response code"); - r = -EIO; - goto finish; - } - - r = process_remote(rr, PROCESS_UNTIL_CAN_PUT_CHUNK); - if (r == -EPIPE) { - r = 0; - goto finish; 
- } - if (r < 0) - goto finish; - - if ((IN_SET(arg_protocol, ARG_PROTOCOL_HTTP, ARG_PROTOCOL_HTTPS) && protocol_status == 200) || - (arg_protocol == ARG_PROTOCOL_FTP && (protocol_status >= 200 && protocol_status <= 299))|| - (arg_protocol == ARG_PROTOCOL_SFTP && (protocol_status == 0))) { - - r = ca_remote_put_chunk(rr, &id, CA_CHUNK_COMPRESSED, realloc_buffer_data(&chunk_buffer), realloc_buffer_size(&chunk_buffer)); - if (r < 0) { - log_error_errno(r, "Failed to write chunk: %m"); - goto finish; - } - - } else { - if (arg_verbose) - log_error("HTTP/FTP/SFTP server failure %li while requesting %s.", protocol_status, url_buffer); - - r = ca_remote_put_missing(rr, &id); - if (r < 0) { - log_error_errno(r, "Failed to write missing message: %m"); - goto finish; - } - } - - realloc_buffer_empty(&chunk_buffer); + dl = ca_chunk_downloader_new(rr, store_url); + if (!dl) + return log_oom(); - r = process_remote(rr, PROCESS_UNTIL_WRITTEN); - if (r == -EPIPE) { - r = 0; - goto finish; - } + r = download_chunks(dl); + if (r == -EPIPE) + return 0; if (r < 0) - goto finish; + return r; } flush: r = process_remote(rr, PROCESS_UNTIL_FINISHED); -finish: - if (curl) - curl_easy_cleanup(curl); - return r; } @@ -635,26 +1252,37 @@ static void help(void) { static int parse_argv(int argc, char *argv[]) { + enum { + ARG_RATE_LIMIT_BPS = 0x100, + ARG_MAX_ACTIVE_CHUNKS, + ARG_MAX_HOST_CONNECTIONS, + ARG_SSL_TRUST_PEER, + }; + static const struct option options[] = { { "help", no_argument, NULL, 'h' }, + { "log-level", required_argument, NULL, 'l' }, { "verbose", no_argument, NULL, 'v' }, - { "rate-limit-bps", required_argument, NULL, 'l' }, + { "rate-limit-bps", required_argument, NULL, ARG_RATE_LIMIT_BPS }, + { "max-active-chunks", required_argument, NULL, ARG_MAX_ACTIVE_CHUNKS }, + { "max-host-connections", required_argument, NULL, ARG_MAX_HOST_CONNECTIONS }, + { "ssl-trust-peer", no_argument, NULL, ARG_SSL_TRUST_PEER }, {} }; - int c; + int c, r; assert(argc >= 0); assert(argv); if 
(strstr(argv[0], "https")) - arg_protocol = ARG_PROTOCOL_HTTPS; + arg_protocol = PROTOCOL_HTTPS; else if (strstr(argv[0], "http")) - arg_protocol = ARG_PROTOCOL_HTTP; + arg_protocol = PROTOCOL_HTTP; else if (strstr(argv[0], "sftp")) - arg_protocol = ARG_PROTOCOL_SFTP; + arg_protocol = PROTOCOL_SFTP; else if (strstr(argv[0], "ftp")) - arg_protocol = ARG_PROTOCOL_FTP; + arg_protocol = PROTOCOL_FTP; else { log_error("Failed to determine set of protocols to use, refusing."); return -EINVAL; @@ -663,7 +1291,7 @@ static int parse_argv(int argc, char *argv[]) { if (getenv_bool("CASYNC_VERBOSE") > 0) arg_verbose = true; - while ((c = getopt_long(argc, argv, "hv", options, NULL)) >= 0) { + while ((c = getopt_long(argc, argv, "hl:v", options, NULL)) >= 0) { switch (c) { @@ -671,14 +1299,43 @@ static int parse_argv(int argc, char *argv[]) { help(); return 0; + case 'l': + r = set_log_level_from_string(optarg); + if (r < 0) + return log_error_errno(r, "Failed to parse log level \"%s\": %m", optarg); + + arg_log_level = r; + + break; + case 'v': arg_verbose = true; break; - case 'l': + case ARG_RATE_LIMIT_BPS: arg_rate_limit_bps = strtoll(optarg, NULL, 10); break; + case ARG_MAX_ACTIVE_CHUNKS: + r = safe_atou(optarg, &arg_max_active_chunks); + if (r < 0 || arg_max_active_chunks == 0) { + log_error("Invalid value for max-active-chunks, refusing"); + return -EINVAL; + } + break; + + case ARG_MAX_HOST_CONNECTIONS: + r = safe_atou(optarg, &arg_max_host_connections); + if (r < 0 || arg_max_host_connections == 0) { + log_error("Invalid value for max-host-connections, refusing"); + return -EINVAL; + } + break; + + case ARG_SSL_TRUST_PEER: + arg_ssl_trust_peer = true; + break; + case '?': return -EINVAL; diff --git a/src/casync-tool.c b/src/casync-tool.c index f9206886..b90d4a47 100644 --- a/src/casync-tool.c +++ b/src/casync-tool.c @@ -66,6 +66,9 @@ static size_t arg_chunk_size_min = 0; static size_t arg_chunk_size_avg = 0; static size_t arg_chunk_size_max = 0; static uint64_t 
arg_rate_limit_bps = UINT64_MAX; +static unsigned arg_max_active_chunks = 0; +static unsigned arg_max_host_connections = 0; +static bool arg_ssl_trust_peer = false; static uint64_t arg_with = 0; static uint64_t arg_without = 0; static uid_t arg_uid_shift = 0, arg_uid_range = 0x10000U; @@ -107,6 +110,12 @@ static void help(void) { " -c --cache-auto Pick encoder cache directory automatically\n" " --rate-limit-bps=LIMIT Maximum bandwidth in bytes/s for remote\n" " communication\n" + " --max-active-chunks=MAX Maximum number of simultaneously active chunks for\n" + " remote communication\n" + " --max-host-connections=MAX\n" + " Maximum number of connections to a single host for\n" + " remote communication\n" + " --ssl-trust-peer Trust the peer's SSL certificate\n" " --exclude-nodump=no Don't exclude files with chattr(1)'s +d 'nodump'\n" " flag when creating archive\n" " --exclude-submounts=yes Exclude submounts when creating archive\n" @@ -328,6 +337,9 @@ static int parse_argv(int argc, char *argv[]) { ARG_SEED, ARG_CACHE, ARG_RATE_LIMIT_BPS, + ARG_MAX_ACTIVE_CHUNKS, + ARG_MAX_HOST_CONNECTIONS, + ARG_SSL_TRUST_PEER, ARG_WITH, ARG_WITHOUT, ARG_WHAT, @@ -362,6 +374,9 @@ static int parse_argv(int argc, char *argv[]) { { "cache", required_argument, NULL, ARG_CACHE }, { "cache-auto", no_argument, NULL, 'c' }, { "rate-limit-bps", required_argument, NULL, ARG_RATE_LIMIT_BPS }, + { "max-active-chunks", required_argument, NULL, ARG_MAX_ACTIVE_CHUNKS }, + { "max-host-connections", required_argument, NULL, ARG_MAX_HOST_CONNECTIONS }, + { "ssl-trust-peer", no_argument, NULL, ARG_SSL_TRUST_PEER }, { "with", required_argument, NULL, ARG_WITH }, { "without", required_argument, NULL, ARG_WITHOUT }, { "what", required_argument, NULL, ARG_WHAT }, @@ -475,6 +490,26 @@ static int parse_argv(int argc, char *argv[]) { break; + case ARG_MAX_ACTIVE_CHUNKS: + r = safe_atou(optarg, &arg_max_active_chunks); + if (r < 0) { + log_error("Failed to parse --max-active-chunks= value %s", optarg); + 
return -EINVAL; + } + break; + + case ARG_MAX_HOST_CONNECTIONS: + r = safe_atou(optarg, &arg_max_host_connections); + if (r < 0) { + log_error("Failed to parse --max-host-connections= value %s", optarg); + return -EINVAL; + } + break; + + case ARG_SSL_TRUST_PEER: + arg_ssl_trust_peer = true; + break; + case ARG_WITH: { uint64_t u; @@ -1318,12 +1353,36 @@ static int verb_make(int argc, char *argv[]) { if (r < 0) return r; + if (arg_log_level != -1) { + r = ca_sync_set_log_level(s, arg_log_level); + if (r < 0) + return log_error_errno(r, "Failed to set log level: %m"); + } + if (arg_rate_limit_bps != UINT64_MAX) { r = ca_sync_set_rate_limit_bps(s, arg_rate_limit_bps); if (r < 0) return log_error_errno(r, "Failed to set rate limit: %m"); } + if (arg_max_active_chunks) { + r = ca_sync_set_max_active_chunks(s, arg_max_active_chunks); + if (r < 0) + return log_error_errno(r, "Failed to set max active chunks: %m"); + } + + if (arg_max_host_connections) { + r = ca_sync_set_max_host_connections(s, arg_max_host_connections); + if (r < 0) + return log_error_errno(r, "Failed to set max host connections: %m"); + } + + if (arg_ssl_trust_peer) { + r = ca_sync_set_ssl_trust_peer(s, arg_ssl_trust_peer); + if (r < 0) + return log_error_errno(r, "Failed to set SSL trust peer: %m"); + } + r = ca_sync_set_base_fd(s, input_fd); if (r < 0) return log_error_errno(r, "Failed to set sync base: %m"); @@ -1617,12 +1676,36 @@ static int verb_extract(int argc, char *argv[]) { } } + if (arg_log_level != -1) { + r = ca_sync_set_log_level(s, arg_log_level); + if (r < 0) + return log_error_errno(r, "Failed to set log level: %m"); + } + if (arg_rate_limit_bps != UINT64_MAX) { r = ca_sync_set_rate_limit_bps(s, arg_rate_limit_bps); if (r < 0) return log_error_errno(r, "Failed to set rate limit: %m"); } + if (arg_max_active_chunks) { + r = ca_sync_set_max_active_chunks(s, arg_max_active_chunks); + if (r < 0) + return log_error_errno(r, "Failed to set max active chunks: %m"); + } + + if 
(arg_max_host_connections) { + r = ca_sync_set_max_host_connections(s, arg_max_host_connections); + if (r < 0) + return log_error_errno(r, "Failed to set max host connections: %m"); + } + + if (arg_ssl_trust_peer) { + r = ca_sync_set_ssl_trust_peer(s, arg_ssl_trust_peer); + if (r < 0) + return log_error_errno(r, "Failed to set SSL trust peer: %m"); + } + if (seek_path) { if (output_fd >= 0) r = ca_sync_set_boundary_fd(s, output_fd); @@ -2772,12 +2855,36 @@ static int verb_mount(int argc, char *argv[]) { return r; } + if (arg_log_level != -1) { + r = ca_sync_set_log_level(s, arg_log_level); + if (r < 0) + return log_error_errno(r, "Failed to set log level: %m"); + } + if (arg_rate_limit_bps != UINT64_MAX) { r = ca_sync_set_rate_limit_bps(s, arg_rate_limit_bps); if (r < 0) return log_error_errno(r, "Failed to set rate limit: %m"); } + if (arg_max_active_chunks) { + r = ca_sync_set_max_active_chunks(s, arg_max_active_chunks); + if (r < 0) + return log_error_errno(r, "Failed to set max active chunks: %m"); + } + + if (arg_max_host_connections) { + r = ca_sync_set_max_host_connections(s, arg_max_host_connections); + if (r < 0) + return log_error_errno(r, "Failed to set max host connections: %m"); + } + + if (arg_ssl_trust_peer) { + r = ca_sync_set_ssl_trust_peer(s, arg_ssl_trust_peer); + if (r < 0) + return log_error_errno(r, "Failed to set SSL trust peer: %m"); + } + if (operation == MOUNT_ARCHIVE) { if (input_fd >= 0) r = ca_sync_set_archive_fd(s, input_fd); @@ -2892,12 +2999,36 @@ static int verb_mkdev(int argc, char *argv[]) { goto finish; } + if (arg_log_level != -1) { + r = ca_sync_set_log_level(s, arg_log_level); + if (r < 0) + return log_error_errno(r, "Failed to set log level: %m"); + } + if (arg_rate_limit_bps != UINT64_MAX) { r = ca_sync_set_rate_limit_bps(s, arg_rate_limit_bps); if (r < 0) return log_error_errno(r, "Failed to set rate limit: %m"); } + if (arg_max_active_chunks) { + r = ca_sync_set_max_active_chunks(s, arg_max_active_chunks); + if (r < 0) + 
return log_error_errno(r, "Failed to set max active chunks: %m"); + } + + if (arg_max_host_connections) { + r = ca_sync_set_max_host_connections(s, arg_max_host_connections); + if (r < 0) + return log_error_errno(r, "Failed to set max host connections: %m"); + } + + if (arg_ssl_trust_peer) { + r = ca_sync_set_ssl_trust_peer(s, arg_ssl_trust_peer); + if (r < 0) + return log_error_errno(r, "Failed to set SSL trust peer: %m"); + } + if (operation == MKDEV_BLOB) { if (input_fd >= 0) r = ca_sync_set_archive_fd(s, input_fd); @@ -3453,12 +3584,36 @@ static int verb_pull(int argc, char *argv[]) { if (r < 0) return log_error_errno(r, "Failed to set feature flags: %m"); + if (arg_log_level != -1) { + r = ca_remote_set_log_level(rr, arg_log_level); + if (r < 0) + return log_error_errno(r, "Failed to set log level: %m"); + } + if (arg_rate_limit_bps != UINT64_MAX) { r = ca_remote_set_rate_limit_bps(rr, arg_rate_limit_bps); if (r < 0) return log_error_errno(r, "Failed to set rate limit: %m"); } + if (arg_max_active_chunks) { + r = ca_remote_set_max_active_chunks(rr, arg_max_active_chunks); + if (r < 0) + return log_error_errno(r, "Failed to set max active chunks: %m"); + } + + if (arg_max_host_connections) { + r = ca_remote_set_max_host_connections(rr, arg_max_host_connections); + if (r < 0) + return log_error_errno(r, "Failed to set max host connections: %m"); + } + + if (arg_ssl_trust_peer) { + r = ca_remote_set_ssl_trust_peer(rr, arg_ssl_trust_peer); + if (r < 0) + return log_error_errno(r, "Failed to set SSL trust peer: %m"); + } + r = ca_remote_set_io_fds(rr, STDIN_FILENO, STDOUT_FILENO); if (r < 0) return log_error_errno(r, "Failed to set I/O file descriptors: %m"); @@ -3606,12 +3761,36 @@ static int verb_push(int argc, char *argv[]) { if (r < 0) log_error_errno(r, "Failed to set feature flags: %m"); + if (arg_log_level != -1) { + r = ca_remote_set_log_level(rr, arg_log_level); + if (r < 0) + return log_error_errno(r, "Failed to set log level: %m"); + } + if 
(arg_rate_limit_bps != UINT64_MAX) { r = ca_remote_set_rate_limit_bps(rr, arg_rate_limit_bps); if (r < 0) log_error_errno(r, "Failed to set rate limit: %m"); } + if (arg_max_active_chunks) { + r = ca_remote_set_max_active_chunks(rr, arg_max_active_chunks); + if (r < 0) + return log_error_errno(r, "Failed to set max active chunks: %m"); + } + + if (arg_max_host_connections) { + r = ca_remote_set_max_host_connections(rr, arg_max_host_connections); + if (r < 0) + return log_error_errno(r, "Failed to set max host connections: %m"); + } + + if (arg_ssl_trust_peer) { + r = ca_remote_set_ssl_trust_peer(rr, arg_ssl_trust_peer); + if (r < 0) + return log_error_errno(r, "Failed to set SSL trust peer: %m"); + } + r = ca_remote_set_io_fds(rr, STDIN_FILENO, STDOUT_FILENO); if (r < 0) log_error_errno(r, "Failed to set I/O file descriptors: %m"); diff --git a/src/casync.c b/src/casync.c index 9fee77f7..0994d4f9 100644 --- a/src/casync.c +++ b/src/casync.c @@ -110,7 +110,11 @@ struct CaSync { bool archive_eof; bool remote_index_eof; + int log_level; size_t rate_limit_bps; + unsigned max_active_chunks; + unsigned max_host_connections; + bool ssl_trust_peer; uint64_t feature_flags; uint64_t feature_flags_mask; @@ -169,6 +173,7 @@ static CaSync *ca_sync_new(void) { s->chunker = (CaChunker) CA_CHUNKER_INIT; + s->log_level = -1; s->archive_size = UINT64_MAX; s->punch_holes = true; s->reflink = true; @@ -511,6 +516,42 @@ CaSync *ca_sync_unref(CaSync *s) { return mfree(s); } +int ca_sync_set_log_level(CaSync *s, int log_level) { + if (!s) + return -EINVAL; + + s->log_level = log_level; + + return 0; +} + +int ca_sync_set_max_active_chunks(CaSync *s, unsigned max_active_chunks) { + if (!s) + return -EINVAL; + + s->max_active_chunks = max_active_chunks; + + return 0; +} + +int ca_sync_set_max_host_connections(CaSync *s, unsigned max_host_connections) { + if (!s) + return -EINVAL; + + s->max_host_connections = max_host_connections; + + return 0; +} + +int ca_sync_set_ssl_trust_peer(CaSync 
*s, bool ssl_trust_peer) { + if (!s) + return -EINVAL; + + s->ssl_trust_peer = ssl_trust_peer; + + return 0; +} + int ca_sync_set_rate_limit_bps(CaSync *s, uint64_t rate_limit_bps) { if (!s) return -EINVAL; @@ -671,12 +712,30 @@ int ca_sync_set_index_remote(CaSync *s, const char *url) { if (!s->remote_index) return -ENOMEM; + if (s->log_level != -1) { + r = ca_remote_set_log_level(s->remote_index, s->log_level); + if (r < 0) + return r; + } + if (s->rate_limit_bps > 0) { r = ca_remote_set_rate_limit_bps(s->remote_index, s->rate_limit_bps); if (r < 0) return r; } + if (s->max_active_chunks > 0) { + r = ca_remote_set_max_active_chunks(s->remote_index, s->max_active_chunks); + if (r < 0) + return r; + } + + if (s->max_host_connections > 0) { + r = ca_remote_set_max_host_connections(s->remote_index, s->max_host_connections); + if (r < 0) + return r; + } + r = ca_remote_set_index_url(s->remote_index, url); if (r < 0) return r; diff --git a/src/casync.h b/src/casync.h index 982f67e5..073aa5e6 100644 --- a/src/casync.h +++ b/src/casync.h @@ -31,7 +31,11 @@ CaSync *ca_sync_new_decode(void); CaSync *ca_sync_unref(CaSync *sync); DEFINE_TRIVIAL_CLEANUP_FUNC(CaSync *, ca_sync_unref); +int ca_sync_set_log_level(CaSync *s, int log_level); int ca_sync_set_rate_limit_bps(CaSync *s, uint64_t rate_limit_bps); +int ca_sync_set_max_active_chunks(CaSync *s, unsigned max_active_chunks); +int ca_sync_set_max_host_connections(CaSync *s, unsigned max_host_connections); +int ca_sync_set_ssl_trust_peer(CaSync *s, bool ssl_trust_peer); int ca_sync_set_feature_flags(CaSync *s, uint64_t flags); int ca_sync_get_feature_flags(CaSync *s, uint64_t *ret); diff --git a/src/list.h b/src/list.h new file mode 100644 index 00000000..7231e4b0 --- /dev/null +++ b/src/list.h @@ -0,0 +1,169 @@ +/* SPDX-License-Identifier: LGPL-2.1+ */ +#pragma once + +/* The head of the linked list.
Use this in the structure that shall + * contain the head of the linked list */ +#define LIST_HEAD(t,name) \ + t *name + +/* The pointers in the linked list's items. Use this in the item structure */ +#define LIST_FIELDS(t,name) \ + t *name##_next, *name##_prev + +/* Initialize the list's head */ +#define LIST_HEAD_INIT(head) \ + do { \ + (head) = NULL; \ + } while (false) + +/* Initialize a list item */ +#define LIST_INIT(name,item) \ + do { \ + typeof(*(item)) *_item = (item); \ + assert(_item); \ + _item->name##_prev = _item->name##_next = NULL; \ + } while (false) + +/* Prepend an item to the list */ +#define LIST_PREPEND(name,head,item) \ + do { \ + typeof(*(head)) **_head = &(head), *_item = (item); \ + assert(_item); \ + if ((_item->name##_next = *_head)) \ + _item->name##_next->name##_prev = _item; \ + _item->name##_prev = NULL; \ + *_head = _item; \ + } while (false) + +/* Append an item to the list */ +#define LIST_APPEND(name,head,item) \ + do { \ + typeof(*(head)) **_hhead = &(head), *_tail; \ + LIST_FIND_TAIL(name, *_hhead, _tail); \ + LIST_INSERT_AFTER(name, *_hhead, _tail, item); \ + } while (false) + +/* Remove an item from the list */ +#define LIST_REMOVE(name,head,item) \ + do { \ + typeof(*(head)) **_head = &(head), *_item = (item); \ + assert(_item); \ + if (_item->name##_next) \ + _item->name##_next->name##_prev = _item->name##_prev; \ + if (_item->name##_prev) \ + _item->name##_prev->name##_next = _item->name##_next; \ + else { \ + assert(*_head == _item); \ + *_head = _item->name##_next; \ + } \ + _item->name##_next = _item->name##_prev = NULL; \ + } while (false) + +/* Find the head of the list */ +#define LIST_FIND_HEAD(name,item,head) \ + do { \ + typeof(*(item)) *_item = (item); \ + if (!_item) \ + (head) = NULL; \ + else { \ + while (_item->name##_prev) \ + _item = _item->name##_prev; \ + (head) = _item; \ + } \ + } while (false) + +/* Find the tail of the list */ +#define LIST_FIND_TAIL(name,item,tail) \ + do { \ + typeof(*(item)) 
*_item = (item); \ + if (!_item) \ + (tail) = NULL; \ + else { \ + while (_item->name##_next) \ + _item = _item->name##_next; \ + (tail) = _item; \ + } \ + } while (false) + +/* Insert an item after another one (a = where, b = what) */ +#define LIST_INSERT_AFTER(name,head,a,b) \ + do { \ + typeof(*(head)) **_head = &(head), *_a = (a), *_b = (b); \ + assert(_b); \ + if (!_a) { \ + if ((_b->name##_next = *_head)) \ + _b->name##_next->name##_prev = _b; \ + _b->name##_prev = NULL; \ + *_head = _b; \ + } else { \ + if ((_b->name##_next = _a->name##_next)) \ + _b->name##_next->name##_prev = _b; \ + _b->name##_prev = _a; \ + _a->name##_next = _b; \ + } \ + } while (false) + +/* Insert an item before another one (a = where, b = what) */ +#define LIST_INSERT_BEFORE(name,head,a,b) \ + do { \ + typeof(*(head)) **_head = &(head), *_a = (a), *_b = (b); \ + assert(_b); \ + if (!_a) { \ + if (!*_head) { \ + _b->name##_next = NULL; \ + _b->name##_prev = NULL; \ + *_head = _b; \ + } else { \ + typeof(*(head)) *_tail = (head); \ + while (_tail->name##_next) \ + _tail = _tail->name##_next; \ + _b->name##_next = NULL; \ + _b->name##_prev = _tail; \ + _tail->name##_next = _b; \ + } \ + } else { \ + if ((_b->name##_prev = _a->name##_prev)) \ + _b->name##_prev->name##_next = _b; \ + else \ + *_head = _b; \ + _b->name##_next = _a; \ + _a->name##_prev = _b; \ + } \ + } while (false) + +#define LIST_JUST_US(name,item) \ + (!(item)->name##_prev && !(item)->name##_next) \ + +#define LIST_FOREACH(name,i,head) \ + for ((i) = (head); (i); (i) = (i)->name##_next) + +#define LIST_FOREACH_SAFE(name,i,n,head) \ + for ((i) = (head); (i) && (((n) = (i)->name##_next), 1); (i) = (n)) + +#define LIST_FOREACH_BEFORE(name,i,p) \ + for ((i) = (p)->name##_prev; (i); (i) = (i)->name##_prev) + +#define LIST_FOREACH_AFTER(name,i,p) \ + for ((i) = (p)->name##_next; (i); (i) = (i)->name##_next) + +/* Iterate through all the members of the list p is included in, but skip over p */ +#define 
LIST_FOREACH_OTHERS(name,i,p) \ + for (({ \ + (i) = (p); \ + while ((i) && (i)->name##_prev) \ + (i) = (i)->name##_prev; \ + if ((i) == (p)) \ + (i) = (p)->name##_next; \ + }); \ + (i); \ + (i) = (i)->name##_next == (p) ? (p)->name##_next : (i)->name##_next) + +/* Loop starting from p->next until p->prev. + p can be adjusted meanwhile. */ +#define LIST_LOOP_BUT_ONE(name,i,head,p) \ + for ((i) = (p)->name##_next ? (p)->name##_next : (head); \ + (i) != (p); \ + (i) = (i)->name##_next ? (i)->name##_next : (head)) + +#define LIST_IS_EMPTY(head) \ + (!(head)) diff --git a/src/log.h b/src/log.h index 2cc8b9be..f1c5da4f 100644 --- a/src/log.h +++ b/src/log.h @@ -39,3 +39,9 @@ static inline int log_oom(void) { void set_log_level(int level); int set_log_level_from_string(const char *str); + +#ifdef LOG_TRACE +# define log_trace(...) log_debug(__VA_ARGS__) +#else +# define log_trace(...) do {} while (0) +#endif diff --git a/src/util.h b/src/util.h index dc9d03e4..68299a5b 100644 --- a/src/util.h +++ b/src/util.h @@ -765,6 +765,15 @@ DEFINE_TRIVIAL_CLEANUP_FUNC(char*, unlink_and_free); int free_and_strdup(char **p, const char *s); + +/* Takes inspiration from Rust's Option::take() method: reads and returns a pointer, but at the same time resets it to + * NULL. See: https://doc.rust-lang.org/std/option/enum.Option.html#method.take */ +#define TAKE_PTR(ptr) \ + ({ \ + typeof(ptr) _ptr_ = (ptr); \ + (ptr) = NULL; \ + _ptr_; \ + }) + +/* A check against a list of errors commonly used to indicate that a syscall/ioctl/other kernel operation we request is * not supported locally. We maintain a generic list for this here, instead of adjusting the possible error codes to * exactly what the calls might return for the simple reasons that due to FUSE and many differing in-kernel