Skip to content

Commit

Permalink
Add basic backpressure mechanism if primary has too many in-flight tr…
Browse files Browse the repository at this point in the history
…ansactions (#5692)
  • Loading branch information
achamayou authored Sep 28, 2023
1 parent b46d01c commit d1d9d8e
Show file tree
Hide file tree
Showing 17 changed files with 170 additions and 18 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [5.0.0-dev3]

[5.0.0-dev3]: https://github.com/microsoft/CCF/releases/tag/ccf-5.0.0-dev3

- Added a `consensus.max_uncommitted_tx_count` configuration option, which specifies the maximum number of transactions that can be pending on the primary. When that threshold is exceeded, a `503 Service Unavailable` is temporarily returned on all but the `/node/*` paths (#5692).

## [5.0.0-dev2]

[5.0.0-dev2]: https://github.com/microsoft/CCF/releases/tag/ccf-5.0.0-dev2
Expand Down
5 changes: 5 additions & 0 deletions doc/host_config_schema/cchost_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,11 @@
"type": "string",
"default": "5000ms",
"description": "Maximum timeout (time string) after which backup nodes that have not received any message from the primary node (or voted for a candidate) will trigger a new election. This timeout is also used by candidates to restart unsuccessful elections. This should be set to a significantly greater value than 'message_timeout' plus the expected network delay"
},
"max_uncommitted_tx_count": {
"type": "integer",
"default": 10000,
"description": "Maximum number of uncommitted transactions allowed before the primary refuses new transactions. Unlimited if set to 0."
}
},
"description": "This section includes configuration for the consensus protocol (note: should be the same for all other nodes in the service)",
Expand Down
5 changes: 5 additions & 0 deletions include/ccf/endpoint_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,5 +285,10 @@ namespace ccf::endpoints
const EndpointDefinitionPtr& endpoint);
virtual void increment_metrics_retries(
const EndpointDefinitionPtr& endpoint);

virtual bool apply_uncommitted_tx_backpressure() const
{
return true;
}
};
}
1 change: 1 addition & 0 deletions include/ccf/odata_error.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ namespace ccf
ERROR(ProposalCreatedTooLongAgo)
ERROR(InvalidCreatedAt)
ERROR(JSException)
ERROR(TooManyPendingTransactions)
ERROR(MissingApiVersionParameter)
ERROR(UnsupportedApiVersionValue)

Expand Down
1 change: 1 addition & 0 deletions include/ccf/service/consensus_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace consensus
{
ds::TimeString message_timeout = {"100ms"};
ds::TimeString election_timeout = {"5000ms"};
size_t max_uncommitted_tx_count = 10000;

bool operator==(const Configuration&) const = default;
bool operator!=(const Configuration&) const = default;
Expand Down
18 changes: 18 additions & 0 deletions src/consensus/aft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ namespace aft
// Timeouts
std::chrono::milliseconds request_timeout;
std::chrono::milliseconds election_timeout;
size_t max_uncommitted_tx_count;
bool ticking = false;

// Configurations
Expand Down Expand Up @@ -212,6 +213,7 @@ namespace aft

request_timeout(settings_.message_timeout),
election_timeout(settings_.election_timeout),
max_uncommitted_tx_count(settings_.max_uncommitted_tx_count),

reconfiguration_type(reconfiguration_type_),
node_client(rpc_request_context_),
Expand Down Expand Up @@ -255,6 +257,22 @@ namespace aft
return can_replicate_unsafe();
}

/**
* Returns true if the node is primary, max_uncommitted_tx_count is non-zero
* and the number of transactions replicated but not yet committed exceeds
* max_uncommitted_tx_count.
*/
bool is_at_max_capacity() override
{
if (max_uncommitted_tx_count == 0)
{
return false;
}
std::unique_lock<ccf::pal::Mutex> guard(state->lock);
return state->leadership_state == kv::LeadershipState::Leader &&
(state->last_idx - state->commit_idx >= max_uncommitted_tx_count);
}

