A Cassandra cluster is typically composed of multiple hosts; the LoadBalancingPolicy (sometimes abbreviated LBP) is a central component that determines:
- which hosts the driver will communicate with;
- for each new query, which coordinator to pick, and which hosts to use as failover.
The policy is configured when initializing the cluster:
Cluster cluster = Cluster.builder()
.addContactPoint("127.0.0.1")
.withLoadBalancingPolicy(new RoundRobinPolicy())
.build();
Once the cluster has been built, you can't change the policy, but you may inspect it at runtime:
LoadBalancingPolicy lbp = cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
If you don't explicitly configure the policy, you get the default, which is a datacenter-aware, token-aware policy:
new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build());
Before we review the available implementations, we need to introduce two concepts: host distance and query plans.
For each host, the policy computes a distance that determines how the driver will establish connections to it:
- LOCAL and REMOTE are "active" distances, meaning that the driver will keep open connections to the host. They differ in the number of connections opened, depending on your pooling options. Also, the control connection will favor local nodes if possible.
- IGNORED, as the name suggests, means that the driver will not attempt to connect.
Typically, the distance will reflect network topology (e.g. local vs. remote datacenter), although that is entirely up to your policy. The distance can be dynamic: the driver re-checks it whenever connection pools are created (e.g. at startup or if a node was down and just came back up); you can also trigger it with refreshConnectedHosts:
// Re-evaluate all host distances:
cluster.getConfiguration().getPoolingOptions().refreshConnectedHosts();
// Re-evaluate the distance for a given host:
cluster.getConfiguration().getPoolingOptions().refreshConnectedHost(host);
Each time the driver executes a query, it asks the policy to compute a query plan, which is a list of hosts. The driver will then try each host in sequence, according to the retry policy and speculative execution policy.
The contents and order of query plans are entirely up to your policy, but implementations typically return plans that:
- are different for each query, in order to balance the load across the cluster;
- only contain hosts that are known to be able to process queries, i.e. neither ignored nor down;
- favor local hosts over remote ones.
The next sections describe the implementations that are provided with the driver.
Cluster cluster = Cluster.builder()
.addContactPoint("127.0.0.1")
.withLoadBalancingPolicy(new RoundRobinPolicy())
.build();
This is the most straightforward implementation. It returns query plans that include all hosts, and shifts the starting host for each query in a round-robin fashion. For example:
- query 1: host1, host2, host3
- query 2: host2, host3, host1
- query 3: host3, host1, host2
- query 4: host1, host2, host3
- etc.
All hosts are at distance LOCAL.
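The rotation shown above can be modelled in a few lines of plain Java. This is only a sketch of the idea, not the driver's implementation: the class name `RoundRobinPlanSketch` is hypothetical, and hosts are represented as plain strings rather than driver `Host` objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified model of round-robin query plans (hosts are plain strings). */
final class RoundRobinPlanSketch {
    private final List<String> hosts;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinPlanSketch(List<String> hosts) {
        this.hosts = new ArrayList<>(hosts);
    }

    /** Returns every host, starting one position further than the previous call. */
    List<String> newQueryPlan() {
        List<String> plan = new ArrayList<>(hosts.size());
        if (hosts.isEmpty()) {
            return plan;
        }
        // floorMod keeps the start index valid even if the counter overflows
        int start = Math.floorMod(index.getAndIncrement(), hosts.size());
        for (int i = 0; i < hosts.size(); i++) {
            plan.add(hosts.get((start + i) % hosts.size()));
        }
        return plan;
    }

    public static void main(String[] args) {
        RoundRobinPlanSketch policy =
                new RoundRobinPlanSketch(Arrays.asList("host1", "host2", "host3"));
        for (int query = 1; query <= 4; query++) {
            System.out.println("query " + query + ": " + policy.newQueryPlan());
        }
    }
}
```

Running `main` prints four plans that follow the same rotation as the list above.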
This works well for simple deployments. If you have multiple datacenters, it will be inefficient and you probably want to switch to a DC-aware policy.
Cluster cluster = Cluster.builder()
.addContactPoint("127.0.0.1")
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy.builder()
.withLocalDc("myLocalDC")
.build()
).build();
This policy queries nodes of the local datacenter in a round-robin fashion.
Call withLocalDc to specify the name of your local datacenter. You can also leave it out, and the driver will use the datacenter of the first contact point that was reached at initialization. However, remember that the driver shuffles the initial list of contact points, so this assumes that all contact points are in the local datacenter. In general, providing the datacenter name explicitly is a safer option.
Hosts belonging to the local datacenter are at distance LOCAL, and appear first in query plans (in a round-robin fashion).
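To make the relationship between datacenter and distance concrete, here is a minimal plain-Java model. It is a sketch under stated assumptions, not the driver's code: `DcAwareSketch` and its `Distance` enum are hypothetical stand-ins, hosts are plain strings, and the sketch assumes that hosts outside the local datacenter are not used at all (so they are treated as IGNORED).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a DC-aware policy; hosts and datacenters are plain strings. */
final class DcAwareSketch {
    /** Stand-in for the three distances described earlier. */
    enum Distance { LOCAL, REMOTE, IGNORED }

    private final String localDc;
    private final Map<String, String> dcByHost; // host name -> datacenter name
    private int index;

    DcAwareSketch(String localDc, Map<String, String> dcByHost) {
        this.localDc = localDc;
        this.dcByHost = new LinkedHashMap<>(dcByHost);
    }

    /** Assumption of this sketch: anything outside the local DC is ignored. */
    Distance distance(String host) {
        return localDc.equals(dcByHost.get(host)) ? Distance.LOCAL : Distance.IGNORED;
    }

    /** Rotates through the LOCAL hosts only. */
    List<String> newQueryPlan() {
        List<String> local = new ArrayList<>();
        for (String host : dcByHost.keySet()) {
            if (distance(host) == Distance.LOCAL) {
                local.add(host);
            }
        }
        List<String> plan = new ArrayList<>(local.size());
        if (local.isEmpty()) {
            return plan;
        }
        int start = Math.floorMod(index++, local.size());
        for (int i = 0; i < local.size(); i++) {
            plan.add(local.get((start + i) % local.size()));
        }
        return plan;
    }
}
```

With two hosts in the local datacenter and one elsewhere, successive plans alternate between the two local hosts and never contain the third.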
Cluster cluster = Cluster.builder()
.addContactPoint("127.0.0.1")
.withLoadBalancingPolicy(
RackAwareRoundRobinPolicy.builder()
.withLocalDc("myLocalDC")
.withLocalRack("myLocalRack")
.build()
).build();
This policy queries nodes of the local rack in a round-robin fashion.
Call withLocalDc to specify the name of your local datacenter and withLocalRack to specify the name of your local rack.
You can also leave them out, and the driver will use the datacenter and rack of the first contact point that was reached at initialization.
However, remember that the driver shuffles the initial list of contact points, so this assumes that all contact points are in the local datacenter and rack. In general, providing the datacenter and rack names explicitly is a safer option.
Hosts belonging to the local datacenter are at distance LOCAL, and appear first in query plans (in a round-robin fashion), with hosts in the local rack having precedence over nodes in remote racks in the local datacenter.
For example, if there are any UP hosts in the local rack the policy will query those nodes in round-robin fashion:
- query 1: host1 (local DC, local rack), host2 (local DC, local rack), host3 (local DC, local rack)
- query 2: host2 (local DC, local rack), host3 (local DC, local rack), host1 (local DC, local rack)
- query 3: host3 (local DC, local rack), host1 (local DC, local rack), host2 (local DC, local rack)
- query 4: host1 (local DC, local rack), host2 (local DC, local rack), host3 (local DC, local rack)
If all nodes in the local rack are DOWN, the policy will then move on to remote racks in local DC:
- query 1: host4 (local DC, remote rack 1), host5 (local DC, remote rack 1), host6 (local DC, remote rack 2)
- query 2: host5 (local DC, remote rack 1), host6 (local DC, remote rack 2), host4 (local DC, remote rack 1)
- query 3: host6 (local DC, remote rack 2), host4 (local DC, remote rack 1), host5 (local DC, remote rack 1)
- query 4: host4 (local DC, remote rack 1), host5 (local DC, remote rack 1), host6 (local DC, remote rack 2)
If all nodes in the local datacenter are DOWN, the policy can query remote DCs (configurable by withUsedHostsPerRemoteDc):
- query 1: host7 (remote DC, remote rack 1), host8 (remote DC, remote rack 1), host9 (remote DC, remote rack 2)
- query 2: host8 (remote DC, remote rack 1), host9 (remote DC, remote rack 2), host7 (remote DC, remote rack 1)
- query 3: host9 (remote DC, remote rack 2), host7 (remote DC, remote rack 1), host8 (remote DC, remote rack 1)
- query 4: host7 (remote DC, remote rack 1), host8 (remote DC, remote rack 1), host9 (remote DC, remote rack 2)
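The three tiers described above (local rack, then other racks of the local datacenter, then remote datacenters) can be sketched in plain Java. This is a simplified model rather than the driver's algorithm: `RackAwarePlanSketch` and its `Host` class are hypothetical, each tier is rotated independently, and the sketch assumes that lower tiers are appended to the plan as fallbacks and that DOWN hosts are skipped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of rack-aware ordering; not the driver's implementation. */
final class RackAwarePlanSketch {
    static final class Host {
        final String name, dc, rack;
        final boolean up;

        Host(String name, String dc, String rack, boolean up) {
            this.name = name;
            this.dc = dc;
            this.rack = rack;
            this.up = up;
        }
    }

    static List<String> plan(List<Host> hosts, String localDc, String localRack,
                             int usedHostsPerRemoteDc, int queryIndex) {
        List<String> localRackTier = new ArrayList<>();
        List<String> localDcTier = new ArrayList<>();
        List<String> remoteTier = new ArrayList<>();
        Map<String, Integer> usedPerRemoteDc = new HashMap<>();
        for (Host h : hosts) {
            if (!h.up) {
                continue; // DOWN hosts never enter the plan
            }
            if (!h.dc.equals(localDc)) {
                // keep at most usedHostsPerRemoteDc hosts for each remote datacenter
                int used = usedPerRemoteDc.merge(h.dc, 1, Integer::sum);
                if (used <= usedHostsPerRemoteDc) {
                    remoteTier.add(h.name);
                }
            } else if (h.rack.equals(localRack)) {
                localRackTier.add(h.name);
            } else {
                localDcTier.add(h.name);
            }
        }
        List<String> plan = new ArrayList<>();
        plan.addAll(rotate(localRackTier, queryIndex));
        plan.addAll(rotate(localDcTier, queryIndex));
        plan.addAll(rotate(remoteTier, queryIndex));
        return plan;
    }

    /** Shifts the tier left by offset positions, e.g. [a, b, c] with offset 1 gives [b, c, a]. */
    private static List<String> rotate(List<String> tier, int offset) {
        List<String> rotated = new ArrayList<>(tier);
        if (!rotated.isEmpty()) {
            Collections.rotate(rotated, -Math.floorMod(offset, rotated.size()));
        }
        return rotated;
    }
}
```

In this model, marking every local-rack host as down makes the plan start with the remaining hosts of the local datacenter, which mirrors the second list above.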
Cluster cluster = Cluster.builder()
.addContactPoint("127.0.0.1")
.withLoadBalancingPolicy(new TokenAwarePolicy(anotherPolicy))
.build();
This policy adds token awareness on top of another policy: requests will be routed in priority to the local replicas that own the data that is being queried.
In order for token awareness to work, you should first ensure that metadata is enabled in the driver. That is the case by default, unless it's been explicitly disabled by QueryOptions#setMetadataEnabled.
Then you need to consider whether routing information (provided by Statement#getKeyspace and Statement#getRoutingKey) can be computed automatically for your statements; if not, you may provide it yourself. If a statement has no routing information, the query will still be executed, but token awareness will not work, so the driver might not pick the best coordinator.
The examples assume the following CQL schema:
CREATE TABLE testKs.sensor_data(id int, year int, ts timestamp, data double,
PRIMARY KEY ((id, year), ts));
For simple statements, routing information can never be computed automatically:
SimpleStatement statement = new SimpleStatement(
"SELECT * FROM testKs.sensor_data WHERE id = 1 and year = 2016");
// No routing info available:
assert statement.getKeyspace() == null;
assert statement.getRoutingKey() == null;
// Set the keyspace manually:
statement.setKeyspace("testKs");
// Set the routing key manually: serialize each partition key component to its target CQL type
ProtocolVersion protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
statement.setRoutingKey(
TypeCodec.cint().serialize(1, protocolVersion),
TypeCodec.cint().serialize(2016, protocolVersion));
session.execute(statement);
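When several components are passed, they have to be combined into a single routing key. As far as we understand it, Cassandra encodes a composite partition key by writing, for each component, a 2-byte length, the component bytes, and a trailing zero byte. The plain-Java sketch below illustrates that layout for the two int components used above; `CompositeRoutingKeySketch` and its methods are hypothetical helpers, not driver API.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of the composite partition key layout. */
final class CompositeRoutingKeySketch {

    /** Serializes an int as 4 big-endian bytes, the encoding of the CQL int type. */
    static ByteBuffer serializeInt(int value) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(value);
        buffer.flip();
        return buffer;
    }

    /** For each component: 2-byte length, the bytes, then a 0 "end of component" byte. */
    static ByteBuffer compose(ByteBuffer... components) {
        int size = 0;
        for (ByteBuffer component : components) {
            size += 2 + component.remaining() + 1;
        }
        ByteBuffer out = ByteBuffer.allocate(size);
        for (ByteBuffer component : components) {
            out.putShort((short) component.remaining());
            out.put(component.duplicate()); // duplicate() leaves the caller's buffer untouched
            out.put((byte) 0);
        }
        out.flip();
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer key = compose(serializeInt(1), serializeInt(2016));
        System.out.println("routing key length: " + key.remaining() + " bytes");
    }
}
```

You normally do not need to do this yourself: passing the serialized components to setRoutingKey, as in the example above, is enough.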
For built statements, the keyspace is available if it was provided while building the query; the routing key is available only if the statement was built using the table metadata, and all components of the partition key appear in the query:
TableMetadata tableMetadata = cluster.getMetadata()
.getKeyspace("testKs")
.getTable("sensor_data");
// Built from metadata: all info available
BuiltStatement statement1 = select().from(tableMetadata)
.where(eq("id", 1))
.and(eq("year", 2016));
assert statement1.getKeyspace() != null;
assert statement1.getRoutingKey() != null;
// Built from keyspace and table name: only keyspace available
BuiltStatement statement2 = select().from("testKs", "sensor_data")
.where(eq("id", 1))
.and(eq("year", 2016));
assert statement2.getKeyspace() != null;
assert statement2.getRoutingKey() == null;
For bound statements, the keyspace is always available; the routing key is only available if all components of the partition key are bound as variables:
// All components bound: all info available
PreparedStatement pst1 = session.prepare("SELECT * FROM testKs.sensor_data WHERE id = :id and year = :year");
BoundStatement statement1 = pst1.bind(1, 2016);
assert statement1.getKeyspace() != null;
assert statement1.getRoutingKey() != null;
// 'id' hard-coded, only 'year' is bound: only keyspace available
PreparedStatement pst2 = session.prepare("SELECT * FROM testKs.sensor_data WHERE id = 1 and year = :year");
BoundStatement statement2 = pst2.bind(2016);
assert statement2.getKeyspace() != null;
assert statement2.getRoutingKey() == null;
For batch statements, the routing information of each child statement is inspected; the first non-null keyspace is used as the keyspace of the batch, and the first non-null routing key as its routing key (the idea is that all children should have the same routing information, since batches are supposed to operate on a single partition). All children might have null information, in which case you need to provide the information manually as shown previously.
For any host, the distance returned by TokenAwarePolicy is always the same as its child policy.
When the policy computes a query plan, it will first inspect the statement's routing information. If there is none, the policy simply acts as a pass-through, and returns the query plan computed by its child policy.
If the statement has routing information, the policy uses it to determine the replicas that hold the corresponding data. Then it returns a query plan containing:
- the replicas for which the child policy returns distance LOCAL, shuffled in a random order;
- followed by the query plan of the child policy, skipping any hosts that were already returned by the previous step.
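The two-step construction above can be sketched in plain Java. This is an illustrative model only: `TokenAwarePlanSketch` is a hypothetical name, hosts are plain strings, and the list of local replicas is passed in directly instead of being derived from token metadata. The `shuffle` flag is an assumption of the sketch that lets you turn the random ordering off.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Hypothetical model of a token-aware query plan; not the driver's implementation. */
final class TokenAwarePlanSketch {

    static List<String> plan(List<String> localReplicas, List<String> childPlan,
                             boolean shuffle, Random random) {
        // Step 1: the local replicas, optionally in random order
        List<String> replicas = new ArrayList<>(localReplicas);
        if (shuffle) {
            Collections.shuffle(replicas, random);
        }
        // Step 2: the child plan; the set silently drops hosts already added in step 1
        Set<String> ordered = new LinkedHashSet<>(replicas);
        ordered.addAll(childPlan);
        return new ArrayList<>(ordered);
    }
}
```

For example, with replicas host2 and host3 and a child plan of host1 to host4, the plan always starts with host2 and host3 (in either order when shuffling) and ends with host1, host4.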
Finally, the shuffleReplicas constructor parameter allows you to control whether the policy shuffles the replicas in step 1:
new TokenAwarePolicy(anotherPolicy, false); // no shuffling
Shuffling will distribute writes better, and can alleviate hotspots caused by "fat" partitions. On the other hand, setting it to false might increase the effectiveness of caching, since data will always be retrieved from the "primary" replica. Shuffling is enabled by default.
Cluster cluster = Cluster.builder()
.addContactPoint("127.0.0.1")
.withLoadBalancingPolicy(
LatencyAwarePolicy.builder(anotherPolicy)
.withExclusionThreshold(2.0)
.withScale(100, TimeUnit.MILLISECONDS)
.withRetryPeriod(10, TimeUnit.SECONDS)
.withUpdateRate(100, TimeUnit.MILLISECONDS)
.withMininumMeasurements(50)
.build()
).build();
This policy adds latency awareness on top of another policy: it collects the latencies of queries to each host, and will exclude the worst-performing hosts from query plans.
The builder allows you to customize various aspects of the policy:
- the exclusion threshold controls how much worse a host must perform (compared to the fastest host) in order to be excluded. For example, 2 means that hosts that are more than twice as slow as the fastest host will be excluded;
- since a host's performance can vary over time, its score is computed with a time-weighted average; the scale controls how fast the weight given to older latencies decreases over time;
- the retry period is the duration for which a slow host will be penalized;
- the update rate defines how often the minimum average latency (i.e. the fastest host) is recomputed;
- the minimum measurements threshold guarantees that we have enough measurements before we start excluding a host. This prevents skewing the measurements during a node restart, where JVM warm-up will influence latencies.
For any host, the distance returned by the policy is always the same as its child policy.
Query plans are based on the child policy's, except that hosts that are currently excluded for being too slow are moved to the end of the plan.
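The exclusion rule can be illustrated with a small plain-Java model. It is deliberately simplified and is not the driver's implementation: `LatencyAwarePlanSketch` is a hypothetical name, average latencies are supplied as a ready-made map (the time-weighted averaging, retry period, and update rate are left out), and hosts without a measurement are never excluded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical model of latency-based reordering; not the driver's implementation. */
final class LatencyAwarePlanSketch {

    static List<String> plan(List<String> childPlan, Map<String, Double> averageLatencyMs,
                             double exclusionThreshold) {
        // Find the fastest measured host in the child plan
        double fastest = Double.MAX_VALUE;
        for (String host : childPlan) {
            Double latency = averageLatencyMs.get(host);
            if (latency != null) {
                fastest = Math.min(fastest, latency);
            }
        }
        List<String> kept = new ArrayList<>();
        List<String> excluded = new ArrayList<>();
        for (String host : childPlan) {
            Double latency = averageLatencyMs.get(host);
            // A host is too slow when its average exceeds threshold * fastest average
            boolean tooSlow = latency != null && latency > exclusionThreshold * fastest;
            (tooSlow ? excluded : kept).add(host);
        }
        kept.addAll(excluded); // excluded hosts move to the end of the plan
        return kept;
    }
}
```

With averages of 10 ms, 25 ms, and 12 ms for host1, host2, and host3 and a threshold of 2.0, only host2 exceeds 20 ms, so it is moved behind the other two.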
WhiteListPolicy wraps another policy with a white list, to ensure that the driver will only ever connect to a pre-defined subset of the cluster. The distance will be that of the child policy for hosts that are in the white list, and IGNORED otherwise. Query plans are guaranteed to only contain white-listed hosts.
HostFilterPolicy is a generalization of that concept, where you provide the predicate that will determine if a host is included or not.
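Both behaviours reduce to applying a predicate to the child policy's results, which the following plain-Java sketch tries to show. It is a model for illustration, not the driver's code: `HostFilterSketch` and its `Distance` enum are hypothetical, and hosts are plain strings. A white list is simply the predicate `whiteList::contains`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of predicate-based host filtering; not the driver's implementation. */
final class HostFilterSketch {
    /** Stand-in for the three distances described earlier. */
    enum Distance { LOCAL, REMOTE, IGNORED }

    /** Accepted hosts keep the child policy's distance; all others are IGNORED. */
    static Distance distance(String host, Predicate<String> filter, Distance childDistance) {
        return filter.test(host) ? childDistance : Distance.IGNORED;
    }

    /** Keeps the child plan's order, dropping hosts rejected by the predicate. */
    static List<String> plan(List<String> childPlan, Predicate<String> filter) {
        List<String> filtered = new ArrayList<>();
        for (String host : childPlan) {
            if (filter.test(host)) {
                filtered.add(host);
            }
        }
        return filtered;
    }
}
```

For instance, with a white list of host1 and host3, a child plan of host1, host2, host3 becomes host1, host3, and host2 is reported as IGNORED.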
If none of the provided policies fit your use case, you can write your own. This is an advanced topic, so we recommend studying the existing implementations first: RoundRobinPolicy is a good place to start, then you can look at more complex ones like DCAwareRoundRobinPolicy.