Reindexer supports async logical leader-follower replication and sync RAFT-cluster. Each node has to have unique server id, set via #config namespace or web interface. Maximum number of nodes in any replication setup is limited by server id value (max value is 999).
- Write ahead log (WAL)
- Data integrity check
- Async replication
- RAFT-cluster
- Migration from Reindexer's v3.x.x replication config
Write ahead log is combination of rows updates and statements execution. WAL is stored as part of namespace storage. Each WAL record has unique 64-bit log sequence number (LSN), which also contains server id. WAL is ring structure, therefore after N updates (1M by default) are recorded, oldest WAL records will be automatically removed. WAL in storage contains only records with statements (bulk updates, deletes and index structure changes). Rows updates are not stored as dedicated WAL records, but each document contains it's own LSN - this is enough to restore complete WAL in RAM on namespace loading from disk.
Side effect of this mechanic is lack of exact sequence of indexes updates/data updates, and therefore in case of incompatible data migration (e.g. indexed field type changed) follower will fail to apply offline WAL, and will fallback to forced sync
WAL overhead is 18 bytes of RAM per each row update record.
WAL size (maximum number of WAL records) may be configured via #config
namespace. For example to set first_namespace
's WAL size to 4000000 and second_namespace
's to 100000 this command may be used:
Reindexer> \upsert #config { "type": "namespaces", "namespaces": [ { "namespace":"first_namespace", "wal_size": 4000000 }, { "namespace":"second_namespace", "wal_size": 100000 } ] }
Default WAL size is 4000000
To view offline WAL contents from reindexer_tool SELECT
statement with special condition to #lsn
index is used:
Reindexer> SELECT * FROM epg LIMIT 2 where #lsn > 1000000
Replication is complex mechanism and there are present potential possibilities to broke data consistence between master and slave. Reindexer is calculates lightweight incremental hash of all namespace data (DataHash). DataHash is used to quick check, that data of follower is really up to date with leader.
In async replication setup Leader may be standalone server, or golang application with builtin or builtinserver bindings and Follower must be standalone server or golang application with builtinserver reindexer binding.
Replication is using 3 different mechanics:
-
Forced synchronization with namespace snapshot Used for initial synchronization for copy complete structure and data from leader namespace to folower or from one follower to another. Also used in case of error with WAL replication (e.g. WAL has been outdated, or incompatible changes in indexes structure). In this mode leader creates COW namespace snapshot sends all its indexes and data to follower.
-
Offline write ahead log (WAL). Document updates are ROW based, index structure changes, deletes and bulk updates are STATEMENT based Used when leader established network connection to follower to sync data. In this mode leader queries all required records from it's WAL with log sequence number (LSN) greater than LSN of applied by follower's last WAL record.
-
Online WAL updates live stream Used when connection is established. Leader pushes WAL updates stream to all connected followers. This mode is most lightweight and requires less leader's CPU and memory resources.
Cascade replication setups are also supported. In those setups only one leader exists, however, followers may have there own followers to:
leader
/ \
follower1 follower2
| / \
follower1.1 follower2.1 follower2.2
In example above follower1 and follower2 replicates data to other followers, but in comparison with leader, they don't permit write access for external clients.
Replication has to be confiured on leader's side by using special documents of namespace #config
.
First of all general replication parameters must be set via replication
item (those parameters are common for both async and cluster replication):
{
"type":"replication",
"replication":{
"server_id": 0,
"cluster_id": 2,
}
}
server_id
- Server ID, unique ID of the node (must be set by user)cluster_id
- Cluster ID, must be same on each node in cluster
Then you are able to configure specific async replication via async_replication
item:
{
"type":"async_replication",
"async_replication":{
"role":"none",
"replication_mode":"default",
"sync_threads": 4,
"syncs_per_thread": 2,
"online_updates_timeout_sec": 20,
"online_updates_delay_msec": 100,
"sync_timeout_sec": 60,
"retry_sync_interval_msec": 30000,
"enable_compression": true,
"batching_routines_count": 100,
"force_sync_on_logic_error": false,
"force_sync_on_wrong_data_hash": false,
"log_level":"info",
"max_wal_depth_on_force_sync": 1000,
"namespaces":[],
"nodes":
[
{
"dsn": "cproto://192.168.1.5:6534/mydb",
"namespaces": []
}
]
}
}
role
Replication role. May be on ofnone
- replication is disabledfollower
- replication as followerleader
- replication as leader
replication_mode
- Replication mode. Allows to configure async replication from sync raft-cluster. May be one ofdefault
- async replication from this node is always enabled, if there are any target nodes to replicate on;from_sync_leader
- async replication will be enabled only when current node is synchronous RAFT-cluster leader (or if this node does not have any sync cluster config)
sync_threads
- Number of replication threadsyncs_per_thread
- Max number of concurrent force/wal-syncs per each replicatio threadonline_updates_timeout_sec
- Network timeout for communication with followers (for online-replication mode), in secondssync_timeout_sec
- Network timeout for communication with followers (for force and wal synchronization), in secondsretry_sync_interval_msec
- Synchronization retry delay in case of any errors during online replicationenable_compression
- Network traffic compression flagbatching_routines_count
- Number of concurrent routines, used to asynchronously send online updates for each each follower. Larger values may reduce network triparound, but also raise RAM consumationforce_sync_on_logic_error
- Force resync on logic error conditionsforce_sync_on_wrong_data_hash
- Force resync if dataHash mismatchlog_level
- Replication log level on replicator's startup. Possible values: none, error, warning, info, tracemax_wal_depth_on_force_sync
- Maximum number of WAL records, which will be copied after force-synconline_updates_delay_msec
- Delay between write operation and replication. Larger values here will leader to higher replication latency and bufferization, but also will provide more effective network batching and CPU untilizationnamespaces
- List of namespaces for replication. If empty, all namespaces. All replicated namespaces will become read only for follower. This parameter is used, when node doesn't have specific namespace listnodes
- List of follower nodes
Follower settings:
ip_addr
- IP of the follower noderpc_port
- Port of follower's RPC-serverdatabase
- Follower database namenamespaces
- Optional list on namespaces to replicate to this specific node. If specific list is not set, value from general replication config will be used
As second option replication can be configured by YAML-config files, which has to be placed to database directory. Sample of async replication config file is here and sample for general replication config file is here.
If config file is present, then it's overrides settings from #config
namespace on reindexer startup.
Replication status is available in system namespace #memstats
. e.g, execution of statament:
Reindexer> SELECT name,replication FROM #memstats WHERE name='media_items'
will return JSON object with status of namespace media_items
replication
{
"last_lsn_v2":{
"server_id":20,
"counter":5
},
"ns_version":{
"server_id":20,
"counter":5
},
"clusterization_status":{
"leader_id": 20,
"role": "simple_replica"
}
"temporary":false,
"incarnation_counter":0,
"data_hash":6,
"data_count":4,
"updated_unix_nano":1594041127153561000,
"status":"none",
"wal_count":6,
"wal_size":311
}
last_lsn_v2
- current lsn of this node (leader and follower will have the same LSN values for each record)ns_version
- current namespace version. This value is set during namespace creationtemporary
- namespace is temporaryincarnation_counter
- number of switches between master slavedata_hash
- hash of datadata_count
- number of records in the namespaceupdated_unix_nano
- last operation timestatus
- shows current synchronization status: idle, error, syncing, fatalwal_count
- number of records in WALwal_size
- WAL size
There are also a few actions available to interact with async repliication via reindexer tool or web interface.
- Restart replication (if sent to leader, restarts replication completely):
Reindexer> \upsert #config { "type":"action","action":{ "command":"restart_replication" } }
- Reset replication role (useful to make follower's namespaces writable again, if you are going to use this follower outside of replication setup):
Reindexer> \upsert #config { "type":"action","action":{ "command":"reset_replication_role" } }
- Reset replication role for specific namespace:
Reindexer> \upsert #config { "type":"action","action":{ "command":"reset_replication_role", "namespace": "ns_name" } }
- Control replication's log levels:
Reindexer> \upsert #config { "type":"action","action":{ "command":"set_log_level", "type": "async_replication", "level":"trace" } }
Possible types
: async_replication
(controls log level for asynchronous replication), cluster
(controls log level for synchronous replication and RAFT-manager).
Possible levels
: none
, error
, warning
, info
, trace
.
Reindexer support RAFT-like algorithm for leader elections and synchronious replication with consesus awaiting. In cluster setup each node may be standalone server or golang application with builtinserver binding.
While running in cluster mode node may have one of 3 roles: Leader, Follower or Candidate. The role of each node is dynamic and defined by election algorithm.
At startup each node begins an elections loop. Each elections iteration (elections term) has following steps:
- Switch role to Candidate;
- Increment elections term;
- Try to suggest itself as Leader for rest of the nodes;
- In case if this node reaches consensus, it becomes Leader and all other nodes become followers;
- Otherwise await pings/leader suggestions for randomized period of time;
- If leader's ping was recieved, node becomes follower.
Once node has became follower it starts checking if leader is available (via timestamp of the last received ping message). If leader becomes unavailable, follower initiate new election term.
In follower state node changes roles of each namespace from cluster config to "cluster_replica" and sets corresponding leader ID (i.e. server ID of current leader). From this moment all of the follower namespaces are readonly for anyone except cluser leader.
Any request, which requires write access will be proxied to leader and executed on the leader's side.
Requests, which doesn't require write access (and request to system namespaces) will be executed localy.
Once node has became leader it starts sending ping messages to all of the followers. If follower did not respond to the ping message, it will be considered as dead. If N/2 of the followers are dead (N is total number of nodes in cluster), leader has to initiate new elections term.
Right after role switch node begins initial leader sync. During initial sync leader collects latest data from at least N/2 other nodes. It can not replicate any data while initial sync is not completed.
When initial sync is done, leader start to synchronize followers. Same as for asynchronizous replication there are 3 different mechanics for data synchronization: force sync, WAL sync and online updates. Leader continues to replicate data, while it won't recieve request for manual re-elections or error in consensus. In both of this situations node will initiate new elections term.
- Consensus for each data write.
Reindexer is using consensus algorithm to provide data safety. Each data write on leader generates one or few online-updates and then each update awaits approves from N/2 followers. Follower can not approve updates, if WAL/force sync is not done yet.
- Read after write is always safe.
If operation was proxied from follower to leader, then it won't be generaly approved before it get approve from current node (emmiter node), so if you wrote something in current thread on any node, then you'll be able to read this data from the same node.
- Online updates are ordered.
For optimization purposes concurrent writes are available even if some of the writing operations from other threads are awaiting consensus right now. Reindexer guarantees that all of this concurrent writes will be performed on followers with the same ordering.
On startup reindexer_server reads replication and cluster config from files replication.conf
(sample) and cluster.conf
(sample), which have to be placed in database directory.
replication.conf
sets general replication parameter and has to be unique for each node in cluster (those parameters also may be configured via #config
namespace).
cluster.conf
sets specific cluster parameters (description may be found in sample). This file has to have the same content on each node of the cluster.
-
Example script, which creates local cluster.
-
Docker-compose config, which create 3 docker-containers running RAFT-cluster. Usage:
# Run from docker-compose.yml directory
docker-compose up
In both examples default RPC ports are: 6534, 6535, 6536; and default HTTP ports are: 9088, 9089, 9090.
// work in progress
Cluster statistics may be recieved via select-request to #replicationstats
namespace:
Reindexer> SELECT * FROM #replicationstats where type="cluster"
This select has to have filter by type
field with either cluster
or async
value. When type="cluster"
is used, request will be proxied to leader.
Example of JSON-response:
{
"type": "cluster",
"initial_sync": {
"force_syncs": {
"count": 0,
"max_time_us": 0,
"avg_time_us": 0
},
"wal_syncs": {
"count": 0,
"max_time_us": 0,
"avg_time_us": 0
},
"total_time_us": 2806
},
"force_syncs": {
"count": 0,
"max_time_us": 0,
"avg_time_us": 0
},
"wal_syncs": {
"count": 0,
"max_time_us": 0,
"avg_time_us": 0
},
"update_drops": 0,
"pending_updates_count": 275,
"allocated_updates_count": 275,
"allocated_updates_size": 43288,
"nodes": [
{
"dsn": "cproto://127.0.0.1:14000/node0",
"server_id": 0,
"pending_updates_count": 60,
"status": "offline",
"sync_state": "awaiting_resync",
"role": "none",
"is_synchronized": false,
"namespaces": []
},
{
"dsn": "cproto://127.0.0.1:14002/node2",
"server_id": 2,
"pending_updates_count": 0,
"status": "online",
"sync_state": "online_replication",
"role": "follower",
"is_synchronized": true,
"namespaces": []
},
{
"dsn": "cproto://127.0.0.1:14003/node3",
"server_id": 3,
"pending_updates_count": 0,
"status": "online",
"sync_state": "online_replication",
"role": "follower",
"is_synchronized": true,
"namespaces": []
},
{
"dsn": "cproto://127.0.0.1:14004/node4",
"server_id": 4,
"pending_updates_count": 275,
"status": "online",
"sync_state": "online_replication",
"role": "follower",
"is_synchronized": true,
"namespaces": []
},
{
"dsn": "cproto://127.0.0.1:14001/node1",
"server_id": 1,
"pending_updates_count": 0,
"status": "online",
"sync_state": "online_replication",
"role": "leader",
"is_synchronized": true,
"namespaces": []
}
]
}
initial_sync
- statistics about leader's initial sync;force_syncs
- statistics about follower's namespaces force syncs;wal_syncs
- statistics about follower's namespaces wal syncs;update_drops
- number of updates overflows, when updates were dropped;pending_updates_count
- number of updates, awaiting replication in queue;allocated_updates_count
- number of updates in queue (including those, which already were replicated, but was not deallocated yet);allocated_updates_size
- total size of allocated updates in bytes;nodes
- statistics about each node.
Node statistics fields:
dsn
- node's DSN;server_id
- node's server ID;pending_updates_count
- number of updates, which are awating replication to this sepcific follower;status
- node's status:none
,online
,offline
orraft_error
;sync_state
- node's replication sync state:none
,online_replication
,awaiting_resync
,syncing
orinitial_leader_sync
;role
- node's role:leader
orfollower
;is_synchronized
- synchronization status. Shows if all of the approved updates were replicated to this node;namespaces
- namespaces which are configured for this node.
Leader of the cluster may be changed manually via #config
namespace. For example, this request will transfer leadership to the node with server ID
2 (if it exists):
Reindexer> \upsert #config { "type":"action","action":{ "command":"set_leader_node", "server_id": 2 } }
It's possible to combine async replication and RAFT-cluster in setups like this:
cluster1 (ns1, ns2) cluster2 (ns1)
updates -> cl10 - cl11 cl20 - cl21
\ / async repl(ns2) \ /
cl12 -------------------> cl22
In setup above there are 2 independant RAFT-clusters: cluster1
(over ns1
and ns2
) and cluster2
(over ns1
). Also one of the nodes of the first cluster replicating its data (ns2
) asynchronously to one of the nodes of the second cluster.
Take a notice:
ns2
can not taking part in second RAFT-cluster- asynchronous replication here (with
default
replication mode) still works on node-to-node basis, i.e. it replicates all of the data fromcl12
node tocl22
node, but not to other nodes of the seconds cluster (i.e. ifcl12
is down, data will not be asynchronously replicated)
It's possible to combine async replication and RAFT-cluster in setups like this:
cluster1 (ns1, ns2) cluster2 (ns1)
updates -> cl10 - cl11 async repl(ns2) cl20 - cl21
\ / -------------------> \ /
cl12 cl22
In setup above there are 2 independant RAFT-clusters: cluster1
(over ns1
and ns2
) and cluster2
(over ns1
). Also each of the nodes of the first cluster has the same async replication config like this:
{
"type":"async_replication",
"async_replication":{
"role":"leader",
"replication_mode":"from_sync_leader",
"sync_threads": 4,
"syncs_per_thread": 2,
"online_updates_timeout_sec": 20,
"sync_timeout_sec": 60,
"retry_sync_interval_msec": 30000,
"enable_compression": true,
"batching_routines_count": 100,
"force_sync_on_logic_error": false,
"force_sync_on_wrong_data_hash": false,
"max_wal_depth_on_force_sync": 1000,
"nodes":
[
{
"dsn": "cproto://192.168.1.5:6534/mydb",
"namespaces": ["ns2"],
"replication_mode":"from_sync_leader"
},
{
"dsn": "cproto://192.168.1.6:6534/mydb",
"namespaces": ["ns2"],
"replication_mode":"from_sync_leader"
},
{
"dsn": "cproto://192.168.1.7:6534/mydb",
"namespaces": ["ns2"],
"replication_mode":"from_sync_leader"
}
]
}
}
With replication_mode: "from_sync_leader"
option only the current leader of cluster1
replicating its data (ns2
) asynchronously to all the nodes of the second cluster (i.e. if one of the node from cluster1
down, asynchronous replication will still work via new leader)
Take a notice:
ns2
can not taking part in second RAFT-cluster
- Reindexer does not have global WAL and uses individual WAL for each namespace. Due to this peculiarity operations like
rename namespace
anddrop namespace
can not be replicated properly via force/WAL sync (only online replication is available). This also may lead to situation, when new leader recovers some of the dropped namespaces on initial sync. - Reindexer does not removing 'extra' namespaces from replicas and does not downloadin those namespaces to leader after it's initial sync. So this may lead to the situation, when some replicas will have namespaces, which does not exist anywhere in cluster (in case if those replicas were offline during leader elections). For example, if you have cluster of 3 nodes, where two nodes are empty and one has actual data. In that case two empty nodes will startup faster and will be able to choose the leader, while third node is still reading its data (and stays offline).
- If server ID in cluster config does not correspond to actual server ID of the node, this may lead to undefined behaviour (currently reindexer does not validate those IDs by itself).
- Currently synchronous cluster config may not be changed in runtime via HTTP/RPC and rewriting YML-file is the only way to change it.
New asynchronous replication is incompatible with configs from v3.x.x, so you will have to migrate manually.
- Server ID's and cluster ID's doesn't require any changes (they are using the same object "replication" in #config namespace)
- Current scheme works as "push-replication" and legacy scheme was "pull-replication", so you have to move all information about namespaces and DSNs from leagcy slaves to current leader (to "async_replication" in #config).
- Any additional configs (i.e. timeouts or appnames) now should also be configured on leader's side.
For example, we have the following replication scheme:
leader(192.168.10.1)
/ \
follower1(192.168.10.2) follower2(192.168.10.3)
In this case follower1
legacy replication config is:
{
"type":"replication",
"replication":{
"role":"slave",
"master_dsn":"cproto://192.168.10.1/db",
"cluster_id":2,
"server_id":1,
"force_sync_on_logic_error": false,
"force_sync_on_wrong_data_hash": false,
"namespaces":
- "ns1"
}
}
follower2
config is:
{
"type":"replication",
"replication":{
"role":"slave",
"master_dsn":"cproto://192.168.10.1/db",
"cluster_id":2,
"server_id":2,
"force_sync_on_logic_error": false,
"force_sync_on_wrong_data_hash": false,
"namespaces":
- "ns2"
- "ns3"
}
}
And leader
config is:
{
"type":"replication",
"replication":{
"role":"master",
"master_dsn":"",
"cluster_id":2,
"server_id":0
}
}
First of all depreacted parameters must be removed from old followers' configs.
New follower1
config is:
{
"type":"replication",
"replication":{
"cluster_id":2,
"server_id":1
}
}
New follower2
config:
{
"type":"replication",
"replication":{
"cluster_id":2,
"server_id":2
}
}
Also follower-role should be set for both follower1
and follower2
:
{
"type":"async_replication",
"async_replication":{
"role":"follower"
}
}
Then remove deprecated fields from leader
's replication config:
{
"type":"replication",
"replication":{
"cluster_id":2,
"server_id":0
}
}
And finally create new async replication config for leader
:
{
"type":"async_replication",
"async_replication":{
"role":"leader",
"sync_threads": 4,
"syncs_per_thread": 2,
"online_updates_timeout_sec": 60,
"retry_sync_interval_msec": 30000,
"enable_compression": true,
"batching_routines_count": 100,
"force_sync_on_logic_error": false,
"force_sync_on_wrong_data_hash": false,
"namespaces":[],
"nodes":
[
{
"dsn": "cproto://192.168.10.2:6534/mydb",
"namespaces": ["ns1"]
},
{
"dsn": "cproto://192.168.10.3:6534/mydb",
"namespaces": ["ns2","ns3"]
}
]
}
}
Migration for cascade repliction setup doesn't have much differences from migration for any other setups. Initial scheme is:
leader(192.168.10.1)
|
follower1(192.168.10.2)
|
follower2(192.168.10.3)
In this case follower1
legacy replication config is:
{
"type":"replication",
"replication":{
"role":"slave",
"master_dsn":"cproto://192.168.10.1/db",
"cluster_id":2,
"server_id":1,
"force_sync_on_logic_error": false,
"force_sync_on_wrong_data_hash": false,
"namespaces": []
}
}
follower2
config is:
{
"type":"replication",
"replication":{
"role":"slave",
"master_dsn":"cproto://192.168.10.2/db",
"cluster_id":2,
"server_id":2,
"force_sync_on_logic_error": false,
"force_sync_on_wrong_data_hash": false,
"namespaces": []
}
}
And leader
config is:
{
"type":"replication",
"replication":{
"role":"master",
"master_dsn":"",
"cluster_id":2,
"server_id":0
}
}
Same as for simple single-level config, all depreacted parameters must be removed from old followers' configs.
New follower1
config is:
{
"type":"replication",
"replication":{
"cluster_id":2,
"server_id":1
}
}
New follower2
config:
{
"type":"replication",
"replication":{
"cluster_id":2,
"server_id":2
}
}
Then follower-role should be set for follower2
:
{
"type":"async_replication",
"async_replication":{
"role":"follower"
}
}
follower1
in this case will still have follower-role, hovewer it also gets nodes
config to replicate data:
{
"type":"async_replication",
"async_replication":{
"role":"leader",
"sync_threads": 4,
"syncs_per_thread": 2,
"online_updates_timeout_sec": 60,
"retry_sync_interval_msec": 30000,
"enable_compression": true,
"batching_routines_count": 100,
"force_sync_on_logic_error": false,
"force_sync_on_wrong_data_hash": false,
"namespaces":[],
"nodes":
[
{
"dsn": "cproto://192.168.10.3:6534/mydb",
}
]
}
}
Note, that there were no specific namepspaces' list for the followers, so there is no sucn field in nodes
config (field namespaces
from top level is used instead).
When followers configuration is done, remove deprecated fields from leader
's replication config:
{
"type":"replication",
"replication":{
"cluster_id":2,
"server_id":0
}
}
And finally create new async replication config for leader
(it looks like similar to follower1
config, however with different role and dsn):
{
"type":"async_replication",
"async_replication":{
"role":"leader",
"sync_threads": 4,
"syncs_per_thread": 2,
"online_updates_timeout_sec": 60,
"retry_sync_interval_msec": 30000,
"enable_compression": true,
"batching_routines_count": 100,
"force_sync_on_logic_error": false,
"force_sync_on_wrong_data_hash": false,
"namespaces":[],
"nodes":
[
{
"dsn": "cproto://192.168.10.2:6534/mydb"
}
]
}
}