Consensus::SignatureDisposition get_signature_disposition() override
{
std::unique_lock<ccf::pal::Mutex> guard(state->lock);
Expand Down
3 changes: 2 additions & 1 deletion src/consensus/aft/test/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ static std::vector<uint8_t> cert;

static const ds::TimeString request_timeout_ = {"10ms"};
static const ds::TimeString election_timeout_ = {"100ms"};
static const size_t max_uncommitted_tx_count_ = 0;

static const std::chrono::milliseconds request_timeout = request_timeout_;
static const std::chrono::milliseconds election_timeout = election_timeout_;

static const consensus::Configuration raft_settings{
request_timeout_, election_timeout_};
request_timeout_, election_timeout_, max_uncommitted_tx_count_};

static auto hooks = std::make_shared<kv::ConsensusHookPtrs>();

Expand Down
2 changes: 1 addition & 1 deletion src/consensus/consensus_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace consensus
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(Configuration);
DECLARE_JSON_REQUIRED_FIELDS(Configuration);
DECLARE_JSON_OPTIONAL_FIELDS(
Configuration, message_timeout, election_timeout);
Configuration, message_timeout, election_timeout, max_uncommitted_tx_count);

#pragma pack(push, 1)
template <typename T>
Expand Down
1 change: 1 addition & 0 deletions src/kv/kv_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ namespace kv
virtual bool is_backup() = 0;
virtual bool is_candidate() = 0;
virtual bool can_replicate() = 0;
virtual bool is_at_max_capacity() = 0;

enum class SignatureDisposition
{
Expand Down
5 changes: 5 additions & 0 deletions src/kv/test/stub_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ namespace kv::test
return state == Primary;
}

virtual bool is_at_max_capacity() override
{
return false;
}

