Skip to content

Commit

Permalink
Add basic backpressure mechanism if primary has too many in-flight tr…
Browse files Browse the repository at this point in the history
…ansactions (microsoft#5692)

(cherry picked from commit d1d9d8e)

# Conflicts:
#	CHANGELOG.md
#	tests/infra/basicperf.py
#	tests/infra/remote.py
  • Loading branch information
achamayou committed Oct 18, 2023
1 parent ca408c9 commit 94c37cd
Show file tree
Hide file tree
Showing 16 changed files with 109 additions and 5 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [4.0.11]

[4.0.11]: https://github.com/microsoft/CCF/releases/tag/ccf-4.0.11

- Path to the enclave file should now be passed as `--enclave-file` CLI argument to `cchost`, rather than `enclave.file` entry within configuration file. A potential SNP security context directory environment variable override, where desired, should now be passed as `--snp-security-context-dir-var` CLI argument to `cchost`, rather than `attestation.environment.security_context_directory` entry within configuration file. This is to ensure that these values are attested on Confidential Containers/SNP, even if the configuration itself is provided from un-attested storage, such as an external mount. The configuration entries are deprecated, and will be removed in a future release.
- A new versioned governance API is now available, with the `api-version=2023-06-01-preview` query parameter. This will fully replace the previous governance endpoints, which will be removed in a future release. A guide to aid in upgrading from the previous API is available [here](https://microsoft.github.io/CCF/main/governance/gov_api_schemas/upgrading_from_classic.html)
- Added a `consensus.max_uncommitted_tx_count` configuration option, which specifies the maximum number of transactions that can be pending on the primary. When that threshold is exceeded, a `503 Service Unavailable` is temporarily returned on all but the `/node/*` paths (#5692).

## [4.0.10]

Expand Down
5 changes: 5 additions & 0 deletions doc/host_config_schema/cchost_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,11 @@
"type": "string",
"default": "5000ms",
"description": "Maximum timeout (time string) after which backup nodes that have not received any message from the primary node (or voted for a candidate) will trigger a new election. This timeout is also used by candidates to restart unsuccessful elections. This should be set to a significantly greater value than 'message_timeout' plus the expected network delay"
},
"max_uncommitted_tx_count": {
"type": "integer",
"default": 10000,
"description": "Maximum number of uncommitted transactions allowed before the primary refuses new transactions. Unlimited if set to 0."
}
},
"description": "This section includes configuration for the consensus protocol (note: should be the same for all other nodes in the service)",
Expand Down
5 changes: 5 additions & 0 deletions include/ccf/endpoint_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,5 +285,10 @@ namespace ccf::endpoints
const EndpointDefinitionPtr& endpoint);
virtual void increment_metrics_retries(
const EndpointDefinitionPtr& endpoint);

virtual bool apply_uncommitted_tx_backpressure() const
{
return true;
}
};
}
1 change: 1 addition & 0 deletions include/ccf/odata_error.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ namespace ccf
ERROR(ProposalCreatedTooLongAgo)
ERROR(InvalidCreatedAt)
ERROR(JSException)
ERROR(TooManyPendingTransactions)
ERROR(MissingApiVersionParameter)
ERROR(UnsupportedApiVersionValue)

Expand Down
1 change: 1 addition & 0 deletions include/ccf/service/consensus_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace consensus
{
ds::TimeString message_timeout = {"100ms"};
ds::TimeString election_timeout = {"5000ms"};
size_t max_uncommitted_tx_count = 10000;

bool operator==(const Configuration&) const = default;
bool operator!=(const Configuration&) const = default;
Expand Down
18 changes: 18 additions & 0 deletions src/consensus/aft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ namespace aft
// Timeouts
std::chrono::milliseconds request_timeout;
std::chrono::milliseconds election_timeout;
size_t max_uncommitted_tx_count;
bool ticking = false;

// Configurations
Expand Down Expand Up @@ -212,6 +213,7 @@ namespace aft

request_timeout(settings_.message_timeout),
election_timeout(settings_.election_timeout),
max_uncommitted_tx_count(settings_.max_uncommitted_tx_count),

reconfiguration_type(reconfiguration_type_),
node_client(rpc_request_context_),
Expand Down Expand Up @@ -255,6 +257,22 @@ namespace aft
return can_replicate_unsafe();
}

/**
* Returns true if the node is primary, max_uncommitted_tx_count is non-zero
* and the number of transactions replicated but not yet committed exceeds
* max_uncommitted_tx_count.
*/
bool is_at_max_capacity() override
{
if (max_uncommitted_tx_count == 0)
{
return false;
}
std::unique_lock<ccf::pal::Mutex> guard(state->lock);
return state->leadership_state == kv::LeadershipState::Leader &&
(state->last_idx - state->commit_idx >= max_uncommitted_tx_count);
}

Consensus::SignatureDisposition get_signature_disposition() override
{
std::unique_lock<ccf::pal::Mutex> guard(state->lock);
Expand Down
3 changes: 2 additions & 1 deletion src/consensus/aft/test/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ static std::vector<uint8_t> cert;

static const ds::TimeString request_timeout_ = {"10ms"};
static const ds::TimeString election_timeout_ = {"100ms"};
static const size_t max_uncommitted_tx_count_ = 0;

static const std::chrono::milliseconds request_timeout = request_timeout_;
static const std::chrono::milliseconds election_timeout = election_timeout_;

static const consensus::Configuration raft_settings{
request_timeout_, election_timeout_};
request_timeout_, election_timeout_, max_uncommitted_tx_count_};

