Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic backpressure mechanism if primary has too many in-flight transactions #5692

Merged
merged 14 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [5.0.0-dev3]

[5.0.0-dev3]: https://github.com/microsoft/CCF/releases/tag/ccf-5.0.0-dev3

- Added a `consensus.max_uncommitted_tx_count` configuration option, which specifies the maximum number of transactions that can be pending on the primary. When that threshold is exceeded, a `503 Service Unavailable` response is temporarily returned on all but the `/node/*` paths (#5692).

## [5.0.0-dev2]

[5.0.0-dev2]: https://github.com/microsoft/CCF/releases/tag/ccf-5.0.0-dev2
Expand Down
5 changes: 5 additions & 0 deletions doc/host_config_schema/cchost_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,11 @@
"type": "string",
"default": "5000ms",
"description": "Maximum timeout (time string) after which backup nodes that have not received any message from the primary node (or voted for a candidate) will trigger a new election. This timeout is also used by candidates to restart unsuccessful elections. This should be set to a significantly greater value than 'message_timeout' plus the expected network delay"
},
"max_uncommitted_tx_count": {
"type": "integer",
"default": 10000,
"description": "Maximum number of uncommitted transactions allowed before the primary refuses new transactions. Unlimited if set to 0."
}
},
"description": "This section includes configuration for the consensus protocol (note: should be the same for all other nodes in the service)",
Expand Down
5 changes: 5 additions & 0 deletions include/ccf/endpoint_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,5 +285,10 @@ namespace ccf::endpoints
const EndpointDefinitionPtr& endpoint);
virtual void increment_metrics_retries(
const EndpointDefinitionPtr& endpoint);

/** Whether endpoints in this registry are subject to the uncommitted-
 * transaction backpressure check (consensus.max_uncommitted_tx_count).
 *
 * Defaults to true; registries may override to opt out, so that their
 * endpoints remain reachable even when the primary is refusing new
 * transactions with 503 Service Unavailable.
 */
virtual bool apply_uncommitted_tx_backpressure() const
{
return true;
}
};
}
1 change: 1 addition & 0 deletions include/ccf/odata_error.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ namespace ccf
ERROR(ProposalCreatedTooLongAgo)
ERROR(InvalidCreatedAt)
ERROR(JSException)
ERROR(TooManyPendingTransactions)
ERROR(MissingApiVersionParameter)
ERROR(UnsupportedApiVersionValue)

Expand Down
1 change: 1 addition & 0 deletions include/ccf/service/consensus_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace consensus
{
ds::TimeString message_timeout = {"100ms"};
ds::TimeString election_timeout = {"5000ms"};
size_t max_uncommitted_tx_count = 10000;

bool operator==(const Configuration&) const = default;
bool operator!=(const Configuration&) const = default;
Expand Down
18 changes: 18 additions & 0 deletions src/consensus/aft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ namespace aft
// Timeouts
std::chrono::milliseconds request_timeout;
std::chrono::milliseconds election_timeout;
size_t max_uncommitted_tx_count;
bool ticking = false;

// Configurations
Expand Down Expand Up @@ -212,6 +213,7 @@ namespace aft

request_timeout(settings_.message_timeout),
election_timeout(settings_.election_timeout),
max_uncommitted_tx_count(settings_.max_uncommitted_tx_count),

reconfiguration_type(reconfiguration_type_),
node_client(rpc_request_context_),
Expand Down Expand Up @@ -255,6 +257,22 @@ namespace aft
return can_replicate_unsafe();
}

/**
 * Reports whether backpressure should be applied: the node is primary,
 * max_uncommitted_tx_count is non-zero, and the gap between the last
 * replicated index and the commit index has reached that threshold.
 */
bool is_at_max_capacity() override
{
// A threshold of zero disables the backpressure mechanism entirely.
const auto threshold = max_uncommitted_tx_count;
if (threshold == 0)
{
return false;
}

std::unique_lock<ccf::pal::Mutex> guard(state->lock);
if (state->leadership_state != kv::LeadershipState::Leader)
{
// Only the primary accepts writes, so only it applies backpressure.
return false;
}
const auto uncommitted = state->last_idx - state->commit_idx;
return uncommitted >= threshold;
}

Consensus::SignatureDisposition get_signature_disposition() override
{
std::unique_lock<ccf::pal::Mutex> guard(state->lock);
Expand Down
3 changes: 2 additions & 1 deletion src/consensus/aft/test/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ static std::vector<uint8_t> cert;

static const ds::TimeString request_timeout_ = {"10ms"};
static const ds::TimeString election_timeout_ = {"100ms"};
static const size_t max_uncommitted_tx_count_ = 0;

static const std::chrono::milliseconds request_timeout = request_timeout_;
static const std::chrono::milliseconds election_timeout = election_timeout_;

static const consensus::Configuration raft_settings{
request_timeout_, election_timeout_};
request_timeout_, election_timeout_, max_uncommitted_tx_count_};

