diff --git a/CHANGELOG.md b/CHANGELOG.md index 07537a284ce5..015bf145b3e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [5.0.0-dev3] + +[5.0.0-dev3]: https://github.com/microsoft/CCF/releases/tag/ccf-5.0.0-dev3 + +- Added a `consensus.max_uncommitted_tx_count` configuration option, which specifies the maximum number of transactions that can be pending on the primary. When that threshold is exceeded, a `503 Service Unavailable` is temporarily returned on all but the `/node/*` paths (#5692). + ## [5.0.0-dev2] [5.0.0-dev2]: https://github.com/microsoft/CCF/releases/tag/ccf-5.0.0-dev2 diff --git a/doc/host_config_schema/cchost_config.json b/doc/host_config_schema/cchost_config.json index 04fcc1babb27..481115d8520f 100644 --- a/doc/host_config_schema/cchost_config.json +++ b/doc/host_config_schema/cchost_config.json @@ -562,6 +562,11 @@ "type": "string", "default": "5000ms", "description": "Maximum timeout (time string) after which backup nodes that have not received any message from the primary node (or voted for a candidate) will trigger a new election. This timeout is also used by candidates to restart unsuccessful elections. This should be set to a significantly greater value than 'message_timeout' plus the expected network delay" + }, + "max_uncommitted_tx_count": { + "type": "integer", + "default": 10000, + "description": "Maximum number of uncommitted transactions allowed before the primary refuses new transactions. Unlimited if set to 0." } }, "description": "This section includes configuration for the consensus protocol (note: should be the same for all other nodes in the service)", diff --git a/include/ccf/endpoint_registry.h b/include/ccf/endpoint_registry.h index 2052dd72df26..8be93dbb3746 100644 --- a/include/ccf/endpoint_registry.h +++ b/include/ccf/endpoint_registry.h @@ -285,5 +285,10 @@ namespace ccf::endpoints const EndpointDefinitionPtr& endpoint); virtual void increment_metrics_retries( const EndpointDefinitionPtr& endpoint); + + virtual bool apply_uncommitted_tx_backpressure() const + { + return true; + } }; } diff --git a/include/ccf/odata_error.h b/include/ccf/odata_error.h index 1dea7aceaa18..3eb7a90661a5 100644 --- a/include/ccf/odata_error.h +++ b/include/ccf/odata_error.h @@ -112,6 +112,7 @@ namespace ccf ERROR(ProposalCreatedTooLongAgo) ERROR(InvalidCreatedAt) ERROR(JSException) + ERROR(TooManyPendingTransactions) ERROR(MissingApiVersionParameter) ERROR(UnsupportedApiVersionValue) diff --git a/include/ccf/service/consensus_config.h b/include/ccf/service/consensus_config.h index 55c5fcf018e4..ba1443949d96 100644 --- a/include/ccf/service/consensus_config.h +++ b/include/ccf/service/consensus_config.h @@ -11,6 +11,7 @@ namespace consensus { ds::TimeString message_timeout = {"100ms"}; ds::TimeString election_timeout = {"5000ms"}; + size_t max_uncommitted_tx_count = 10000; bool operator==(const Configuration&) const = default; bool operator!=(const Configuration&) const = default; diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index c157d3a790f4..40e86f58273e 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -146,6 +146,7 @@ namespace aft // Timeouts std::chrono::milliseconds request_timeout; std::chrono::milliseconds election_timeout; + size_t max_uncommitted_tx_count; bool ticking = false; // Configurations @@ -212,6 +213,7 @@ namespace aft request_timeout(settings_.message_timeout), election_timeout(settings_.election_timeout), + max_uncommitted_tx_count(settings_.max_uncommitted_tx_count), reconfiguration_type(reconfiguration_type_), node_client(rpc_request_context_), @@ -255,6 +257,22 @@ namespace aft return can_replicate_unsafe(); } + /** + * Returns true if the node is primary, max_uncommitted_tx_count is non-zero + * and the number of transactions replicated but not yet committed exceeds + * max_uncommitted_tx_count. + */ + bool is_at_max_capacity() override + { + if (max_uncommitted_tx_count == 0) + { + return false; + } + std::unique_lock guard(state->lock); + return state->leadership_state == kv::LeadershipState::Leader && + (state->last_idx - state->commit_idx >= max_uncommitted_tx_count); + } + Consensus::SignatureDisposition get_signature_disposition() override { std::unique_lock guard(state->lock); diff --git a/src/consensus/aft/test/test_common.h b/src/consensus/aft/test/test_common.h index 07dccf0a134a..2e90359a1341 100644 --- a/src/consensus/aft/test/test_common.h +++ b/src/consensus/aft/test/test_common.h @@ -19,12 +19,13 @@ static std::vector cert; static const ds::TimeString request_timeout_ = {"10ms"}; static const ds::TimeString election_timeout_ = {"100ms"}; +static const size_t max_uncommitted_tx_count_ = 0; static const std::chrono::milliseconds request_timeout = request_timeout_; static const std::chrono::milliseconds election_timeout = election_timeout_; static const consensus::Configuration raft_settings{ - request_timeout_, election_timeout_}; + request_timeout_, election_timeout_, max_uncommitted_tx_count_}; static auto hooks = std::make_shared(); diff --git a/src/consensus/consensus_types.h b/src/consensus/consensus_types.h index 9ede9dc6b03b..e65ce57e5056 100644 --- a/src/consensus/consensus_types.h +++ b/src/consensus/consensus_types.h @@ -15,7 +15,7 @@ namespace consensus DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(Configuration); DECLARE_JSON_REQUIRED_FIELDS(Configuration); DECLARE_JSON_OPTIONAL_FIELDS( - Configuration, message_timeout, election_timeout); + Configuration, message_timeout, election_timeout, max_uncommitted_tx_count); #pragma pack(push, 1) template diff --git a/src/kv/kv_types.h b/src/kv/kv_types.h index 92887ba42be8..3a664e962302 100644 --- a/src/kv/kv_types.h +++ b/src/kv/kv_types.h @@ -440,6 +440,7 @@ namespace kv virtual bool is_backup() = 0; virtual bool is_candidate() = 0; virtual bool can_replicate() = 0; + virtual bool is_at_max_capacity() = 0; enum class SignatureDisposition { diff --git a/src/kv/test/stub_consensus.h b/src/kv/test/stub_consensus.h index 978634741924..01f56b89a306 100644 --- a/src/kv/test/stub_consensus.h +++ b/src/kv/test/stub_consensus.h @@ -59,6 +59,11 @@ namespace kv::test return state == Primary; } + virtual bool is_at_max_capacity() override + { + return false; + } + virtual Consensus::SignatureDisposition get_signature_disposition() override { if (state == Primary) diff --git a/src/node/rpc/frontend.h b/src/node/rpc/frontend.h index f489a8f90c99..b9863acf77cf 100644 --- a/src/node/rpc/frontend.h +++ b/src/node/rpc/frontend.h @@ -397,6 +397,20 @@ namespace ccf while (attempts < max_attempts) { + if (consensus != nullptr) + { + if ( + endpoints.apply_uncommitted_tx_backpressure() && + consensus->is_at_max_capacity()) + { + ctx->set_error( + HTTP_STATUS_SERVICE_UNAVAILABLE, + ccf::errors::TooManyPendingTransactions, + "Too many transactions pending commit on the service."); + return; + } + } + std::unique_ptr tx_p = tables.create_tx_ptr(); set_root_on_proposals(*ctx, *tx_p); diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index f6e216a0aaf9..4bd2de5577c4 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -144,6 +144,14 @@ namespace ccf class NodeEndpoints : public CommonEndpointRegistry { + public: + // The node frontend is exempt from backpressure rules to enable an operator + // to access a node that is not making progress. + bool apply_uncommitted_tx_backpressure() const override + { + return false; + } + private: NetworkState& network; ccf::AbstractNodeOperation& node_operation; diff --git a/tests/config.jinja b/tests/config.jinja index 140f30e3e091..872b96467c25 100644 --- a/tests/config.jinja +++ b/tests/config.jinja @@ -69,7 +69,8 @@ "consensus": { "message_timeout": "{{ message_timeout }}", - "election_timeout": "{{ election_timeout }}" + "election_timeout": "{{ election_timeout }}", + "max_uncommitted_tx_count": {{ max_uncommitted_tx_count|tojson or 0 }} }, "ledger_signatures": { diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index b3a5fb5e3b46..649a8eb7c554 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -523,7 +523,49 @@ def run_pid_file_check(args): network.ignoring_shutdown_errors = True +def run_max_uncommitted_tx_count(args): + with infra.network.network( + ["local://localhost", "local://localhost"], + args.binary_dir, + args.debug_nodes, + args.perf_nodes, + pdb=args.pdb, + ) as network: + uncommitted_cap = 20 + network.per_node_args_override[0] = { + "max_uncommitted_tx_count": uncommitted_cap + } + network.start_and_open(args) + LOG.info( + f"Start network with max_uncommitted_tx_count set to {uncommitted_cap}" + ) + # Stop the backup node, to freeze commit + primary, backups = network.find_nodes() + backups[0].stop() + unavailable_count = 0 + last_accepted_index = 0 + + with primary.client(identity="user0") as c: + for idx in range(uncommitted_cap + 1): + r = c.post( + "/app/log/public?scope=test_large_snapshot", + body={"id": idx, "msg": "X" * 42}, + log_capture=[], + ) + if r.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE: + unavailable_count += 1 + if last_accepted_index == 0: + last_accepted_index = idx - 1 + LOG.info(f"Last accepted: {last_accepted_index}, {unavailable_count} 503s") + assert unavailable_count > 0, "Expected at least one SERVICE_UNAVAILABLE" + + with primary.client() as c: + r = c.get("/node/network") + assert r.status_code == http.HTTPStatus.OK.value, r + + def run(args): + run_max_uncommitted_tx_count(args) run_file_operations(args) run_tls_san_checks(args) run_config_timeout_check(args) diff --git a/tests/infra/basicperf.py b/tests/infra/basicperf.py index 77c8b1881e28..963390aaff9d 100644 --- a/tests/infra/basicperf.py +++ b/tests/infra/basicperf.py @@ -19,6 +19,7 @@ import shutil import datetime import ccf.ledger +import plotext as plt def configure_remote_client(args, client_id, client_host, common_dir): @@ -421,12 +422,14 @@ def table(): requestSize=pl.col("request").map_elements(len), responseSize=pl.col("rawResponse").map_elements(len), ) - # 50x are expected when we stop the primary, 500 when we drop the session - # to maintain consistency, and 504 when we try to write to the future primary - # before their election. Since these requests effectively do nothing, they - # should not count towards latency statistics. - # if args.stop_primary_after_s: - # overall = overall.filter(pl.col("responseStatus") < 500) + + number_of_errors = overall.filter( + pl.col("responseStatus") >= 500 + ).height + total_number_of_requests = overall.height + print( + f"Errors: {number_of_errors} ({number_of_errors / total_number_of_requests * 100:.2f}%)" + ) overall = overall.with_columns( pl.col("receiveTime").alias("latency") - pl.col("sendTime") @@ -449,6 +452,13 @@ def table(): agg = pl.concat(agg, rechunk=True) LOG.info("Aggregate results") print(agg) + + number_of_errors = agg.filter(pl.col("responseStatus") >= 500).height + total_number_of_requests = agg.height + print( + f"Errors: {number_of_errors} ({number_of_errors / total_number_of_requests * 100:.2f}%)" + ) + agg_path = os.path.join( network.common_dir, "aggregated_basicperf_output.parquet" ) @@ -535,17 +545,48 @@ def table(): .count() .rename({"count": "rcvd"}) ) + errors_per_sec = ( + agg.with_columns( + ( + (pl.col("receiveTime").alias("second") - start_send) + / 1000000 + ).cast(pl.Int64) + ) + .filter(pl.col("responseStatus") >= 500) + .group_by("second") + .count() + .rename({"count": "errors"}) + ) - per_sec = sent_per_sec.join(recv_per_sec, on="second").sort("second") - print(per_sec) - per_sec = per_sec.with_columns( - sent_rate=pl.col("sent") / per_sec["sent"].max(), - rcvd_rate=pl.col("rcvd") / per_sec["rcvd"].max(), + per_sec = ( + sent_per_sec.join(recv_per_sec, on="second") + .join(errors_per_sec, on="second", how="outer") + .sort("second") + .fill_null(0) ) - for row in per_sec.iter_rows(named=True): - s = "S" * int(row["sent_rate"] * 20) - r = "R" * int(row["rcvd_rate"] * 20) - print(f"{row['second']:>3}: {s:>20}|{r:<20}") + + plt.simple_bar( + list(per_sec["second"]), + list(per_sec["sent"]), + width=100, + title="Sent requests per second", + ) + plt.show() + + plt.simple_stacked_bar( + list(per_sec["second"]), + [list(per_sec["rcvd"]), list(per_sec["errors"])], + width=100, + labels=["rcvd", "errors"], + colors=["green", "red"], + title="Received requests per second", + ) + plt.show() + + if number_of_errors and not args.stop_primary_after_s: + raise RuntimeError( + f"Errors: {number_of_errors} ({number_of_errors / total_number_of_requests * 100:.2f}%)" + ) with cimetrics.upload.metrics(complete=False) as metrics: LOG.success("Uploading results") diff --git a/tests/infra/remote.py b/tests/infra/remote.py index 503bc2781bf4..0acec80f8982 100644 --- a/tests/infra/remote.py +++ b/tests/infra/remote.py @@ -618,6 +618,7 @@ def __init__( ignore_first_sigterm=False, node_container_image=None, follow_redirect=True, + max_uncommitted_tx_count=0, **kwargs, ): """ @@ -793,6 +794,7 @@ def __init__( ignore_first_sigterm=ignore_first_sigterm, node_address=remote_class.get_node_address(node_address), follow_redirect=follow_redirect, + max_uncommitted_tx_count=max_uncommitted_tx_count, **kwargs, ) diff --git a/tests/requirements.txt b/tests/requirements.txt index f98b45a225bf..c9360e3e078e 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -23,3 +23,4 @@ pycose fastparquet==2023.* prettytable==3.* polars +plotext \ No newline at end of file