static auto hooks = std::make_shared<kv::ConsensusHookPtrs>();

Expand Down
2 changes: 1 addition & 1 deletion src/consensus/consensus_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace consensus
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(Configuration);
DECLARE_JSON_REQUIRED_FIELDS(Configuration);
DECLARE_JSON_OPTIONAL_FIELDS(
Configuration, message_timeout, election_timeout);
Configuration, message_timeout, election_timeout, max_uncommitted_tx_count);

#pragma pack(push, 1)
template <typename T>
Expand Down
1 change: 1 addition & 0 deletions src/kv/kv_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ namespace kv
virtual bool is_backup() = 0;
virtual bool is_candidate() = 0;
virtual bool can_replicate() = 0;
virtual bool is_at_max_capacity() = 0;

enum class SignatureDisposition
{
Expand Down
5 changes: 5 additions & 0 deletions src/kv/test/stub_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ namespace kv::test
return state == Primary;
}

virtual bool is_at_max_capacity() override
{
return false;
}

virtual Consensus::SignatureDisposition get_signature_disposition() override
{
if (state == Primary)
Expand Down
14 changes: 14 additions & 0 deletions src/node/rpc/frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,20 @@ namespace ccf

while (attempts < max_attempts)
{
if (consensus != nullptr)
{
if (
endpoints.apply_uncommitted_tx_backpressure() &&
consensus->is_at_max_capacity())
{
ctx->set_error(
HTTP_STATUS_SERVICE_UNAVAILABLE,
ccf::errors::TooManyPendingTransactions,
"Too many transactions pending commit on the service.");
return;
}
}

std::unique_ptr<kv::CommittableTx> tx_p = tables.create_tx_ptr();
set_root_on_proposals(*ctx, *tx_p);

Expand Down
8 changes: 8 additions & 0 deletions src/node/rpc/node_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ namespace ccf

class NodeEndpoints : public CommonEndpointRegistry
{
public:
// The node frontend is exempt from backpressure rules to enable an operator
// to access a node that is not making progress.
bool apply_uncommitted_tx_backpressure() const override
{
return false;
}

private:
NetworkState& network;
ccf::AbstractNodeOperation& node_operation;
Expand Down
3 changes: 2 additions & 1 deletion tests/config.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
"consensus":
{
"message_timeout": "{{ message_timeout }}",
"election_timeout": "{{ election_timeout }}"
"election_timeout": "{{ election_timeout }}",
"max_uncommitted_tx_count": {{ max_uncommitted_tx_count|tojson or 0 }}
},
"ledger_signatures":
{
Expand Down
42 changes: 42 additions & 0 deletions tests/e2e_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,49 @@ def run_pid_file_check(args):
network.ignoring_shutdown_errors = True


def run_max_uncommitted_tx_count(args):
with infra.network.network(
["local://localhost", "local://localhost"],
args.binary_dir,
args.debug_nodes,
args.perf_nodes,
pdb=args.pdb,
) as network:
uncommitted_cap = 20
network.per_node_args_override[0] = {
"max_uncommitted_tx_count": uncommitted_cap
}
network.start_and_open(args)
LOG.info(
f"Start network with max_uncommitted_tx_count set to {uncommitted_cap}"
)
# Stop the backup node, to freeze commit
primary, backups = network.find_nodes()
backups[0].stop()
unavailable_count = 0
last_accepted_index = 0

with primary.client(identity="user0") as c:
for idx in range(uncommitted_cap + 1):
r = c.post(
"/app/log/public?scope=test_large_snapshot",
body={"id": idx, "msg": "X" * 42},
log_capture=[],
)
if r.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE:
unavailable_count += 1
if last_accepted_index == 0:
last_accepted_index = idx - 1
LOG.info(f"Last accepted: {last_accepted_index}, {unavailable_count} 503s")
assert unavailable_count > 0, "Expected at least one SERVICE_UNAVAILABLE"

with primary.client() as c:
r = c.get("/node/network")
assert r.status_code == http.HTTPStatus.OK.value, r


def run(args):
run_max_uncommitted_tx_count(args)
run_file_operations(args)
run_tls_san_checks(args)
run_configuration_file_checks(args)
Expand Down
2 changes: 2 additions & 0 deletions tests/infra/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ def __init__(
set_snp_uvm_security_context_dir_envvar=True,
ignore_first_sigterm=False,
node_container_image=None,
max_uncommitted_tx_count=0,
**kwargs,
):
"""
Expand Down Expand Up @@ -788,6 +789,7 @@ def __init__(
snp_security_context_directory_envvar=snp_security_context_directory_envvar, # Ignored by current jinja, but passed for LTS compat
ignore_first_sigterm=ignore_first_sigterm,
node_address=remote_class.get_node_address(node_address),
max_uncommitted_tx_count=max_uncommitted_tx_count,
**kwargs,
)

Expand Down
1 change: 1 addition & 0 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ pycose
fastparquet==2023.*
prettytable==3.*
polars
plotext

0 comments on commit 94c37cd

Please sign in to comment.