virtual Consensus::SignatureDisposition get_signature_disposition() override
{
if (state == Primary)
Expand Down
14 changes: 14 additions & 0 deletions src/node/rpc/frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,20 @@ namespace ccf

while (attempts < max_attempts)
{
if (consensus != nullptr)
{
if (
endpoints.apply_uncommitted_tx_backpressure() &&
consensus->is_at_max_capacity())
{
ctx->set_error(
HTTP_STATUS_SERVICE_UNAVAILABLE,
ccf::errors::TooManyPendingTransactions,
"Too many transactions pending commit on the service.");
return;
}
}

std::unique_ptr<kv::CommittableTx> tx_p = tables.create_tx_ptr();
set_root_on_proposals(*ctx, *tx_p);

Expand Down
8 changes: 8 additions & 0 deletions src/node/rpc/node_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ namespace ccf

class NodeEndpoints : public CommonEndpointRegistry
{
public:
// The node frontend is exempt from backpressure rules to enable an operator
// to access a node that is not making progress.
bool apply_uncommitted_tx_backpressure() const override
{
return false;
}

private:
NetworkState& network;
ccf::AbstractNodeOperation& node_operation;
Expand Down
3 changes: 2 additions & 1 deletion tests/config.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@
"consensus":
{
"message_timeout": "{{ message_timeout }}",
"election_timeout": "{{ election_timeout }}"
"election_timeout": "{{ election_timeout }}",
"max_uncommitted_tx_count": {{ max_uncommitted_tx_count|tojson or 0 }}
},
"ledger_signatures":
{
Expand Down
42 changes: 42 additions & 0 deletions tests/e2e_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,49 @@ def run_pid_file_check(args):
network.ignoring_shutdown_errors = True


def run_max_uncommitted_tx_count(args):
with infra.network.network(
["local://localhost", "local://localhost"],
args.binary_dir,
args.debug_nodes,
args.perf_nodes,
pdb=args.pdb,
) as network:
uncommitted_cap = 20
network.per_node_args_override[0] = {
"max_uncommitted_tx_count": uncommitted_cap
}
network.start_and_open(args)
LOG.info(
f"Start network with max_uncommitted_tx_count set to {uncommitted_cap}"
)
# Stop the backup node, to freeze commit
primary, backups = network.find_nodes()
backups[0].stop()
unavailable_count = 0
last_accepted_index = 0

with primary.client(identity="user0") as c:
for idx in range(uncommitted_cap + 1):
r = c.post(
"/app/log/public?scope=test_large_snapshot",
body={"id": idx, "msg": "X" * 42},
log_capture=[],
)
if r.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE:
unavailable_count += 1
if last_accepted_index == 0:
last_accepted_index = idx - 1
LOG.info(f"Last accepted: {last_accepted_index}, {unavailable_count} 503s")
assert unavailable_count > 0, "Expected at least one SERVICE_UNAVAILABLE"

with primary.client() as c:
r = c.get("/node/network")
assert r.status_code == http.HTTPStatus.OK.value, r


def run(args):
run_max_uncommitted_tx_count(args)
run_file_operations(args)
run_tls_san_checks(args)
run_config_timeout_check(args)
Expand Down
71 changes: 56 additions & 15 deletions tests/infra/basicperf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import shutil
import datetime
import ccf.ledger
import plotext as plt


def configure_remote_client(args, client_id, client_host, common_dir):
Expand Down Expand Up @@ -421,12 +422,14 @@ def table():
requestSize=pl.col("request").map_elements(len),
responseSize=pl.col("rawResponse").map_elements(len),
)
# 50x are expected when we stop the primary, 500 when we drop the session
# to maintain consistency, and 504 when we try to write to the future primary
# before their election. Since these requests effectively do nothing, they
# should not count towards latency statistics.
# if args.stop_primary_after_s:
# overall = overall.filter(pl.col("responseStatus") < 500)

number_of_errors = overall.filter(
pl.col("responseStatus") >= 500
).height
total_number_of_requests = overall.height
print(
f"Errors: {number_of_errors} ({number_of_errors / total_number_of_requests * 100:.2f}%)"
)

overall = overall.with_columns(
pl.col("receiveTime").alias("latency") - pl.col("sendTime")
Expand All @@ -449,6 +452,13 @@ def table():
agg = pl.concat(agg, rechunk=True)
LOG.info("Aggregate results")
print(agg)

number_of_errors = agg.filter(pl.col("responseStatus") >= 500).height
total_number_of_requests = agg.height
print(
f"Errors: {number_of_errors} ({number_of_errors / total_number_of_requests * 100:.2f}%)"
)

agg_path = os.path.join(
network.common_dir, "aggregated_basicperf_output.parquet"
)
Expand Down Expand Up @@ -535,17 +545,48 @@ def table():
.count()
.rename({"count": "rcvd"})
)
errors_per_sec = (
agg.with_columns(
(
(pl.col("receiveTime").alias("second") - start_send)
/ 1000000
).cast(pl.Int64)
)
.filter(pl.col("responseStatus") >= 500)
.group_by("second")
.count()
.rename({"count": "errors"})
)

per_sec = sent_per_sec.join(recv_per_sec, on="second").sort("second")
print(per_sec)
per_sec = per_sec.with_columns(
sent_rate=pl.col("sent") / per_sec["sent"].max(),
rcvd_rate=pl.col("rcvd") / per_sec["rcvd"].max(),
per_sec = (
sent_per_sec.join(recv_per_sec, on="second")
.join(errors_per_sec, on="second", how="outer")
.sort("second")
.fill_null(0)
)
for row in per_sec.iter_rows(named=True):
s = "S" * int(row["sent_rate"] * 20)
r = "R" * int(row["rcvd_rate"] * 20)
print(f"{row['second']:>3}: {s:>20}|{r:<20}")

plt.simple_bar(
list(per_sec["second"]),
list(per_sec["sent"]),
width=100,
title="Sent requests per second",
)
plt.show()

plt.simple_stacked_bar(
list(per_sec["second"]),
[list(per_sec["rcvd"]), list(per_sec["errors"])],
width=100,
labels=["rcvd", "errors"],
colors=["green", "red"],
title="Received requests per second",
)
plt.show()

if number_of_errors and not args.stop_primary_after_s:
raise RuntimeError(
f"Errors: {number_of_errors} ({number_of_errors / total_number_of_requests * 100:.2f}%)"
)

with cimetrics.upload.metrics(complete=False) as metrics:
LOG.success("Uploading results")
Expand Down
2 changes: 2 additions & 0 deletions tests/infra/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ def __init__(
ignore_first_sigterm=False,
node_container_image=None,
follow_redirect=True,
max_uncommitted_tx_count=0,
**kwargs,
):
"""
Expand Down Expand Up @@ -793,6 +794,7 @@ def __init__(
ignore_first_sigterm=ignore_first_sigterm,
node_address=remote_class.get_node_address(node_address),
follow_redirect=follow_redirect,
max_uncommitted_tx_count=max_uncommitted_tx_count,
**kwargs,
)

Expand Down
1 change: 1 addition & 0 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ pycose
fastparquet==2023.*
prettytable==3.*
polars
plotext

0 comments on commit d1d9d8e

Please sign in to comment.