static auto hooks = std::make_shared<kv::ConsensusHookPtrs>();

Expand Down
2 changes: 1 addition & 1 deletion src/consensus/consensus_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace consensus
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(Configuration);
DECLARE_JSON_REQUIRED_FIELDS(Configuration);
DECLARE_JSON_OPTIONAL_FIELDS(
Configuration, message_timeout, election_timeout);
Configuration, message_timeout, election_timeout, max_uncommitted_tx_count);
achamayou marked this conversation as resolved.
Show resolved Hide resolved

#pragma pack(push, 1)
template <typename T>
Expand Down
1 change: 1 addition & 0 deletions src/kv/kv_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ namespace kv
virtual bool is_backup() = 0;
virtual bool is_candidate() = 0;
virtual bool can_replicate() = 0;
virtual bool is_at_max_capacity() = 0;

enum class SignatureDisposition
{
Expand Down
5 changes: 5 additions & 0 deletions src/kv/test/stub_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ namespace kv::test
return state == Primary;
}

// Test stub: never reports backpressure, so unit tests exercising the KV
// are unaffected by the uncommitted-transaction cap.
virtual bool is_at_max_capacity() override
{
return false;
}

virtual Consensus::SignatureDisposition get_signature_disposition() override
{
if (state == Primary)
Expand Down
14 changes: 14 additions & 0 deletions src/node/rpc/frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,20 @@ namespace ccf

while (attempts < max_attempts)
{
if (consensus != nullptr)
{
if (
endpoints.apply_uncommitted_tx_backpressure() &&
consensus->is_at_max_capacity())
{
ctx->set_error(
HTTP_STATUS_SERVICE_UNAVAILABLE,
ccf::errors::TooManyPendingTransactions,
"Too many transactions pending commit on the service.");
return;
}
}

std::unique_ptr<kv::CommittableTx> tx_p = tables.create_tx_ptr();
set_root_on_proposals(*ctx, *tx_p);

Expand Down
8 changes: 8 additions & 0 deletions src/node/rpc/node_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ namespace ccf

class NodeEndpoints : public CommonEndpointRegistry
{
public:
// The node frontend is exempt from backpressure rules to enable an operator
// to access a node that is not making progress (i.e. while other frontends
// are returning 503 Service Unavailable because too many transactions are
// pending commit).
bool apply_uncommitted_tx_backpressure() const override
{
return false;
}

private:
NetworkState& network;
ccf::AbstractNodeOperation& node_operation;
Expand Down
3 changes: 2 additions & 1 deletion tests/config.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@
"consensus":
{
"message_timeout": "{{ message_timeout }}",
"election_timeout": "{{ election_timeout }}"
"election_timeout": "{{ election_timeout }}",
"max_uncommitted_tx_count": {{ max_uncommitted_tx_count|tojson or 0 }}
},
"ledger_signatures":
{
Expand Down
42 changes: 42 additions & 0 deletions tests/e2e_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,49 @@ def run_pid_file_check(args):
network.ignoring_shutdown_errors = True


def run_max_uncommitted_tx_count(args):
    """
    Start a two-node network with a small consensus.max_uncommitted_tx_count
    on the primary, freeze commit by stopping the only backup, and check that:
    - app writes eventually receive 503 Service Unavailable, and
    - /node/* endpoints (exempt from backpressure) remain reachable.
    """
    with infra.network.network(
        ["local://localhost", "local://localhost"],
        args.binary_dir,
        args.debug_nodes,
        args.perf_nodes,
        pdb=args.pdb,
    ) as network:
        uncommitted_cap = 20
        network.per_node_args_override[0] = {
            "max_uncommitted_tx_count": uncommitted_cap
        }
        network.start_and_open(args)
        LOG.info(
            f"Start network with max_uncommitted_tx_count set to {uncommitted_cap}"
        )
        # Stop the backup node, to freeze commit
        primary, backups = network.find_nodes()
        backups[0].stop()
        unavailable_count = 0
        # None until the first 503 is observed. 0 is a valid index, so it must
        # not double as the "not yet seen" sentinel: if the first 503 occurred
        # at idx 1, a 0 sentinel would be overwritten by the next 503.
        last_accepted_index = None

        with primary.client(identity="user0") as c:
            for idx in range(uncommitted_cap + 1):
                r = c.post(
                    "/app/log/public?scope=test_large_snapshot",
                    body={"id": idx, "msg": "X" * 42},
                    log_capture=[],
                )
                if r.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE:
                    unavailable_count += 1
                    if last_accepted_index is None:
                        last_accepted_index = idx - 1
            LOG.info(f"Last accepted: {last_accepted_index}, {unavailable_count} 503s")
            assert unavailable_count > 0, "Expected at least one SERVICE_UNAVAILABLE"

        # The node frontend is exempt from backpressure, so an operator can
        # still reach a primary that is not making progress.
        with primary.client() as c:
            r = c.get("/node/network")
            assert r.status_code == http.HTTPStatus.OK.value, r


def run(args):
run_max_uncommitted_tx_count(args)
run_file_operations(args)
run_tls_san_checks(args)
run_config_timeout_check(args)
Expand Down
71 changes: 56 additions & 15 deletions tests/infra/basicperf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import shutil
import datetime
import ccf.ledger
import plotext as plt


def configure_remote_client(args, client_id, client_host, common_dir):
Expand Down Expand Up @@ -421,12 +422,14 @@ def table():
requestSize=pl.col("request").map_elements(len),
responseSize=pl.col("rawResponse").map_elements(len),
)
# 50x are expected when we stop the primary, 500 when we drop the session
# to maintain consistency, and 504 when we try to write to the future primary
# before their election. Since these requests effectively do nothing, they
# should not count towards latency statistics.
# if args.stop_primary_after_s:
# overall = overall.filter(pl.col("responseStatus") < 500)

number_of_errors = overall.filter(
pl.col("responseStatus") >= 500
).height
total_number_of_requests = overall.height
print(
f"Errors: {number_of_errors} ({number_of_errors / total_number_of_requests * 100:.2f}%)"
)

overall = overall.with_columns(
pl.col("receiveTime").alias("latency") - pl.col("sendTime")
Expand All @@ -449,6 +452,13 @@ def table():
agg = pl.concat(agg, rechunk=True)
LOG.info("Aggregate results")
print(agg)

number_of_errors = agg.filter(pl.col("responseStatus") >= 500).height
total_number_of_requests = agg.height
print(
f"Errors: {number_of_errors} ({number_of_errors / total_number_of_requests * 100:.2f}%)"
)

agg_path = os.path.join(
network.common_dir, "aggregated_basicperf_output.parquet"
)
Expand Down Expand Up @@ -535,17 +545,48 @@ def table():
.count()
.rename({"count": "rcvd"})
)
errors_per_sec = (
agg.with_columns(
(
(pl.col("receiveTime").alias("second") - start_send)
/ 1000000
).cast(pl.Int64)
)
.filter(pl.col("responseStatus") >= 500)
.group_by("second")
.count()
.rename({"count": "errors"})
)

per_sec = sent_per_sec.join(recv_per_sec, on="second").sort("second")
print(per_sec)
per_sec = per_sec.with_columns(
sent_rate=pl.col("sent") / per_sec["sent"].max(),
rcvd_rate=pl.col("rcvd") / per_sec["rcvd"].max(),
per_sec = (
sent_per_sec.join(recv_per_sec, on="second")
.join(errors_per_sec, on="second", how="outer")
.sort("second")
.fill_null(0)
)
for row in per_sec.iter_rows(named=True):
s = "S" * int(row["sent_rate"] * 20)
r = "R" * int(row["rcvd_rate"] * 20)
print(f"{row['second']:>3}: {s:>20}|{r:<20}")

plt.simple_bar(
list(per_sec["second"]),
list(per_sec["sent"]),
width=100,
title="Sent requests per second",
)
plt.show()

plt.simple_stacked_bar(
list(per_sec["second"]),
[list(per_sec["rcvd"]), list(per_sec["errors"])],
width=100,
labels=["rcvd", "errors"],
colors=["green", "red"],
title="Received requests per second",
)
plt.show()

if number_of_errors and not args.stop_primary_after_s:
raise RuntimeError(
f"Errors: {number_of_errors} ({number_of_errors / total_number_of_requests * 100:.2f}%)"
)

with cimetrics.upload.metrics(complete=False) as metrics:
LOG.success("Uploading results")
Expand Down
2 changes: 2 additions & 0 deletions tests/infra/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ def __init__(
ignore_first_sigterm=False,
node_container_image=None,
follow_redirect=True,
max_uncommitted_tx_count=0,
**kwargs,
):
"""
Expand Down Expand Up @@ -793,6 +794,7 @@ def __init__(
ignore_first_sigterm=ignore_first_sigterm,
node_address=remote_class.get_node_address(node_address),
follow_redirect=follow_redirect,
max_uncommitted_tx_count=max_uncommitted_tx_count,
**kwargs,
)

Expand Down
1 change: 1 addition & 0 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ pycose
fastparquet==2023.*
prettytable==3.*
polars
plotext