From b2d6f1ed9cb126fdb36a263df40cda773e9452e1 Mon Sep 17 00:00:00 2001
From: Chris Hoffman
Date: Tue, 19 Apr 2016 11:24:22 -0700
Subject: [PATCH 1/3] Add cluster_setup.py for BMS-Specific turn-up and maintenance.

---
 docs/cluster-setup.rst        |   5 +
 examples/query_cluster.py     |  20 ++++
 rediscluster/cluster_setup.py | 202 ++++++++++++++++++++++++++++++++++
 rediscluster/exceptions.py    |   6 +
 rediscluster/nodemanager.py   |   7 +-
 5 files changed, 239 insertions(+), 1 deletion(-)
 create mode 100644 examples/query_cluster.py
 create mode 100644 rediscluster/cluster_setup.py

diff --git a/docs/cluster-setup.rst b/docs/cluster-setup.rst
index 987bbd58..ab513d95 100644
--- a/docs/cluster-setup.rst
+++ b/docs/cluster-setup.rst
@@ -1,6 +1,11 @@
 Redis cluster setup
 ===================
 
+Tasks:
+- given a set of N redis nodes, create a new cluster with p masters and q slaves, where p+q=N
+- given a working cluster, add p masters, rebalance the cluster nodes, and add q slaves
+- given a working cluster, remove either a given node_id, OR all nodes belonging to a specific host
+
 Manually
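As a concrete reading of the first task (illustrative numbers only; the split mirrors the master_count arithmetic in the cluster_setup.py module added below):

    # hypothetical sketch: N=6 fresh nodes with one slave per master (replicas=1)
    # split into p=3 masters and q=3 slaves, so p+q=N
    n_nodes, replicas = 6, 1
    masters = n_nodes // (replicas + 1)
    slaves = n_nodes - masters
    assert (masters, slaves) == (3, 3)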
+ """ + + nodeset = dict() + new_nodes = [] + for node in startup_nodes: + try: + rc = StrictRedisCluster(startup_nodes=[node], decode_responses=True) + cluster_nodes = rc.cluster_nodes() + for n in cluster_nodes: + nodeset.update({n['id']: n}) + except RedisClusterUninitialized: + new_nodes.append(node) + + if nodeset: + print 'nodes already in a cluster:' + for n in nodeset: + print nodeset[n] + + if not new_nodes: + print "no nodes available to be in a cluster" + return + + print 'nodes to make a new cluster' + node_list = [] + for n in new_nodes: + print n + node_list.append((n['host'], int(n['port']))) + + master_count = len(node_list) + if replicas: + master_count /= (replicas + 1) + if master_count < 1: + print "ERROR: not enough fresh nodes to accomodate replication factor of {}".format(replicas) + return + + master_count = int(master_count) + master_list = node_list[:master_count] + slave_list = node_list[master_count:] + + if len(master_list) > 1: + print "INFO: creating cluster with the following nodes as masters: {}".format(master_list) + redistrib.command.start_cluster_on_multi(master_list) + else: + host = master_list[0][0] + port = master_list[0][1] + print "INFO: creating single master: {} {}".format(host, port) + m = StrictRedis(host=host, port=port) + s = "" + for i in xrange(1, NUM_SLOTS): + s += "{} ".format(i) + cmd = 'CLUSTER ADDSLOTS {}'.format(s) + print "INFO: sending following command: ", cmd + m.execute_command(cmd) + + + # add slaves + if replicas: + print "INFO: adding following nodes as slaves evenly across masters: {}".format(slave_list) + for i, s in enumerate(slave_list): + m = master_list[i % master_count] + redistrib.command.replicate(m[0], m[1], s[0], s[1]) + return True + + +def _map_cluster(node_host, node_port): + cluster = {} + slaves = [] + node_port = int(node_port) + nodes, master = redistrib.command.list_nodes(node_host, node_port) + for node in nodes: + if 'master' in node.role_in_cluster: + if node.node_id not in cluster: + cluster[node.node_id] = {'self': node, 'slaves': []} + else: + slaves.append(node) + + for slave in slaves: + cluster[slave.master_id]['slaves'].append(slave) + + cluster_replication_factor = int(len(cluster) / len(slaves)) + return cluster, slaves, cluster_replication_factor + + +def expand_cluster(master_host, master_port, new_nodes, num_new_masters=None): + """ + function to add a set of nodes to an existing cluster. NOTE: this function presumes that the list of new nodes are + NOT present on existing instances. Future versions MAY try to re-balance slaves among IP Addresses. + :param master_host: machine IP (string) of any healthy cluster node + :param master_port: machine Port of the master_host + :param new_nodes: list of {"host": "[ip_addr]", "port": "[port_num]"} objects + :param num_new_masters: if you want a specific ammount to be new masters, set this parameter Default: function will + attempt to auto-detect existing replication factor, and populate, accordingly. + :return: + """ + + cluster, slaves, cluster_replication_factor = _map_cluster(master_host, master_port) + + if not cluster: + print "ERROR: Empty Cluster for Host {} {}".format(master_host, master_port) + return + + num_sets_to_add = int(len(new_nodes)/cluster_replication_factor + 1) + if not num_sets_to_add: + print "ERROR: Cluster has a replication factor of {}. Insufficient number of new nodes given." 
+        return
+
+    new_replication_factor = cluster_replication_factor
+    if num_new_masters:
+        if num_new_masters < len(new_nodes):
+            print "ERROR: Insufficient number of new nodes ({}) to accomodate number of masters requested ({})".format(
+                len(new_nodes), num_new_masters
+            )
+            return
+        num_sets_to_add = min(num_sets_to_add, num_new_masters)
+        new_replication_factor = min(cluster_replication_factor, (len(new_nodes)/(num_new_masters + 1)))
+
+    master_list = None
+
+    while num_sets_to_add:
+        if not master_list:
+            master_list = cluster.values()
+        num_sets_to_add -= 1
+        new_node = new_nodes.pop()
+        master = master_list.pop()
+        master_node = master['self']
+        slave_nodes = master['slaves']
+
+        redistrib.command.join_cluster(master_node.host, master_node.port, new_node['host'], int(new_node['port']))
+
+        for j in range(new_replication_factor):
+            new_slave_node = new_nodes.pop()
+            slave_node = slave_nodes.pop()
+            redistrib.command.replicate(master_node.host, master_node.port, new_slave_node['host'], int(new_slave_node['port']))
+            if slave_node:
+                redistrib.command.replicate(new_node['host'], int(new_node['port']), slave_node.host, slave_node.port)
+            else:
+                print "WARN: slave node underrun for replication {} of factor {}".format(j+1, new_replication_factor)
+
+    while new_nodes:
+        cluster, slaves, new_replication_factor = _map_cluster(master_host, master_port)
+        underweight = []
+        for master in cluster.values():
+            if len(master['slaves']) < new_replication_factor:
+                underweight.append(master)
+
+        if underweight:
+            while underweight and new_nodes:
+                master = underweight.pop()
+                master_node = master['self']
+                new_node = new_nodes.pop()
+                redistrib.command.replicate(master_node.host, master_node.port, new_node['host'], int(new_node['port']))
+        else:
+            master_nodes = cluster.values()
+            while master_nodes and new_nodes:
+                new_node = new_nodes.pop()
+                master_node = master_nodes.pop()
+                redistrib.command.replicate(master_node.host, master_node.port, new_node['host'], int(new_node['port']))
+
+
+def main():
+    # startup_nodes = [{"host": "10.64.1.102", "port": "7000"}]
+    startup_nodes = [{"host": "192.168.99.100", "port": "6380"}, {"host": "192.168.99.100", "port": "6381"}]
+    create(startup_nodes, replicas=1)
+    expand_cluster(startup_nodes[0]['host'], startup_nodes[0]['port'], {})
+
+
+if __name__ == "__main__":
+    main()

diff --git a/rediscluster/exceptions.py b/rediscluster/exceptions.py
index 5019c98e..cb7cf7ad 100644
--- a/rediscluster/exceptions.py
+++ b/rediscluster/exceptions.py
@@ -11,6 +11,12 @@ class RedisClusterException(Exception):
     pass
 
 
+class RedisClusterUninitialized(Exception):
+    """
+    """
+    message = "redis host not initialized"
+    pass
+
 class RedisClusterError(Exception):
     """
     """

diff --git a/rediscluster/nodemanager.py b/rediscluster/nodemanager.py
index 61b12ced..a4c2ccd6 100644
--- a/rediscluster/nodemanager.py
+++ b/rediscluster/nodemanager.py
@@ -5,7 +5,7 @@
 
 # rediscluster imports
 from .crc import crc16
-from .exceptions import RedisClusterException
+from .exceptions import RedisClusterException, RedisClusterUninitialized
 
 # 3rd party imports
 from redis import StrictRedis
@@ -147,6 +147,11 @@ def initialize(self):
             except Exception:
                 raise RedisClusterException("ERROR sending 'cluster slots' command to redis server: {0}".format(node))
 
+            try:
+                assert len(cluster_slots) > 0
+            except AssertionError:
+                raise RedisClusterUninitialized("ERROR 'cluster slots' command to redis server: {0} returns empty list [] ".format(node))
+
             all_slots_covered = True
 
             # If there's only one server in the cluster, its ``host`` is ''
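For reference, a quick sanity check of the slot-partitioning helper this patch introduces (a sketch only; it assumes the module is importable as rediscluster.cluster_setup and runs under Python 2, like the module itself):

    from rediscluster.cluster_setup import make_parts

    # 16384 slots split across 3 masters; the single leftover slot widens the first range
    parts = make_parts(3)
    assert parts == [(0, 5461), (5462, 10922), (10923, 16383)]
    # every slot from 0 through 16383 is covered exactly once
    assert sum(hi - lo + 1 for lo, hi in parts) == 16384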
From 2549607fb2d30156625e3331fc811d50bf51b5e8 Mon Sep 17 00:00:00 2001
From: Chris Hoffman
Date: Wed, 20 Apr 2016 13:00:15 -0700
Subject: [PATCH 2/3] update requirements

---
 rediscluster/cluster_setup.py | 93 +++++++++++++++++++++++------------
 requirements.txt              |  3 ++
 2 files changed, 64 insertions(+), 32 deletions(-)

diff --git a/rediscluster/cluster_setup.py b/rediscluster/cluster_setup.py
index 8965b498..8034c061 100644
--- a/rediscluster/cluster_setup.py
+++ b/rediscluster/cluster_setup.py
@@ -1,4 +1,7 @@
 #!/usr/bin/python
+
+import logging
+
 from rediscluster.exceptions import RedisClusterUninitialized
 from rediscluster import StrictRedisCluster
 from redis import StrictRedis
@@ -45,25 +48,25 @@ def create(startup_nodes, replicas=0):
             new_nodes.append(node)
 
     if nodeset:
-        print 'nodes already in a cluster:'
+        logging.debug('nodes already in a cluster:')
         for n in nodeset:
-            print nodeset[n]
+            logging.debug( nodeset[n] )
 
     if not new_nodes:
-        print "no nodes available to be in a cluster"
+        logging.debug( "no nodes available to be in a cluster" )
         return
 
-    print 'nodes to make a new cluster'
+    logging.debug( 'nodes to make a new cluster' )
    node_list = []
    for n in new_nodes:
-        print n
+        logging.debug( n )
         node_list.append((n['host'], int(n['port'])))
 
     master_count = len(node_list)
     if replicas:
         master_count /= (replicas + 1)
         if master_count < 1:
-            print "ERROR: not enough fresh nodes to accomodate replication factor of {}".format(replicas)
+            logging.debug( "ERROR: not enough fresh nodes to accomodate replication factor of {}".format(replicas) )
             return
 
     master_count = int(master_count)
@@ -71,24 +74,24 @@ def create(startup_nodes, replicas=0):
     slave_list = node_list[master_count:]
 
     if len(master_list) > 1:
-        print "INFO: creating cluster with the following nodes as masters: {}".format(master_list)
+        logging.debug( "INFO: creating cluster with the following nodes as masters: {}".format(master_list) )
         redistrib.command.start_cluster_on_multi(master_list)
     else:
         host = master_list[0][0]
         port = master_list[0][1]
-        print "INFO: creating single master: {} {}".format(host, port)
+        logging.debug( "INFO: creating single master: {} {}".format(host, port) )
         m = StrictRedis(host=host, port=port)
         s = ""
         for i in xrange(1, NUM_SLOTS):
             s += "{} ".format(i)
         cmd = 'CLUSTER ADDSLOTS {}'.format(s)
-        print "INFO: sending following command: ", cmd
+        logging.debug( "INFO: sending following command: {}".format(cmd) )
         m.execute_command(cmd)
 
 
     # add slaves
     if replicas:
-        print "INFO: adding following nodes as slaves evenly across masters: {}".format(slave_list)
+        logging.debug( "INFO: adding following nodes as slaves evenly across masters: {}".format(slave_list) )
         for i, s in enumerate(slave_list):
             m = master_list[i % master_count]
             redistrib.command.replicate(m[0], m[1], s[0], s[1])
@@ -110,7 +113,10 @@ def _map_cluster(node_host, node_port):
     for slave in slaves:
         cluster[slave.master_id]['slaves'].append(slave)
 
-    cluster_replication_factor = int(len(cluster) / len(slaves))
+    if slaves:
+        cluster_replication_factor = int(len(cluster) / len(slaves))
+    else:
+        cluster_replication_factor = 0
     return cluster, slaves, cluster_replication_factor
 
 
@@ -127,49 +133,62 @@ def expand_cluster(master_host, master_port, new_nodes, num_new_masters=None):
     """
 
     cluster, slaves, cluster_replication_factor = _map_cluster(master_host, master_port)
+    logging.debug( "cluster: {}".format(cluster) )
+    logging.debug( "slaves: {}".format(slaves) )
 
     if not cluster:
-        print "ERROR: Empty Cluster for Host {} {}".format(master_host, master_port)
+        logging.debug( "ERROR: Empty Cluster for Host {} {}".format(master_host, master_port) )
         return
 
-    num_sets_to_add = int(len(new_nodes)/cluster_replication_factor + 1)
+    num_sets_to_add = int(len(new_nodes)/(cluster_replication_factor + 1))
     if not num_sets_to_add:
-        print "ERROR: Cluster has a replication factor of {}. Insufficient number of new nodes given."
+        logging.debug( "ERROR: Cluster has a replication factor of {}. Insufficient number of new nodes given." )
         return
 
     new_replication_factor = cluster_replication_factor
     if num_new_masters:
         if num_new_masters < len(new_nodes):
-            print "ERROR: Insufficient number of new nodes ({}) to accomodate number of masters requested ({})".format(
-                len(new_nodes), num_new_masters
-            )
+            logging.debug( "ERROR: Insufficient number of new nodes ({}) to accomodate number of masters requested ({})".format(
+                len(new_nodes), num_new_masters
+            ) )
             return
         num_sets_to_add = min(num_sets_to_add, num_new_masters)
         new_replication_factor = min(cluster_replication_factor, (len(new_nodes)/(num_new_masters + 1)))
 
+    logging.debug( "crf: {}".format(new_replication_factor) )
+
     master_list = None
 
     while num_sets_to_add:
         if not master_list:
             master_list = cluster.values()
+            logging.debug( "master list: {}".format(master_list ))
         num_sets_to_add -= 1
-        new_node = new_nodes.pop()
+        new_master_node = new_nodes.pop()
         master = master_list.pop()
-        master_node = master['self']
-        slave_nodes = master['slaves']
+        existing_master_node = master['self']
+        existing_slave_nodes = master['slaves']
+        logging.debug( 'adding new node {} to cluster as master, with {}.'.format(new_master_node, existing_master_node.port) )
 
-        redistrib.command.join_cluster(master_node.host, master_node.port, new_node['host'], int(new_node['port']))
+        redistrib.command.join_cluster(existing_master_node.host, existing_master_node.port,
+                                       new_master_node['host'], int(new_master_node['port']))
+        logging.debug( 'master node joined to cluster!' )
 
         for j in range(new_replication_factor):
             new_slave_node = new_nodes.pop()
-            slave_node = slave_nodes.pop()
-            redistrib.command.replicate(master_node.host, master_node.port, new_slave_node['host'], int(new_slave_node['port']))
-            if slave_node:
-                redistrib.command.replicate(new_node['host'], int(new_node['port']), slave_node.host, slave_node.port)
+            existing_slave_node = existing_slave_nodes.pop()
+            logging.debug( 'adding node {} as slave to master {}'.format(new_slave_node['port'], existing_master_node.port) )
+            redistrib.command.replicate(existing_master_node.host, existing_master_node.port, new_slave_node['host'], int(new_slave_node['port']))
+            if existing_slave_node:
+                logging.debug( 'switching existing slave node {} to master {}'.format(existing_slave_node.port, new_master_node['port'] ) )
+
+                redistrib.command.replicate(new_master_node['host'], int(new_master_node['port']), existing_slave_node.host, existing_slave_node.port)
             else:
-                print "WARN: slave node underrun for replication {} of factor {}".format(j+1, new_replication_factor)
+                logging.debug( "WARN: slave node underrun for replication {} of factor {}".format(j+1, new_replication_factor) )
+            logging.debug( "slave node {} added to the master".format(existing_slave_node) )
 
     while new_nodes:
+        logging.debug( "there are new nodes left over, so adding them as slaves")
         cluster, slaves, new_replication_factor = _map_cluster(master_host, master_port)
         underweight = []
         for master in cluster.values():
             if len(master['slaves']) < new_replication_factor:
@@ -179,22 +198,32 @@ def expand_cluster(master_host, master_port, new_nodes, num_new_masters=None):
         if underweight:
             while underweight and new_nodes:
                 master = underweight.pop()
-                master_node = master['self']
-                new_node = new_nodes.pop()
-                redistrib.command.replicate(master_node.host, master_node.port, new_node['host'], int(new_node['port']))
+                existing_master_node = master['self']
+                new_master_node = new_nodes.pop()
+                redistrib.command.replicate(existing_master_node.host, existing_master_node.port, new_master_node['host'], int(new_master_node['port']))
         else:
             master_nodes = cluster.values()
             while master_nodes and new_nodes:
-                new_node = new_nodes.pop()
-                master_node = master_nodes.pop()
-                redistrib.command.replicate(master_node.host, master_node.port, new_node['host'], int(new_node['port']))
+                new_master_node = new_nodes.pop()
+                existing_master_node = master_nodes.pop()
+                redistrib.command.replicate(existing_master_node.host, existing_master_node.port, new_master_node['host'], int(new_master_node['port']))
 
 
 def main():
+    logger = logging.getLogger()
+    handler = logging.StreamHandler()
+    formatter = logging.Formatter(
+        '%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    logger.setLevel(logging.DEBUG)
+
     # startup_nodes = [{"host": "10.64.1.102", "port": "7000"}]
     startup_nodes = [{"host": "192.168.99.100", "port": "6380"}, {"host": "192.168.99.100", "port": "6381"}]
     create(startup_nodes, replicas=1)
-    expand_cluster(startup_nodes[0]['host'], startup_nodes[0]['port'], {})
+    expand_cluster(startup_nodes[0]['host'], startup_nodes[0]['port'], [{'host': "192.168.99.100", "port":"6382"},
+                                                                        {'host': "192.168.99.100", "port":"6383"}])
+
 
 if __name__ == "__main__":

diff --git a/requirements.txt b/requirements.txt
index a4d27eb0..9a57f895 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,4 @@
 redis>=2.10.2
+
+-e git+ssh://git@github.com:quixey/redis-trib.py.git@0.4.1#egg=redis-trib.py
+
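To see why this patch's change to num_sets_to_add matters, here is the arithmetic worked through (a sketch with illustrative numbers; each master-plus-slaves set consumes replication_factor + 1 of the new nodes):

    cluster_replication_factor = 1   # one slave per master
    new_node_count = 4               # nodes being added
    # corrected expression: 4 new nodes yield 2 master+slave sets
    num_sets_to_add = int(new_node_count / (cluster_replication_factor + 1))
    assert num_sets_to_add == 2
    # the old expression, int(new_node_count / cluster_replication_factor + 1),
    # evaluated to 5 and would overcommit the node pool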
From 338a6e8c63ab64637265d71cac4013ff6e46ac74 Mon Sep 17 00:00:00 2001
From: Chris Hoffman
Date: Thu, 21 Apr 2016 15:00:12 -0700
Subject: [PATCH 3/3] fix runtime bugs, and get rid of need for rediscluster

---
 rediscluster/cluster_setup.py | 212 +++++++++++++++++++++++-----------
 requirements.txt              |   1 +
 2 files changed, 147 insertions(+), 66 deletions(-)

diff --git a/rediscluster/cluster_setup.py b/rediscluster/cluster_setup.py
index 8034c061..5e81f5f2 100644
--- a/rediscluster/cluster_setup.py
+++ b/rediscluster/cluster_setup.py
@@ -1,11 +1,13 @@
 #!/usr/bin/python
+import argparse
 import logging
-
-from rediscluster.exceptions import RedisClusterUninitialized
-from rediscluster import StrictRedisCluster
+from hiredis import ProtocolError, ReplyError
 from redis import StrictRedis
+
 import redistrib.command
+from redistrib.clusternode import Talker
+
 
 NUM_SLOTS = 16384
@@ -30,43 +32,23 @@ def make_parts(n):
     return partitions
 
 
-def create(startup_nodes, replicas=0):
+def create(new_nodes, replicas=0):
     """
     Function to create a new cluster ONLY from nodes not already initialized. NOTE: this function replicates
     redis-trib.rb 'create' command, EXCEPT that it can take less than 3 nodes for initialization.
     """
 
-    nodeset = dict()
-    new_nodes = []
-    for node in startup_nodes:
-        try:
-            rc = StrictRedisCluster(startup_nodes=[node], decode_responses=True)
-            cluster_nodes = rc.cluster_nodes()
-            for n in cluster_nodes:
-                nodeset.update({n['id']: n})
-        except RedisClusterUninitialized:
-            new_nodes.append(node)
-
-    if nodeset:
-        logging.debug('nodes already in a cluster:')
-        for n in nodeset:
-            logging.debug( nodeset[n] )
-
-    if not new_nodes:
-        logging.debug( "no nodes available to be in a cluster" )
-        return
-
-    logging.debug( 'nodes to make a new cluster' )
+    logging.debug('nodes to make a new cluster')
     node_list = []
     for n in new_nodes:
-        logging.debug( n )
+        logging.debug("new node: {}".format(n))
         node_list.append((n['host'], int(n['port'])))
 
     master_count = len(node_list)
     if replicas:
         master_count /= (replicas + 1)
         if master_count < 1:
-            logging.debug( "ERROR: not enough fresh nodes to accomodate replication factor of {}".format(replicas) )
+            logging.error("Not enough fresh nodes to accommodate replication factor of {}".format(replicas))
             return
 
     master_count = int(master_count)
@@ -74,24 +56,22 @@ def create(startup_nodes, replicas=0):
     slave_list = node_list[master_count:]
 
     if len(master_list) > 1:
-        logging.debug( "INFO: creating cluster with the following nodes as masters: {}".format(master_list) )
+        logging.info("Creating cluster with the following nodes as masters: {}".format(master_list))
         redistrib.command.start_cluster_on_multi(master_list)
     else:
         host = master_list[0][0]
         port = master_list[0][1]
-        logging.debug( "INFO: creating single master: {} {}".format(host, port) )
+        logging.info("Creating single master: {} {}".format(host, port))
         m = StrictRedis(host=host, port=port)
         s = ""
         for i in xrange(1, NUM_SLOTS):
             s += "{} ".format(i)
         cmd = 'CLUSTER ADDSLOTS {}'.format(s)
-        logging.debug( "INFO: sending following command: {}".format(cmd) )
+        logging.debug("Sending following command: {}".format(cmd))
         m.execute_command(cmd)
 
-
-    # add slaves
+    # add slaves
     if replicas:
-        logging.debug( "INFO: adding following nodes as slaves evenly across masters: {}".format(slave_list) )
+        logging.info("Adding following nodes as slaves evenly across masters: {}".format(slave_list))
         for i, s in enumerate(slave_list):
             m = master_list[i % master_count]
             redistrib.command.replicate(m[0], m[1], s[0], s[1])
@@ -133,63 +113,68 @@ def expand_cluster(master_host, master_port, new_nodes, num_new_masters=None):
     """
 
     cluster, slaves, cluster_replication_factor = _map_cluster(master_host, master_port)
-    logging.debug( "cluster: {}".format(cluster) )
-    logging.debug( "slaves: {}".format(slaves) )
+    logging.debug("cluster: {}".format(cluster))
+    logging.debug("slaves: {}".format(slaves))
 
     if not cluster:
-        logging.debug( "ERROR: Empty Cluster for Host {} {}".format(master_host, master_port) )
+        logging.error("Empty Cluster for Host {} {}".format(master_host, master_port))
         return
 
     num_sets_to_add = int(len(new_nodes)/(cluster_replication_factor + 1))
     if not num_sets_to_add:
-        logging.debug( "ERROR: Cluster has a replication factor of {}. Insufficient number of new nodes given." )
+        logging.error("Cluster has a replication factor of {}. Insufficient number of new nodes given.")
         return
 
     new_replication_factor = cluster_replication_factor
     if num_new_masters:
         if num_new_masters < len(new_nodes):
-            logging.debug( "ERROR: Insufficient number of new nodes ({}) to accomodate number of masters requested ({})".format(
-                len(new_nodes), num_new_masters
-            ) )
+            logging.error("Insufficient number of new nodes ({}) to accommodate number of "
+                          "masters requested ({})".format(len(new_nodes), num_new_masters))
             return
         num_sets_to_add = min(num_sets_to_add, num_new_masters)
         new_replication_factor = min(cluster_replication_factor, (len(new_nodes)/(num_new_masters + 1)))
 
-    logging.debug( "crf: {}".format(new_replication_factor) )
+    logging.debug("crf: {}".format(new_replication_factor))
 
     master_list = None
 
+    logging.info("Adding nodes {} to Cluster Instance {}:{}".format(new_nodes, master_host, master_port))
+
     while num_sets_to_add:
         if not master_list:
             master_list = cluster.values()
-            logging.debug( "master list: {}".format(master_list ))
+            logging.debug("master list: {}".format(master_list))
         num_sets_to_add -= 1
         new_master_node = new_nodes.pop()
         master = master_list.pop()
         existing_master_node = master['self']
         existing_slave_nodes = master['slaves']
-        logging.debug( 'adding new node {} to cluster as master, with {}.'.format(new_master_node, existing_master_node.port) )
+        logging.info('Adding new node {} to cluster as a master.'.format(new_master_node))
 
         redistrib.command.join_cluster(existing_master_node.host, existing_master_node.port,
                                        new_master_node['host'], int(new_master_node['port']))
-        logging.debug( 'master node joined to cluster!' )
+        logging.debug('master node joined to cluster!')
 
         for j in range(new_replication_factor):
             new_slave_node = new_nodes.pop()
             existing_slave_node = existing_slave_nodes.pop()
-            logging.debug( 'adding node {} as slave to master {}'.format(new_slave_node['port'], existing_master_node.port) )
-            redistrib.command.replicate(existing_master_node.host, existing_master_node.port, new_slave_node['host'], int(new_slave_node['port']))
+            logging.info('Adding new node {} as slave to master {}'.format(new_slave_node['port'],
+                                                                           existing_master_node.node_id))
+            redistrib.command.replicate(existing_master_node.host, existing_master_node.port, new_slave_node['host'],
+                                        int(new_slave_node['port']))
             if existing_slave_node:
-                logging.debug( 'switching existing slave node {} to master {}'.format(existing_slave_node.port, new_master_node['port'] ) )
+                logging.info("Switching existing slave node {} to new master {}:{}".format(existing_slave_node.port,
+                                                                                           new_master_node['host'],
+                                                                                           new_master_node['port']))
 
-                redistrib.command.replicate(new_master_node['host'], int(new_master_node['port']), existing_slave_node.host, existing_slave_node.port)
+                redistrib.command.replicate(new_master_node['host'], int(new_master_node['port']),
+                                            existing_slave_node.host, existing_slave_node.port)
             else:
-                logging.debug( "WARN: slave node underrun for replication {} of factor {}".format(j+1, new_replication_factor) )
-            logging.debug( "slave node {} added to the master".format(existing_slave_node) )
+                logging.warn("Slave node underrun for replication {} of factor {}".format(j+1, new_replication_factor))
 
     while new_nodes:
-        logging.debug( "there are new nodes left over, so adding them as slaves")
-        cluster, slaves, new_replication_factor = _map_cluster(master_host, master_port)
+        logging.debug("{} new nodes left over, so adding them as slaves.".format(len(new_nodes)))
+        cluster, _, new_replication_factor = _map_cluster(master_host, master_port)
         underweight = []
         for master in cluster.values():
             if len(master['slaves']) < new_replication_factor:
@@ -199,33 +184,128 @@ def expand_cluster(master_host, master_port, new_nodes, num_new_masters=None):
         if underweight:
             while underweight and new_nodes:
                 master = underweight.pop()
                 existing_master_node = master['self']
-                new_master_node = new_nodes.pop()
-                redistrib.command.replicate(existing_master_node.host, existing_master_node.port, new_master_node['host'], int(new_master_node['port']))
+                new_slave_node = new_nodes.pop()
+                logging.info("Adding new node {}:{} as slave to underweight master {}".format(
+                    new_slave_node['host'], new_slave_node['port'], existing_master_node.node_id
+                ))
+                redistrib.command.replicate(existing_master_node.host, existing_master_node.port,
+                                            new_slave_node['host'], int(new_slave_node['port']))
         else:
             master_nodes = cluster.values()
             while master_nodes and new_nodes:
                 new_master_node = new_nodes.pop()
-                existing_master_node = master_nodes.pop()
-                redistrib.command.replicate(existing_master_node.host, existing_master_node.port, new_master_node['host'], int(new_master_node['port']))
+                master = master_nodes.pop()
+                existing_master_node = master['self']
+                logging.info("Adding new node {}:{} as slave to master {}".format(
+                    new_master_node['host'], new_master_node['port'], existing_master_node.node_id
+                ))
+                redistrib.command.replicate(existing_master_node.host, existing_master_node.port,
+                                            new_master_node['host'], int(new_master_node['port']))
+
+
+def validate_and_run(new_hosts, replication_factor, new_masters):
+    new_nodes = []
+    cluster_nodes = []
+
+    for node in new_hosts:
+        logging.debug("checking that the new nodes are not already joined to a cluster...")
+        with Talker(node['host'], int(node['port'])) as t:
+            try:
+                redistrib.command._ensure_cluster_status_unset(t)
+                new_nodes.append(node)
+            except ProtocolError:
+                cluster_nodes.append(node)
+
+    if cluster_nodes:
+        logging.debug('nodes already in a cluster:')
+        for n in cluster_nodes:
+            logging.debug(n)
+
+    if not new_nodes:
+        logging.error("No new nodes available.")
+        return
+
+    if not cluster_nodes:
+        create(new_nodes, replication_factor)
+    else:
+        master = cluster_nodes.pop()
+        expand_cluster(master['host'], master['port'], new_nodes)
+
+
+def reset_node(node):
+    with Talker(node.host, node.port) as t:
+        t.talk('CLUSTER', 'RESET')
+
+
+def remove_cluster(cluster_nodes):
+    for node in cluster_nodes:
+        logging.info('Clearing cluster from node {}:{}'.format(node['host'], node['port']))
+        cluster, _, _ = _map_cluster(node['host'], node['port'])
+        for master in cluster.values():
+            for slave in master['slaves']:
+                reset_node(slave)
+            reset_node(master['self'])
+
+
+def decode_hosts(host_list):
+    hosts = []
+    for host in host_list:
+        h = host.split(":")
+        hosts.append({'host': h[0], 'port': h[1]})
+    return hosts
 
 
 def main():
     logger = logging.getLogger()
     handler = logging.StreamHandler()
-    formatter = logging.Formatter(
-        '%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
+    formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
     handler.setFormatter(formatter)
     logger.addHandler(handler)
-    logger.setLevel(logging.DEBUG)
-
-    # startup_nodes = [{"host": "10.64.1.102", "port": "7000"}]
-    startup_nodes = [{"host": "192.168.99.100", "port": "6380"}, {"host": "192.168.99.100", "port": "6381"}]
-    create(startup_nodes, replicas=1)
-    expand_cluster(startup_nodes[0]['host'], startup_nodes[0]['port'], [{'host': "192.168.99.100", "port":"6382"},
-                                                                        {'host': "192.168.99.100", "port":"6383"}])
-
+    logger.setLevel(logging.INFO)
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-n', '--new-hosts',
+                        type=str,
+                        nargs="+",
+                        action='store',
+                        help="node address(es) of form [host_ip]:[port_number]",
+                        required=True)
+
+    mx = parser.add_mutually_exclusive_group(required=True)
+    mx.add_argument('-r', '--replication-factor',
+                    type=int,
+                    action='store',
+                    help="required number of slaves per master; default=1")
+    mx.add_argument('-m', '--new-masters',
+                    type=int,
+                    action='store',
+                    help="required number of new masters; default=0")
+    mx.add_argument('-c', '--clear-cluster',
+                    action='store_true',
+                    help="clear any cluster attached to any nodes listed")
+
+    args = parser.parse_args()
+
+    if not args.clear_cluster:
+        new_hosts = decode_hosts(args.new_hosts)
+        validate_and_run(new_hosts, args.replication_factor, args.new_masters)
+    else:
+        hosts = decode_hosts(args.new_hosts)
+        remove_cluster(hosts)
 
 if __name__ == "__main__":
     main()
+"""
+        rc = StrictRedisCluster(startup_nodes=[node], decode_responses=True)
+        cluster_nodes = rc.cluster_nodes()
+        for n in cluster_nodes:
+            nodeset.update({n['id']: n})
+
+    # startup_nodes = [{"host": "10.64.1.102", "port": "7000"}]
+    #startup_nodes = [{"host": "192.168.99.100", "port": "6380"}, {"host": "192.168.99.100", "port": "6381"}]
+    #create(startup_nodes, replicas=1)
+    #expand_cluster(startup_nodes[0]['host'], startup_nodes[0]['port'], [{'host': "192.168.99.100", "port":"6382"},
+    #               {'host': "192.168.99.100", "port":"6383"}])
+"""

diff --git a/requirements.txt b/requirements.txt
index 9a57f895..a1a9c3fd 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
 redis>=2.10.2
+hiredis
 
 -e git+ssh://git@github.com:quixey/redis-trib.py.git@0.4.1#egg=redis-trib.py
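Taken together, the series leaves cluster_setup.py drivable from the command line (-n host:port ... plus one of -r, -m, or -c). A sketch of exercising the final module programmatically instead; it assumes redis-trib.py and hiredis are installed per requirements.txt, and the addresses are placeholders:

    from rediscluster.cluster_setup import decode_hosts, validate_and_run, remove_cluster

    # turn four fresh nodes into a cluster of two masters with one slave each
    hosts = decode_hosts(["10.0.0.1:7000", "10.0.0.2:7000", "10.0.0.3:7000", "10.0.0.4:7000"])
    validate_and_run(hosts, replication_factor=1, new_masters=None)

    # or tear the same cluster back down
    remove_cluster(hosts)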