From 99173aafff265eb2b8857982237cfea8a7d94cf6 Mon Sep 17 00:00:00 2001 From: Shrinidhi-P-ML Date: Tue, 7 Feb 2023 21:12:25 +0530 Subject: [PATCH] Index addition and roll over due to age - Added index_addition.py to support index addition in simulation - Modified index roll over logic - Restructured config file to support index addition - Added index_addition in config class - Added timestamp in index class to track index creation time - Added index count in plotter.py - Restructured schema to support required validation --- simulator/src/app.py | 348 ++++++++++++++++-------- simulator/src/cluster.py | 259 ++++++++++++------ simulator/src/config.yaml | 9 +- simulator/src/config_parser.py | 67 ++--- simulator/src/index.py | 7 +- simulator/src/index_addittion.py | 83 ++++++ simulator/src/open_search_simulator.py | 356 +++++++++++++++++-------- simulator/src/plotter.py | 8 +- simulator/src/schema.py | 10 +- 9 files changed, 803 insertions(+), 344 deletions(-) create mode 100644 simulator/src/index_addittion.py diff --git a/simulator/src/app.py b/simulator/src/app.py index 9eb59ed..d432f85 100644 --- a/simulator/src/app.py +++ b/simulator/src/app.py @@ -25,6 +25,7 @@ shutil.rmtree("instance") db = SQLAlchemy(app) + # Database model to store the datapoints class DataModel(db.Model): status = db.Column(db.String(200)) @@ -47,30 +48,45 @@ class DataModel(db.Model): def get_provision_status(): """ - Returns the status of provision + Returns the status of provision. """ return is_provisioning def set_provision_status(provisioning): """ - Sets the provision status - :param provisioning: boolean value that indicates provision status + Sets the provision status. + :param provisioning: boolean value that indicates provision status. """ global is_provisioning is_provisioning = provisioning + def get_accelerate_flag(): + """ + Function fetches the accelerate flag value. + + :return : bool value that indicates status of accelerate flag. + """ return accelerate_flag + def set_accelerate_flag(is_accelerated): + """ + Function sets a flag which indicates the accelertion of time + taken for shard rebalancing. + + :param is_accelerated: Boolean value used to set the accelerate flag. + """ global accelerate_flag accelerate_flag = is_accelerated + def get_first_data_point_time(): """ Function queries the database for the time corresponding to first data point generated by the simulator + :return: Time corresponding to first data point generated by the simulator """ first_data_point_time = ( @@ -83,9 +99,10 @@ def get_first_data_point_time(): def cluster_db_object(cluster): """ - Create a DataModel instance that can be dumped into db - :param cluster: cluster object - :return: data model + Create a DataModel instance that can be dumped into db. + + :param cluster: cluster object. + :return: An object of class DataModel. """ return DataModel( cpu_usage_percent=cluster.cpu_usage_percent, @@ -106,85 +123,137 @@ def cluster_db_object(cluster): ) -def overwrite_after_node_count_change(cluster_objects,time=None): +def overwrite_after_node_count_change(cluster_objects, time=None): """ Calculate the resource utilization after node change operation and overwrite the saved data points in db after node change time. - Also create an overlap on the png file to show new data points - :param cluster_objects: all cluster objects with new node configuration - :param date_time: date time object to overwrite date time now - :return: expiry time + Also create an overlap on the png file to show new data points. 
+ + :param cluster_objects: all cluster objects with new node configuration. + :param date_time: date time object to overwrite date time now. + :return: expiry time. """ if time == None: date_time = datetime.now() else: date_time = time + cluster_objects_post_change = [] for cluster_obj in cluster_objects: if cluster_obj.date_time >= date_time: cluster_objects_post_change.append(cluster_obj) task = cluster_db_object(cluster_obj) db.session.merge(task) + db.session.commit() plot_data_points( cluster_objects_post_change, skip_data_ingestion=True, skip_search_query=True ) return + @timeit -def reset_load(sim,time=None): +def reset_load(sim, time=None): """ The fuction resets shard size from end of simulation - to shard size configured at current time + to shard size configured at current time. + + :param sim: An object of class Simulator. + :param time: Optional time parameter to set shard configuration to specified time. """ sim.cluster.clear_index_size() - if time==None: + + if time == None: now = datetime.now() else: now = time - time_now_hour = now - timedelta(minutes=datetime.now().minute,seconds=now.second, - microseconds=now.microsecond) + current_disk = ( - DataModel.query.order_by(desc(DataModel.date_created)).filter(DataModel.date_created <= now) - .with_entities( - DataModel.__getattribute__(DataModel, constants.STAT_REQUEST['DiskUtil']) - ) - .first() + DataModel.query.order_by(desc(DataModel.date_created)) + .filter(DataModel.date_created <= now) + .with_entities( + DataModel.__getattribute__(DataModel, constants.STAT_REQUEST["DiskUtil"]) + ) + .first() ) + current_rolled_size = ( - DataModel.query.order_by(desc(DataModel.date_created)).filter(DataModel.date_created <= now) - .with_entities( - DataModel.__getattribute__(DataModel, 'rolled_index_size') - ) - .first() + DataModel.query.order_by(desc(DataModel.date_created)) + .filter(DataModel.date_created <= now) + .with_entities(DataModel.__getattribute__(DataModel, "rolled_index_size")) + .first() ) + rolled_index_size = current_rolled_size[0] - distribution_size = (current_disk[0]/100)*sim.cluster.total_disk_size_gb - distribution_size-=rolled_index_size - shard_size = distribution_size/sim.cluster.total_shard_count - distribution_size-= (sim.cluster.replica_shards_per_index * sim.cluster.primary_shards_per_index - * sim.cluster.index_count * (shard_size)) + distribution_size = (current_disk[0] / 100) * sim.cluster.total_disk_size_gb + distribution_size -= rolled_index_size + shard_size = distribution_size / sim.cluster.total_shard_count + distribution_size -= ( + sim.cluster.replica_shards_per_index + * sim.cluster.primary_shards_per_index + * sim.cluster.index_count + * (shard_size) + ) sim.cluster.rolled_index_size = rolled_index_size - sim.cluster.indices[sim.cluster.rolled_over_index_id].shards[0].shard_size = rolled_index_size + sim.cluster.indices[sim.cluster.rolled_over_index_id].shards[ + 0 + ].shard_size = rolled_index_size sim.cluster.indices[sim.cluster.rolled_over_index_id].index_size = rolled_index_size - sim.distribute_load((distribution_size/sim.frequency_minutes)*60) - - + sim.distribute_load((distribution_size / sim.frequency_minutes) * 60) + + +def reset_current_index_count(duration, sim, time): + """ + Resets the index configuration from end of simulation to current time. + + :param duration: Time (in minutes) to be re-simulated. + :param sim: An object of class Simulator. + :param time: Time corresponding to which index configuration will be reset. 
+ """ + if time == None: + time = datetime.now() + + start_time_simulate = sim.total_simulation_minutes - duration + start_time_minute = int( + (start_time_simulate - (start_time_simulate % sim.frequency_minutes)) + / sim.frequency_minutes + ) + sim.current_index_count = sim.index_added_list[start_time_minute] + + for index in range(len(sim.cluster.indices[: sim.current_index_count])): + total_hours_elapsed = ( + (time - sim.cluster.indices[index].created_at).total_seconds() + ) // 3600 + sim.cluster.indices[index].time_elapsed_last_roll_over = time - timedelta( + hours=int(total_hours_elapsed % sim.cluster.index_roll_over_hours) + ) + def get_duration_for_resimulation(time): """ - Fetches duration in minutes to resimulate + Fetches duration in minutes to re-simulate. + + :param time: Time corresponding to which duration of re-simulation must be evaluated. + :return: Duration (in minutes) for resimulation. """ app.app_context().push() - if time==None: + + if time == None: time_now = datetime.now() else: time_now = time - simulation_end_date_time = DataModel.query.order_by(desc(DataModel.date_created)).with_entities( - DataModel.__getattribute__(DataModel, 'date_created') - ).first() - resimulation_time = math.ceil(((simulation_end_date_time[0] - time_now ).total_seconds())/60) - return int(resimulation_time)+5 + + simulation_end_date_time = ( + DataModel.query.order_by(desc(DataModel.date_created)) + .with_entities(DataModel.__getattribute__(DataModel, "date_created")) + .first() + ) + resimulation_time = math.ceil( + ((simulation_end_date_time[0] - time_now).total_seconds()) / 60 + ) + + return int(resimulation_time) + 5 + def get_simulated_points(): """ @@ -193,8 +262,11 @@ def get_simulated_points(): """ data_rate = sim.simulated_data_rates.copy() search_rate = sim.simulated_search_rates.copy() + index_added = sim.index_added_list.copy() total_minutes = sim.total_simulation_minutes - return data_rate,search_rate,total_minutes + + return data_rate, search_rate, total_minutes, index_added + @timeit def add_node_and_rebalance(nodes, time=None): @@ -202,71 +274,111 @@ def add_node_and_rebalance(nodes, time=None): Increments node count in cluster object and rebalances shards among available nodes. Re-Simulates data after node addition and shard rebalance. + :param nodes: count of node(s) to be added to cluster + :param time: Optional parameter which performs node addtion to corresponding time. 
""" app.app_context().push() - data_rate,search_rate,total_minutes = get_simulated_points() + + data_rate, search_rate, total_minutes, index_count = get_simulated_points() sim = Simulator( configs.cluster, configs.data_function, configs.search_description, configs.searches, configs.simulation_frequency_minutes, + configs.index_addition, ) sim.simulated_data_rates = data_rate sim.simulated_search_rates = search_rate sim.total_simulation_minutes = total_minutes + sim.index_added_list = index_count + duration = get_duration_for_resimulation(time) - if time==None: + + if time == None: hour = datetime.now().hour - minutes = str(datetime.now().minute) if datetime.now().minute > 9 else "0" + str(datetime.now().minute) + minutes = ( + str(datetime.now().minute) + if datetime.now().minute > 9 + else "0" + str(datetime.now().minute) + ) else: hour = time.hour - minutes = str(time.now().minute) if time.now().minute > 9 else "0" + str(time.now().minute) - reset_load(sim,time) + minutes = ( + str(time.now().minute) + if time.now().minute > 9 + else "0" + str(time.now().minute) + ) + + reset_current_index_count(duration, sim, time) + reset_load(sim, time) + is_accelerated = get_accelerate_flag() sim.cluster.add_nodes(nodes, is_accelerated) set_accelerate_flag(False) - cluster_objects = sim.run(duration, str(hour) + "_" + minutes + "_00",True,time) - overwrite_after_node_count_change(cluster_objects,time) + + cluster_objects = sim.run(duration, str(hour) + "_" + minutes + "_00", True, time) + overwrite_after_node_count_change(cluster_objects, time) + is_provisioning = get_provision_status() is_provisioning = False set_provision_status(is_provisioning) + @timeit -def rem_node_and_rebalance(nodes,time=None): +def rem_node_and_rebalance(nodes, time=None): """ Decrements node count in cluster object and rebalances the shards among the available nodes. Re-Simulates the data once the node is removed and shards are distributed - :param nodes: count of node(s) to be removed from cluster + + :param nodes: count of node(s) to be removed from cluster. + :param time: Optional parameter which performs node removal to corresponding time. 
""" app.app_context().push() - data_rate,search_rate,total_minutes = get_simulated_points() + + data_rate, search_rate, total_minutes, index_count = get_simulated_points() sim = Simulator( configs.cluster, configs.data_function, configs.search_description, configs.searches, configs.simulation_frequency_minutes, + configs.index_addition, ) sim.simulated_data_rates = data_rate sim.simulated_search_rates = search_rate sim.total_simulation_minutes = total_minutes + sim.index_added_list = index_count + duration = get_duration_for_resimulation(time) - if time==None: + + if time == None: hour = datetime.now().hour - minutes = str(datetime.now().minute) if datetime.now().minute > 9 else "0" + str(datetime.now().minute) + minutes = ( + str(datetime.now().minute) + if datetime.now().minute > 9 + else "0" + str(datetime.now().minute) + ) else: hour = time.hour - minutes = str(time.now().minute) if time.now().minute > 9 else "0" + str(time.now().minute) - reset_load(sim,time) + minutes = ( + str(time.now().minute) + if time.now().minute > 9 + else "0" + str(time.now().minute) + ) + + reset_current_index_count(duration, sim, time) + reset_load(sim, time) + is_accelerated = get_accelerate_flag() sim.cluster.remove_nodes(nodes, is_accelerated) set_accelerate_flag(False) - # sim.cluster.cluster_disk_size_used = sim.cluster.calculate_cluster_disk_size() - cluster_objects = sim.run(duration, str(hour) + "_" + minutes + "_00",True,time) - overwrite_after_node_count_change(cluster_objects,time) + + cluster_objects = sim.run(duration, str(hour) + "_" + minutes + "_00", True, time) + overwrite_after_node_count_change(cluster_objects, time) + is_provisioning = get_provision_status() is_provisioning = False set_provision_status(is_provisioning) @@ -284,23 +396,30 @@ def violated_count(): """ args = request.args args.to_dict() - metric = args.get('metric', type=str) - duration = args.get('duration', type=int) - threshold = args.get('threshold', type=float) - time_now_arg = args.get('time_now', type=str) - if metric == None or duration == None or threshold == None or len(args) > constants.QUERY_ARG_LENGTH_FOUR: + metric = args.get("metric", type=str) + duration = args.get("duration", type=int) + threshold = args.get("threshold", type=float) + time_now_arg = args.get("time_now", type=str) + + if ( + metric == None + or duration == None + or threshold == None + or len(args) > constants.QUERY_ARG_LENGTH_FOUR + ): return Response(json.dumps("Invalid query parameters"), status=400) if len(args) == constants.QUERY_ARG_LENGTH_FOUR and time_now_arg == None: return Response(json.dumps("Invalid query parameters"), status=400) - # calculate time to query for data - if time_now_arg: - try: + + if time_now_arg: + try: time_now = datetime.strptime(time_now_arg, constants.TIME_FORMAT) except: return Response(json.dumps("Invalid query parameters"), status=400) - else: + else: time_now = datetime.now() + # Convert the minutes to time object to compare and query for required data points query_begin_time = time_now - timedelta(minutes=duration) first_data_point_time = get_first_data_point_time() @@ -337,7 +456,7 @@ def violated_count(): return Response(e, status=404) -@app.route("/stats/avg/", methods=['GET']) +@app.route("/stats/avg/", methods=["GET"]) def average(): """ The endpoint evaluates average of requested stat for a duration @@ -349,22 +468,27 @@ def average(): """ args = request.args args.to_dict() - metric = args.get('metric',type=str) - duration = args.get('duration',type=int) - time_now_arg = args.get('time_now', 
type=str) - if metric == None or duration == None or len(args) > constants.QUERY_ARG_LENGTH_THREE: + metric = args.get("metric", type=str) + duration = args.get("duration", type=int) + time_now_arg = args.get("time_now", type=str) + + if ( + metric == None + or duration == None + or len(args) > constants.QUERY_ARG_LENGTH_THREE + ): return Response(json.dumps("Invalid query parameters"), status=400) if len(args) > constants.QUERY_ARG_LENGTH_TWO and time_now_arg == None: return Response(json.dumps("Invalid query parameters"), status=400) - # calculate time to query for data + # calculate time to query for data if time_now_arg: - try: + try: time_now = datetime.strptime(time_now_arg, constants.TIME_FORMAT) except: return Response(json.dumps("Invalid query parameters"), status=400) - else: + else: time_now = datetime.now() # Convert the minutes to time object to compare and query for required data points @@ -406,7 +530,7 @@ def average(): return Response(e, status=404) -@app.route("/stats/current", methods=['GET']) +@app.route("/stats/current", methods=["GET"]) def current_all(): """ The endpoint returns all the stats from the latest poll, @@ -414,39 +538,46 @@ def current_all(): """ args = request.args args.to_dict() - metric = args.get('metric', type=str) - time_now_arg = args.get('time_now', type=str) + metric = args.get("metric", type=str) + time_now_arg = args.get("time_now", type=str) if len(args) > constants.QUERY_ARG_LENGTH_TWO: return Response(json.dumps("Invalid query parameters"), status=400) - if len(args) == constants.QUERY_ARG_LENGTH_TWO and (time_now_arg == None or metric == None): + if len(args) == constants.QUERY_ARG_LENGTH_TWO and ( + time_now_arg == None or metric == None + ): return Response(json.dumps("Invalid query parameters"), status=400) - - if len(args) == constants.QUERY_ARG_LENGTH_ONE and (time_now_arg == None and metric == None): + + if len(args) == constants.QUERY_ARG_LENGTH_ONE and ( + time_now_arg == None and metric == None + ): return Response(json.dumps("Invalid query parameters"), status=400) if time_now_arg: - try: + try: time_now = datetime.strptime(time_now_arg, constants.TIME_FORMAT) except: return Response(json.dumps("Invalid query parameters"), status=400) - else: + else: time_now = datetime.now() - if metric!= None: + if metric != None: try: if constants.STAT_REQUEST[metric] == constants.CLUSTER_STATE: # is_provisioning = get_provision_status() if get_provision_status(): - # if Simulator.is_provision_in_progress(): + # if Simulator.is_provision_in_progress(): return jsonify({"current": constants.CLUSTER_STATE_YELLOW}) # Fetches the stat_name for the latest poll current_stat = ( DataModel.query.order_by(desc(DataModel.date_created)) .with_entities( - DataModel.__getattribute__(DataModel, constants.STAT_REQUEST[metric]) - ).filter(DataModel.date_created <= time_now) + DataModel.__getattribute__( + DataModel, constants.STAT_REQUEST[metric] + ) + ) + .filter(DataModel.date_created <= time_now) .all() ) @@ -473,7 +604,8 @@ def current_all(): DataModel.__getattribute__( DataModel, constants.STAT_REQUEST_CURRENT[key] ) - ).filter(DataModel.date_created <= time_now) + ) + .filter(DataModel.date_created <= time_now) .all() ) stat_dict[key] = value[0][0] @@ -508,13 +640,15 @@ def add_node(): is_provisioning = False set_provision_status(is_provisioning) return Response(json.dumps("expected key 'nodes'"), status=404) - try: - time_now_arg = request.json['time_now'] - time_now = datetime.strptime(time_now_arg, constants.TIME_FORMAT) - set_accelerate_flag(True) + + 
try: + time_now_arg = request.json["time_now"] + time_now = datetime.strptime(time_now_arg, constants.TIME_FORMAT) + set_accelerate_flag(True) except: - time_now = None - add_node_thread = Thread(target=add_node_and_rebalance, args=(nodes,time_now)) + time_now = None + + add_node_thread = Thread(target=add_node_and_rebalance, args=(nodes, time_now)) add_node_thread.start() return jsonify({"nodes": node_count}) @@ -552,13 +686,15 @@ def remove_node(): is_provisioning = False set_provision_status(is_provisioning) return Response(json.dumps("expected key 'nodes'"), status=404) - try: - time_now_arg = request.json['time_now'] - time_now = datetime.strptime(time_now_arg, constants.TIME_FORMAT) - set_accelerate_flag(True) + + try: + time_now_arg = request.json["time_now"] + time_now = datetime.strptime(time_now_arg, constants.TIME_FORMAT) + set_accelerate_flag(True) except: - time_now = None - rem_node_thread = Thread(target=rem_node_and_rebalance, args=(nodes,time_now)) + time_now = None + + rem_node_thread = Thread(target=rem_node_and_rebalance, args=(nodes, time_now)) rem_node_thread.start() return jsonify({"nodes": node_count}) @@ -578,13 +714,14 @@ def all_data(): if __name__ == "__main__": db.create_all() cluster_dynamic = ClusterDynamic() - # remove any existing provision lock is_provisioning = False accelerate_flag = False + # get configs from config yaml configs = parse_config( os.path.join(get_source_code_dir(), constants.CONFIG_FILE_PATH) ) + # create the simulator object sim = Simulator( configs.cluster, @@ -592,13 +729,16 @@ def all_data(): configs.search_description, configs.searches, configs.simulation_frequency_minutes, + configs.index_addition, ) sim.cluster.cluster_dynamic = cluster_dynamic - # generate the data points from simulator + days = len(sim.data_ingestion.states) cluster_objects = sim.run(24 * 60 * days) + # save the generated data points to png plot_data_points(cluster_objects) + # save data points inside db for cluster_obj in cluster_objects: task = cluster_db_object(cluster_obj) diff --git a/simulator/src/cluster.py b/simulator/src/cluster.py index 58b484c..d82bc49 100644 --- a/simulator/src/cluster.py +++ b/simulator/src/cluster.py @@ -1,9 +1,10 @@ -from datetime import datetime +from datetime import datetime, timedelta +import time +import random + from node import Node from index import Index from cluster_dynamic import ClusterDynamic -import random -import time import constants @@ -25,7 +26,8 @@ def __init__( master_eligible_nodes_count: int, index_count: int, index_roll_over_size_gb: int, - index_clean_up_age_days: int, + # index_clean_up_age_days: int, + index_roll_over_hours: int, primary_shards_per_index: int, replica_shards_per_index: int, min_nodes_in_cluster: int, @@ -86,13 +88,15 @@ def __init__( self.active_data_nodes = active_data_nodes self.master_eligible_nodes_count = master_eligible_nodes_count self.index_count = index_count + self.instantaneous_index_count = 0 self.index_roll_over_size_gb = index_roll_over_size_gb - self.index_clean_up_age_in_minutes = index_clean_up_age_days + # self.index_clean_up_age_in_minutes = index_clean_up_age_days + self.index_roll_over_hours = index_roll_over_hours self.primary_shards_per_index = primary_shards_per_index self.replica_shards_per_index = replica_shards_per_index - self.total_shard_count = (primary_shards_per_index - * (replica_shards_per_index + 1) - ) * index_count + self.total_shard_count = ( + primary_shards_per_index * (replica_shards_per_index + 1) + ) * index_count self.rolled_over_index_id = -1 
self.cluster_dynamic = cluster_dynamic self.min_nodes_in_cluster = min_nodes_in_cluster @@ -118,16 +122,26 @@ def __init__( replica_shards_per_index, ) self.indices = self.initialize_indices( - index_count, primary_shards_per_index, replica_shards_per_index + index_count, + primary_shards_per_index, + replica_shards_per_index, + datetime.now() + - timedelta( + hours=datetime.now().hour, + minutes=datetime.now().minute, + seconds=datetime.now().second, + microseconds=datetime.now().microsecond, + ), ) self.allocate_shards_to_node() - # TODO: Define methods for controlling cluster behaviour, - # node addition, removal etc def add_nodes(self, nodes, accelerate=False): """ - Adds node to cluster and performs shards rebalancing. - Cluster state will be Yellow till rebalancing is complete. + Adds node(s) to cluster and performs shards rebalancing. + Populates cluster dynamic object to respond current state of cluster. + + :param nodes: count of node(s) to be added to cluster + :param accelerate: optional parameter to accelerate time for shard rebalance(default False) """ # Update the total node count in cluster dynamic self.cluster_dynamic.ClusterStatus = constants.CLUSTER_STATE_YELLOW @@ -140,31 +154,44 @@ def add_nodes(self, nodes, accelerate=False): self.cluster_dynamic.NumActiveShards = ( self.primary_shards_per_index * (self.replica_shards_per_index + 1) ) * self.index_count - self.cluster_dynamic.NumRelocatingShards = (( - self.primary_shards_per_index * (self.replica_shards_per_index + 1) - ) * self.index_count) // self.total_nodes_count # Add the node for node in range(nodes): + self.cluster_dynamic.NumRelocatingShards = ( + (self.primary_shards_per_index * (self.replica_shards_per_index + 1)) + * self.index_count) // self.total_nodes_count + new_node = Node(0, 0, 0, len(self.nodes)) existing_node_id = self.get_available_node_id() self.nodes.append(new_node) - rebalancing_size = self.cluster_disk_size_used / (self.total_nodes_count + nodes) - self.total_disk_size_gb+= (self.total_disk_size_gb/self.total_nodes_count) - rebalance_time = self.time_function_for_rebalancing(rebalancing_size,accelerate) - self.rebalance_shards(rebalance_time,existing_node_id, len(existing_node_id)) + + rebalancing_size = self.cluster_disk_size_used / ( + self.total_nodes_count + nodes + ) + self.total_disk_size_gb += self.total_disk_size_gb / self.total_nodes_count + rebalance_time = self.time_function_for_rebalancing( + rebalancing_size, accelerate + ) + self.rebalance_shards( + rebalance_time, existing_node_id, len(existing_node_id) + ) self.total_nodes_count += 1 - self.cluster_dynamic.NumRelocatingShards = 0 + + if (len(self.unassigned_shards_list) > 0 + and self.total_nodes_count >= self.replica_shards_per_index + 1 + ): + unassigned_shard_size = self.get_unassigned_shard_size() + self.rebalance_unassigned_shards(unassigned_shard_size, accelerate) + self.unassigned_shards_list.clear() + + self.cluster_dynamic.NumRelocatingShards = 0 + self.status = constants.CLUSTER_STATE_GREEN - self.active_data_nodes+=nodes - self.master_eligible_nodes_count+=nodes + self.active_data_nodes += nodes + self.master_eligible_nodes_count += nodes self.cluster_dynamic.ClusterStatus = constants.CLUSTER_STATE_GREEN return - # Perform rebalancing - self.status = constants.CLUSTER_STATE_YELLOW - # Todo - simulate effect on shards - def remove_nodes(self, nodes, accelerate): """ Removes node from cluster, rebalances unassigned shards due to @@ -172,7 +199,9 @@ def remove_nodes(self, nodes, accelerate): unassigned shards, the 
cluster will be in yellow state. The cluster will be in Yellow state when there is shard movement to allocate unassigned shards to nodes. - :param nodes: count of nodes to be added to cluster + + :param nodes: count of nodes to be added to cluster. + :param accelerate: optional parameter to accelerate time for shard rebalance(default False). """ if self.min_nodes_in_cluster > self.total_nodes_count: print("Cannot remove more nodes, minimum nodes required") @@ -182,6 +211,7 @@ def remove_nodes(self, nodes, accelerate): self.cluster_dynamic.NumNodes = self.total_nodes_count - nodes self.cluster_dynamic.NumMasterNodes = self.master_eligible_nodes_count - nodes self.cluster_dynamic.NumActiveDataNodes = self.active_data_nodes - nodes + # Choose a node from cluster and remove it for node in range(nodes): node_id = random.randint(0, len(self.nodes) - 1) @@ -210,8 +240,8 @@ def remove_nodes(self, nodes, accelerate): self.unassigned_shards_list.append(shard) del self.nodes[node_id] - self.total_disk_size_gb-= (self.total_disk_size_gb/self.total_nodes_count) - self.total_nodes_count-= 1 + self.total_disk_size_gb -= self.total_disk_size_gb / self.total_nodes_count + self.total_nodes_count -= 1 self.update_node_id() # If sufficient nodes are present @@ -222,96 +252,112 @@ def remove_nodes(self, nodes, accelerate): self.cluster_dynamic.NumUnassignedShards = 0 self.cluster_dynamic.NumRelocatingShards = 0 self.status = constants.CLUSTER_STATE_GREEN - self.active_data_nodes-=nodes - self.master_eligible_nodes_count-=nodes + self.active_data_nodes -= nodes + self.master_eligible_nodes_count -= nodes self.cluster_dynamic.ClusterStatus = constants.CLUSTER_STATE_GREEN return # If sufficient nodes not present, set cluster state yellow self.status = constants.CLUSTER_STATE_YELLOW self.cluster_dynamic.ClusterStatus = "Yellow" - # Todo - simulate effect on shards def initialize_nodes( self, total_nodes_count, index_count, primary_shards_count, replica_shards_count ): """ - Function takes the count of nodes in the cluster and creates a - list of node objects. Each node object will have arbitrary count - of indexes and each index will have the primary and replica shards - as per the parameter. + Function takes the count of nodes in cluster and creates list + of node objects. Each node object will have arbitrary count of + indices and each index will have the primary and replica shards + as per the configuration file. :return nodes: A list of node objects """ nodes = [] - for i in range(total_nodes_count): - # To-do: Add mechanism to distribute the index count randomnly across nodes node = Node(index_count, primary_shards_count, replica_shards_count, i) nodes.append(node) return nodes - def get_node_id(self): + def update_node_id(self): """ - Function fetches the node id and returns a list of node id's - in a cluster object + Updates the node Id after removal of node object from cluster. """ - node_id = [] - - for node in self.nodes: - node_id.append(node.node_id) - - return node_id - - def update_node_id(self): for node_id in range(len(self.nodes)): self.nodes[node_id].node_id = node_id def get_available_node_id(self): + """ + Returns node Id of avalilable node objects in cluster. + :return: List of node Id. 
+ """ node_id = [] - for node in self.nodes: if node.node_available: node_id.append(node.node_id) return node_id - def initialize_indices(self, index_count, primary_count, replica_count): + def initialize_indices(self, index_count, primary_count, replica_count, time): """ The function will create index objects of the specified count Each index will have primary and replica shards of specified - count - :return index: A list of index objects + count. + + :param primary_count: Count of primary shards required for index. + :param replica_count: Count of replica shards required for index. + :param time: Specifies the timestamp of index creation. + :return index: A list of index objects. """ indices = [] - - for i in range(index_count): - index = Index(primary_count, replica_count, i) + for index_id in range(index_count): + index = Index(primary_count, replica_count, index_id, time) indices.append(index) - return indices - def create_index(self, primary_count, replica_count): + def create_index(self, primary_count, replica_count, time): """ Creates an index object with specified number - of primary and replica shards + of primary and replica shards and adds it to cluster. + + :param primary_count: Count of primary shards required for index. + :param replica_count: Count of replica shards required for index. + :param time: Specifies the timestamp of index creation. """ - index = Index(primary_count, replica_count, len(self.indices)) + index = Index(primary_count, replica_count, len(self.indices), time) self.indices.append(index) def clear_index_size(self): + """ + Resets the shards size of the indices + """ for index in self.indices: index.index_size = 0 for shard in index.shards: shard.shard_size = 0 - def allocate_shards_to_node(self): + def allocate_shards_to_node(self, indices=None): """ Allocates shards arbitrarily to nodes, - This creates shards to node mapping + This creates shards to node mapping. + + :param indices: Optional parameter that allocates + shards to nodes for specified indices (default None). """ node_id = self.get_available_node_id() + if indices != None: + for index in indices: + for shard in index.shards: + id = random.choice(node_id) + + while not self.nodes[id].node_available: + id = random.randint(0, len(self.nodes)) + + shard.node_id = id + shard.state = "started" + self.nodes[id].shards_on_node.append(shard) + return + for node in self.nodes: node.shards_on_node.clear() @@ -326,29 +372,55 @@ def allocate_shards_to_node(self): shard.state = "started" self.nodes[id].shards_on_node.append(shard) - def calculate_cluster_disk_size(self): + def calculate_cluster_disk_size(self, index_count=None): """ Evaluates the disk space occupied in the cluster Returns the total size used in GB for the cluster - object + object. + :param index_count: optional parameter that evaluates disk space + for mentioned indices. + :return: Total size of cluster (in GB). """ - # To-Do: Total size must be taken from initial size of the cluster before ingestion + if index_count != None: + total_size = 0 + for index in self.indices[:index_count]: + for shard in index.shards: + total_size += shard.shard_size + + return total_size + total_size = 0 + for index in self.indices: + for shard in index.shards: + total_size += shard.shard_size - for node in self.nodes: - total_size += node.calculate_total_node_size() return total_size def get_unassigned_shard_size(self): - size = 0 + """ + Evaluates the size of unassigned shards. + :return: Size(in GB) of unassigned shards. 
+ """ + unassigned_shard_size = 0 + for unassigned_shard in self.unassigned_shards_list: + if unassigned_shard.type == "Replica": + unassigned_shard_size += unassigned_shard.shard_size - for shard in self.unassigned_shards_list: - if shard.type == "Replica": - size += shard.shard_size + return unassigned_shard_size + + def get_shards_to_rebalanace_count(self): + """ + Returns count of assigned shards in cluster. + :return: Assigned shards count. + """ + available_node_id = self.get_available_node_id() + total_shards_available = 0 + for node in available_node_id: + total_shards_available += len(self.nodes[node].shards_on_node) - return size + return total_shards_available - def time_function_for_rebalancing(self, unassigned_shard_size,accelerate): + def time_function_for_rebalancing(self, unassigned_shard_size, accelerate): """ Simulates the time taken for rebalancing the shard The time evaluation is based on the size of unassigned shards. @@ -358,24 +430,36 @@ def time_function_for_rebalancing(self, unassigned_shard_size,accelerate): With initial assumption of 1Gb data size takes 5 minutes to rebalance, it takes 1/12 seconds per GB of data The time to rebalance is evaluated for unassigned shards size. + + :param unassigned_shard_size: Size (in GB) of unassigned shards. + :param accelerate: Boolean flag to indicate time acceleration for rebalancing shards. + :return: Time (in seconds) for rebalancing the shards. """ if accelerate: return random.randint(0, 5) + rebalancing_time = unassigned_shard_size * (1 / 12) return rebalancing_time def rebalance_shards(self, rebalance_time, existing_node_id_list, new_node_id): """ - The function simulates the rebalancing of shards when a new - node is added. It takes the rebalance time and simulates the - time elapsed for the rebalance of shards + The function simulates the rebalancing of shards when a new + node is added. It takes the rebalance time and simulates the + time elapsed for rebalance of shards. + + :param rebalance_time: Time elapsed for rebalancing the shards. + :param existing_node_id_list: List of node Id's available in cluster. + :param new_node_id: Node id of newly added node in cluster. """ - total_rebalance_shard_count =(( - self.primary_shards_per_index * (self.replica_shards_per_index + 1) - ) * self.index_count) // self.total_nodes_count - + total_rebalance_shard_count = self.get_shards_to_rebalanace_count() + for node_id in existing_node_id_list: - for shard in range(total_rebalance_shard_count//self.total_nodes_count): + for shard in range(total_rebalance_shard_count // self.total_nodes_count): + if ( + len(self.nodes[node_id].shards_on_node) + <= total_rebalance_shard_count // self.total_nodes_count + ): + break rebalancing_shard = self.nodes[node_id].shards_on_node.pop() rebalancing_shard.node_id = new_node_id self.nodes[new_node_id].shards_on_node.append(rebalancing_shard) @@ -383,18 +467,19 @@ def rebalance_shards(self, rebalance_time, existing_node_id_list, new_node_id): sleep_time = random.uniform(0, rebalance_time) rebalance_time -= sleep_time time.sleep(sleep_time) - self.cluster_dynamic.NumRelocatingShards-=1 - + self.cluster_dynamic.NumRelocatingShards -= 1 def rebalance_unassigned_shards(self, unassigned_shard_size, accelerate): """ Rebalances unassigned shards among available nodes. 
The time taken for shard rebalancing is simulated using time function - :param unassigned_shard_size: size of unassigned shards + :param unassigned_shard_size: Size of unassigned shards + :param accelerate: Boolean flag to indicate time acceleration for rebalancing shards. """ - # Add time function to simulate the time taken for rebalancing - rebalance_time = self.time_function_for_rebalancing(unassigned_shard_size, accelerate) + rebalance_time = self.time_function_for_rebalancing( + unassigned_shard_size, accelerate + ) # Assign the shards to the available nodes on the cluster for shard in self.unassigned_shards_list: diff --git a/simulator/src/config.yaml b/simulator/src/config.yaml index bb4b177..c337133 100644 --- a/simulator/src/config.yaml +++ b/simulator/src/config.yaml @@ -12,7 +12,8 @@ index_count: 100 primary_shards_per_index: 2 replica_shards_per_index: 1 index_roll_over_size_gb: 10 -index_clean_up_age_days: 20 +# index_clean_up_age_days: 20 +index_roll_over_hours: 12 total_disk_size_gb: 14000 simulation_frequency_minutes: 5 @@ -79,6 +80,8 @@ states: simple: 110000 medium: 80000 complex: 55000 + index: + count: 10 - position: 10 time_hh_mm_ss: '13_00_00' ingestion_rate_gb_per_hr: 20 @@ -199,6 +202,8 @@ states: simple: 100000 medium: 80000 complex: 50000 + index: + count: 10 - position: 8 time_hh_mm_ss: '11_00_00' ingestion_rate_gb_per_hr: 24 @@ -306,6 +311,8 @@ states: searches: simple: 50000 medium: 2000 + index: + count: 10 - position: 4 time_hh_mm_ss: '06_00_00' ingestion_rate_gb_per_hr: 3 diff --git a/simulator/src/config_parser.py b/simulator/src/config_parser.py index 8196d76..caedc97 100644 --- a/simulator/src/config_parser.py +++ b/simulator/src/config_parser.py @@ -12,6 +12,7 @@ from search import Search from search import SearchDescription from errors import ValidationError +from index_addittion import IndexAddition class Config: @@ -20,12 +21,12 @@ class Config: """ def __init__( - self, - stats: dict, - states: list[dict], - search_description: dict[dict], - simulation_frequency_minutes: int, - randomness_percentage: int, + self, + stats: dict, + states: list[dict], + search_description: dict[dict], + simulation_frequency_minutes: int, + randomness_percentage: int, ): """ Initialise the Config object @@ -37,41 +38,41 @@ def __init__( """ self.cluster = Cluster(**stats) self.simulation_frequency_minutes = simulation_frequency_minutes - # all_states = [ - # State(position=state["position"], - # time_hh_mm_ss=state["time_hh_mm_ss"], - # ingestion_rate_gb_per_hr=state["ingestion_rate_gb_per_hr"]) - # for state in states - # ] all_states = [] - day_state=[] + day_state = [] for state in states: for position in state["pattern"]: - day_state.append(State(position=position['position'], + day_state.append( + State( + position=position["position"], time_hh_mm_ss=position["time_hh_mm_ss"], - ingestion_rate_gb_per_hr=position["ingestion_rate_gb_per_hr"])) + ingestion_rate_gb_per_hr=position["ingestion_rate_gb_per_hr"], + ) + ) all_states.append(copy.deepcopy(day_state)) day_state.clear() self.data_function = DataIngestion(all_states, randomness_percentage) - self.search_description = {search_type: - SearchDescription(search_stat=SearchStat(**specs), search_type=search_type) - for search_type, specs in search_description.items() - } - # self.searches = Search([ - # SearchState(position=state["position"], - # time_hh_mm_ss=state["time_hh_mm_ss"], - # searches=state["searches"]) - # for state in states - # ]) - search=[] + self.search_description = { + search_type: 
SearchDescription( + search_stat=SearchStat(**specs), search_type=search_type + ) + for search_type, specs in search_description.items() + } + search = [] for state in states: - searches_day =[] + searches_day = [] for position in state["pattern"]: - searches_day.append(SearchState(position=position['position'], - time_hh_mm_ss=position["time_hh_mm_ss"], - searches=position['searches'])) - search.append(searches_day) + searches_day.append( + SearchState( + position=position["position"], + time_hh_mm_ss=position["time_hh_mm_ss"], + searches=position["searches"], + ) + ) + search.append(searches_day) self.searches = Search(search) + self.index_addition = IndexAddition(states) + def get_source_code_dir(): """ @@ -80,6 +81,7 @@ def get_source_code_dir(): """ return Path(__file__).parent.resolve() + def validate_config(all_configs: dict): """ Validate dictionary of configs (read from config file) against the defined schema @@ -97,6 +99,7 @@ def validate_config(all_configs: dict): validator = Validator(schema) return validator.validate(all_configs, schema), validator.errors + def parse_config(config_file_path: str): """ Read and parse the config file into objects, @@ -119,7 +122,7 @@ def parse_config(config_file_path: str): # Perform Validation of the config file is_valid, errors = validate_config(all_configs) - + if not is_valid: raise ValidationError("Error validating config file - " + str(errors)) diff --git a/simulator/src/index.py b/simulator/src/index.py index 0d8a5dc..958394b 100644 --- a/simulator/src/index.py +++ b/simulator/src/index.py @@ -1,5 +1,5 @@ from shard import Shard - +import datetime class Index: """ @@ -12,7 +12,8 @@ def __init__( self, primary_shards_count: int, replica_shards_count: int, - index_id: int + index_id: int, + time: datetime.datetime ): """ Initialize the index object @@ -25,6 +26,8 @@ def __init__( self.index_id = index_id self.rolled_over = False self.index_size = 0 + self.created_at = time + self.time_elapsed_last_roll_over = time self.shards = self.initialize_shards(primary_shards_count, replica_shards_count) diff --git a/simulator/src/index_addittion.py b/simulator/src/index_addittion.py new file mode 100644 index 0000000..524976e --- /dev/null +++ b/simulator/src/index_addittion.py @@ -0,0 +1,83 @@ +from scipy.interpolate import InterpolatedUnivariateSpline +import numpy as np +import math + +class Index: + """ + Represents and holds the data fetched from the configuration for index addition + """ + def __init__(self, + index_count:int, + primary_count:int, + replica_count:int): + self.index_count = index_count + self.primary_count = primary_count + self.replica_count = replica_count + +class IndexAddition: + """ + Parses the configuration and creates relevant index objects, + Performs aggregation of index addition. + """ + def __init__(self, + states: list[dict] + ): + self.states = states + + def aggregate_index_addition(self,initial_index_count, start_time_hh_mm_ss: str, frequency_minutes:int): + """ + Produces cumulative index count over time period and returns a list of aggregated index count + for given duration. + + :param start_time_hh_mm_ss: start time in hh_mm_ss in 24-hour format. 
+ :param duration_minutes: duration of point generation in minutes + :param frequency_minutes: gap between the resultant points + :return: array of int containing resultant index aggregation points + """ + time_of_day = [] + total_index_count = initial_index_count + index_addition = [] + start_time_hour = int(start_time_hh_mm_ss.split("_")[0]) + start_time_minutes = int(start_time_hh_mm_ss.split("_")[1]) + if start_time_minutes > 0: + start_time_hour+=1 + duration_of_day = ((24 - start_time_hour)*60)+ ((60 - start_time_minutes)%60) + for day in self.states: + time_of_day.clear() + index_added = [] + index_count_list = [] + for position in day['pattern']: + if int(position['time_hh_mm_ss'].split("_")[0]) >= int(start_time_hh_mm_ss.split("_")[0]): + time_of_day.append( + (int(position['time_hh_mm_ss'].split("_")[0]) - int("0")) * 60 + ) + + index_addition_rate = position.get('index',0) + if index_addition_rate == 0: + index_added.append(Index(0,0,0)) + index_count_list.append(total_index_count) + else: + index_added.append(Index(index_addition_rate.get('count'), + index_addition_rate.get('primary'), + index_addition_rate.get('replica') + )) + total_index_count+= index_addition_rate.get('count') + index_count_list.append(total_index_count) + + intervals = int(duration_of_day/frequency_minutes) + if start_time_hh_mm_ss == "00_00_00": + x = np.linspace(0, 24*60, intervals) + else: + start = int(start_time_hh_mm_ss.split("_")[0]) + x = np.linspace(start, duration_of_day, intervals) + + order = 1 + s = InterpolatedUnivariateSpline( + time_of_day, index_count_list, k=order + ) + + y = [min(int(math.ceil(max(i, initial_index_count))), total_index_count) for i in s(x)] + for val in y: + index_addition.append(val) + + return index_addition diff --git a/simulator/src/open_search_simulator.py b/simulator/src/open_search_simulator.py index ea2518a..1dee1d5 100644 --- a/simulator/src/open_search_simulator.py +++ b/simulator/src/open_search_simulator.py @@ -8,6 +8,7 @@ from cluster import Cluster from data_ingestion import DataIngestion from search import SearchDescription, Search +from index_addittion import IndexAddition import time @@ -19,8 +20,10 @@ def inner(*args, **kwargs): total_time = time_end - time_start print("time taken for the function :", func.__name__, " is: ", total_time) return ret + return inner + class Simulator: """ Runs simulation on a passed cluster object @@ -28,14 +31,16 @@ class Simulator: - triggering of events - altering the states of nodes and cluster based on events """ + def __init__( - self, - cluster: Cluster, - data_ingestion: DataIngestion, - search_description: SearchDescription, - searches: Search, - frequency_minutes: int, - elapsed_time_minutes: int = 0, + self, + cluster: Cluster, + data_ingestion: DataIngestion, + search_description: SearchDescription, + searches: Search, + frequency_minutes: int, + index_addition: IndexAddition, + elapsed_time_minutes: int = 0, ): """ Initialize the Simulator object @@ -52,111 +57,170 @@ def __init__( self.elapsed_time_minutes = elapsed_time_minutes self.frequency_minutes = frequency_minutes self.searches = searches + self.index_addition = index_addition self.simulated_data_rates = [] self.simulated_search_rates = {} self.total_simulation_minutes = 0 + self.index_added_list = [] + self.current_index_count = 0 - def aggregate_data( - self, - duration_minutes, - start_time_hh_mm_ss: str = '00_00_00' - ): + def aggregate_data(self, duration_minutes, start_time_hh_mm_ss: str = "00_00_00"): # first collect all data aggregation events - 
x, y = self.data_ingestion.aggregate_data(start_time_hh_mm_ss, duration_minutes, self.frequency_minutes) + x, y = self.data_ingestion.aggregate_data( + start_time_hh_mm_ss, duration_minutes, self.frequency_minutes + ) return x, y def aggregate_data_searches( - self, - duration_minutes, - start_time_hh_mm_ss: str = '00_00_00' + self, duration_minutes, start_time_hh_mm_ss: str = "00_00_00" ): # first collect all data aggregation events - x, y = self.searches.aggregate_data(start_time_hh_mm_ss, duration_minutes, self.frequency_minutes) + x, y = self.searches.aggregate_data( + start_time_hh_mm_ss, duration_minutes, self.frequency_minutes + ) return x, y + def aggregate_index_addition(self, start_time_hh_mm_ss="00_00_00"): + y2 = self.index_addition.aggregate_index_addition( + self.cluster.index_count, start_time_hh_mm_ss, self.frequency_minutes + ) + return y2 + + def add_index_to_cluster(self, ind, time): + if self.cluster.index_count >= self.index_added_list[ind]: + return + + index_count_to_add = self.index_added_list[ind] - self.cluster.index_count + + for index in range(index_count_to_add): + self.cluster.create_index( + self.cluster.primary_shards_per_index, + self.cluster.replica_shards_per_index, + time, + ) + index_list = [] + index_list.append(self.cluster.indices[-1]) + self.cluster.allocate_shards_to_node(index_list) + self.cluster.index_count += 1 + def compute_cpu(self, data_rate): - if data_rate in range(0,21): - cpu_rate = random.uniform(5,20) - return round(cpu_rate,2) - - if data_rate in range(20,50): - cpu_rate = random.uniform(20,40) - return round(cpu_rate,2) - - if data_rate in range(50,80): - cpu_rate = random.uniform(40,60) - return round(cpu_rate,2) - - if data_rate in range(80,200): - cpu_rate = random.uniform(60,80) - return round(cpu_rate,2) - + if data_rate in range(0, 21): + cpu_rate = random.uniform(5, 20) + return round(cpu_rate, 2) + + if data_rate in range(20, 50): + cpu_rate = random.uniform(20, 40) + return round(cpu_rate, 2) + + if data_rate in range(50, 80): + cpu_rate = random.uniform(40, 60) + return round(cpu_rate, 2) + + if data_rate in range(80, 200): + cpu_rate = random.uniform(60, 80) + return round(cpu_rate, 2) + if data_rate > 200: - cpu_rate = random.uniform(80,90) - return round(cpu_rate,2) + cpu_rate = random.uniform(80, 90) + return round(cpu_rate, 2) if data_rate <= 0: - return round(random.uniform(5,20),2) - - return round(random.uniform(5,10),2) + return round(random.uniform(5, 20), 2) + + return round(random.uniform(5, 10), 2) def cpu_used_for_ingestion(self, ingestion, search_count, index): - # cpu_util = ingestion / self.cluster.total_nodes_count * random.randrange(1, 15) / 100 * 100 cpu_util = self.compute_cpu(int(ingestion)) - cpu_util = cpu_util * (7/self.cluster.total_nodes_count) - # cpu_util = cpu_util * (self.cluster.total_nodes_count/7) + cpu_util = cpu_util * (7 / self.cluster.total_nodes_count) + for search_type, count_array in search_count.items(): - cpu_load_percent = self.search_description[search_type].search_stat.cpu_load_percent / 100 + cpu_load_percent = ( + self.search_description[search_type].search_stat.cpu_load_percent / 100 + ) search_factor = count_array[index] * cpu_load_percent - search_factor = search_factor * (7/self.cluster.total_nodes_count) + search_factor = search_factor * (7 / self.cluster.total_nodes_count) cpu_util += search_factor + return min(cpu_util, 100) def memory_used_for_ingestion(self, ingestion, search_count, index): - memory_util = ingestion / self.cluster.total_nodes_count * 
random.randrange(5, 12) / 100 * 100 + memory_util = ( + ingestion/self.cluster.total_nodes_count* random.randrange(5, 12)/100 * 100 + ) for search_type, count_array in search_count.items(): - memory_load_percent = self.search_description[search_type].search_stat.memory_load_percent / 100 + memory_load_percent = ( + self.search_description[search_type].search_stat.memory_load_percent + / 100 + ) search_factor = count_array[index] * memory_load_percent - search_factor = search_factor * (7/self.cluster.total_nodes_count) + search_factor = search_factor * (7 / self.cluster.total_nodes_count) memory_util += search_factor return min(memory_util, 98) def heap_used_for_ingestion(self, ingestion, search_count, index, memory_util): # heap_util = ingestion / self.cluster.total_nodes_count * random.randrange(5, 8) / 200 * 100 - heap_util = memory_util * (2/3) + heap_util = memory_util * (2 / 3) for search_type, count_array in search_count.items(): - heap_load_percent = self.search_description[search_type].search_stat.heap_load_percent / 100 + heap_load_percent = ( + self.search_description[search_type].search_stat.heap_load_percent / 100 + ) search_factor = count_array[index] * heap_load_percent - search_factor = search_factor * (7/self.cluster.total_nodes_count) + search_factor = search_factor * (7 / self.cluster.total_nodes_count) heap_util += search_factor return min(heap_util, 100) def cluster_state_for_ingestion(self, ingestion): if ingestion < constants.HIGH_INGESTION_RATE_GB_PER_HOUR: - return random.choice([constants.CLUSTER_STATE_GREEN] * 20 + [constants.CLUSTER_STATE_YELLOW]) + return random.choice( + [constants.CLUSTER_STATE_GREEN] * 20 + [constants.CLUSTER_STATE_YELLOW] + ) if self.cluster.status == constants.CLUSTER_STATE_RED: - return random.choice([constants.CLUSTER_STATE_YELLOW] + [constants.CLUSTER_STATE_RED]*5) + return random.choice( + [constants.CLUSTER_STATE_YELLOW] + [constants.CLUSTER_STATE_RED] * 5 + ) return random.choice( - [constants.CLUSTER_STATE_GREEN] * 20 + [constants.CLUSTER_STATE_YELLOW] * 10 + [constants.CLUSTER_STATE_RED]) - + [constants.CLUSTER_STATE_GREEN] * 20 + + [constants.CLUSTER_STATE_YELLOW] * 10 + + [constants.CLUSTER_STATE_RED] + ) + def disk_utilization_for_ingestion(self): - return self.cluster.calculate_cluster_disk_size() + return self.cluster.calculate_cluster_disk_size(self.current_index_count) - def disk_util_for_index_roll_over(self): - for index in range(len(self.cluster.indices)): + def disk_util_for_index_roll_over(self, time): + # for index in range(len(self.cluster.indices)): + for index in range(self.current_index_count): index_size = self.cluster.indices[index].get_index_primary_size() - - if index_size >= self.cluster.index_roll_over_size_gb and not self.cluster.indices[index].rolled_over: - if self.cluster.rolled_over_index_id!= -1: + roll_over_age = False + # print('Time: ',self.cluster.indices[index].created_at + timedelta(hours = self.cluster.index_roll_over_hours) ) + if ( + self.cluster.indices[index].time_elapsed_last_roll_over + + timedelta(hours=self.cluster.index_roll_over_hours) + <= time + ): + roll_over_age = True + if ( + index_size >= self.cluster.index_roll_over_size_gb or roll_over_age + ) and not self.cluster.indices[index].rolled_over: + if self.cluster.rolled_over_index_id != -1: # Roll over index already exists # Add the size of index with roll over index size - self.cluster.indices[self.cluster.rolled_over_index_id].index_size+= index_size - self.cluster.indices[self.cluster.rolled_over_index_id].shards[0].shard_size+= 
index_size - self.cluster.rolled_index_size=self.cluster.indices[self.cluster.rolled_over_index_id].shards[0].shard_size + self.cluster.indices[ + self.cluster.rolled_over_index_id + ].index_size += index_size + self.cluster.indices[self.cluster.rolled_over_index_id].shards[ + 0 + ].shard_size += index_size + self.cluster.rolled_index_size = ( + self.cluster.indices[self.cluster.rolled_over_index_id] + .shards[0] + .shard_size + ) # discard the shards of roll over index for shard in range(len(self.cluster.indices[index].shards)): self.cluster.indices[index].shards[shard].shard_size = 0 - + self.cluster.indices[index].time_elapsed_last_roll_over = time + # If it is first roll over, discard replicas and retain primaries else: node_id = self.cluster.get_available_node_id() @@ -164,143 +228,203 @@ def disk_util_for_index_roll_over(self): for shard in range(len(self.cluster.indices[index].shards)): # if self.cluster.indices[index].shards[shard].type == "Replica": del self.cluster.indices[index].shards[0] - shard-=1 + shard -= 1 # Merge the primaries - shard = self.cluster.indices[index].initialize_shards(1,0) + shard = self.cluster.indices[index].initialize_shards(1, 0) shard[0].node_id = id shard[0].index_id = self.cluster.indices[index].index_id shard[0].shard_size = index_size self.cluster.nodes[id].shards_on_node.append(shard[0]) # Add the primary size to roll over index size - self.cluster.indices[index].index_size+= index_size - self.cluster.rolled_index_size+=index_size + self.cluster.indices[index].index_size += index_size + self.cluster.rolled_index_size += index_size # mark the index is rolled over self.cluster.indices[index].rolled_over = True - # set the index roll over id - self.cluster.rolled_over_index_id = self.cluster.indices[index].index_id + # set the index roll over id + self.cluster.rolled_over_index_id = self.cluster.indices[ + index + ].index_id self.cluster.indices[index].shards.append(shard[0]) # create a new index with similar configuration of rolled over index - self.cluster.create_index(self.cluster.primary_shards_per_index, self.cluster.replica_shards_per_index) + self.cluster.create_index( + self.cluster.primary_shards_per_index, + self.cluster.replica_shards_per_index, + time, + ) # allocate the shards self.cluster.allocate_shards_to_node() - return self.cluster.calculate_cluster_disk_size() - + return self.cluster.calculate_cluster_disk_size(self.current_index_count) def distribute_load(self, ingestion): """ - The function will select an Index and distribute + The function will select an Index and distribute data in an arbitrary fashion. 
""" # Repeat the process till data distribution is complete # if not resimulation: - ingestion = (ingestion/60)*self.frequency_minutes + ingestion = (ingestion / 60) * self.frequency_minutes while int(ingestion) > 0: - # Select an index - index_id = random.randint(0, len(self.cluster.indices) - 1) - + # Select an index + # index_id = random.randint(0, len(self.cluster.indices) - 1) + index_id = random.randint(0, self.current_index_count - 1) + # If rolled over index is chosen, chose a different index while index_id == self.cluster.rolled_over_index_id: index_id = random.randint(0, len(self.cluster.indices) - 1) - + # Choose a part of the data and push it to index - data_pushed_to_index_gb = random.uniform(0.1*ingestion, ingestion) - - # subtract from the total - ingestion-=data_pushed_to_index_gb + data_pushed_to_index_gb = random.uniform(0.1 * ingestion, ingestion) + + # subtract from the total + ingestion -= data_pushed_to_index_gb # Get primary shard count and evaluate the data size to be pushed primary_shards_count = self.cluster.primary_shards_per_index - data_per_shard_gb = data_pushed_to_index_gb/primary_shards_count + data_per_shard_gb = data_pushed_to_index_gb / primary_shards_count - # Update the size of shards + # Update the size of shards for shard in range(len(self.cluster.indices[index_id].shards)): - self.cluster.indices[index_id].shards[shard].shard_size+= data_per_shard_gb + self.cluster.indices[index_id].shards[ + shard + ].shard_size += data_per_shard_gb @timeit - def run(self, duration_minutes, start_time="00_00_00", resimulate=False,time=None): + def run(self, duration_minutes, start_time="00_00_00", resimulate=False, time=None): resultant_cluster_objects = [] - - if time==None: + if time == None: now = datetime.now() else: - now=time - + now = time + if start_time == "00_00_00": date_obj = now - timedelta( hours=now.hour, minutes=now.minute, seconds=now.second, - microseconds=now.microsecond + microseconds=now.microsecond, ) else: date_obj = now if resimulate: start_time_simulate = self.total_simulation_minutes - duration_minutes - start_time_minute = int((start_time_simulate - (start_time_simulate % self.frequency_minutes))/self.frequency_minutes) - data_y = self.simulated_data_rates[start_time_minute-1:] - data_y1 ={} - data_y1['simple'] = self.simulated_search_rates['simple'][start_time_minute-1:] - data_y1["medium"] = self.simulated_search_rates['medium'][start_time_minute-1:] - data_y1['complex'] = self.simulated_search_rates['complex'][start_time_minute-1:] + start_time_minute = int( + (start_time_simulate - (start_time_simulate % self.frequency_minutes)) + / self.frequency_minutes + ) + data_y = self.simulated_data_rates[start_time_minute - 1 :] + data_y1 = {} + data_y1["simple"] = self.simulated_search_rates["simple"][ + start_time_minute - 1 : + ] + data_y1["medium"] = self.simulated_search_rates["medium"][ + start_time_minute - 1 : + ] + data_y1["complex"] = self.simulated_search_rates["complex"][ + start_time_minute - 1 : + ] + data_y2 = self.index_added_list[start_time_minute - 1 :] + self.cluster.date_time = date_obj else: data_x, data_y = self.aggregate_data(duration_minutes, start_time) - data_x1, data_y1 = self.aggregate_data_searches(duration_minutes,start_time) + data_x1, data_y1 = self.aggregate_data_searches( + duration_minutes, start_time + ) + data_y2 = self.aggregate_index_addition() self.simulated_data_rates = data_y.copy() self.simulated_search_rates = data_y1.copy() + self.index_added_list = data_y2.copy() + self.total_simulation_minutes = 
 
     @timeit
-    def run(self, duration_minutes, start_time="00_00_00", resimulate=False,time=None):
+    def run(self, duration_minutes, start_time="00_00_00", resimulate=False, time=None):
         resultant_cluster_objects = []
-
-        if time==None:
+        if time == None:
             now = datetime.now()
         else:
-            now=time
-
+            now = time
+
         if start_time == "00_00_00":
             date_obj = now - timedelta(
                 hours=now.hour,
                 minutes=now.minute,
                 seconds=now.second,
-                microseconds=now.microsecond
+                microseconds=now.microsecond,
             )
         else:
             date_obj = now
 
         if resimulate:
             start_time_simulate = self.total_simulation_minutes - duration_minutes
-            start_time_minute = int((start_time_simulate - (start_time_simulate % self.frequency_minutes))/self.frequency_minutes)
-            data_y = self.simulated_data_rates[start_time_minute-1:]
-            data_y1 ={}
-            data_y1['simple'] = self.simulated_search_rates['simple'][start_time_minute-1:]
-            data_y1["medium"] = self.simulated_search_rates['medium'][start_time_minute-1:]
-            data_y1['complex'] = self.simulated_search_rates['complex'][start_time_minute-1:]
+            start_time_minute = int(
+                (start_time_simulate - (start_time_simulate % self.frequency_minutes))
+                / self.frequency_minutes
+            )
+            data_y = self.simulated_data_rates[start_time_minute - 1 :]
+            data_y1 = {}
+            data_y1["simple"] = self.simulated_search_rates["simple"][
+                start_time_minute - 1 :
+            ]
+            data_y1["medium"] = self.simulated_search_rates["medium"][
+                start_time_minute - 1 :
+            ]
+            data_y1["complex"] = self.simulated_search_rates["complex"][
+                start_time_minute - 1 :
+            ]
+            data_y2 = self.index_added_list[start_time_minute - 1 :]
+            self.cluster.date_time = date_obj
         else:
             data_x, data_y = self.aggregate_data(duration_minutes, start_time)
-            data_x1, data_y1 = self.aggregate_data_searches(duration_minutes,start_time)
+            data_x1, data_y1 = self.aggregate_data_searches(
+                duration_minutes, start_time
+            )
+            data_y2 = self.aggregate_index_addition()
             self.simulated_data_rates = data_y.copy()
             self.simulated_search_rates = data_y1.copy()
+            self.index_added_list = data_y2.copy()
+
         self.total_simulation_minutes = duration_minutes
 
         for index, instantaneous_data_ingestion_rate in enumerate(data_y):
+            self.current_index_count = data_y2[index]
+            self.cluster.instantaneous_index_count = data_y2[index]
             self.cluster._ingestion_rate = instantaneous_data_ingestion_rate
             self.cluster._simple_query_rate = data_y1["simple"][index]
             self.cluster._medium_query_rate = data_y1["medium"][index]
             self.cluster._complex_query_rate = data_y1["complex"][index]
-            self.cluster.cpu_usage_percent = self.cpu_used_for_ingestion(instantaneous_data_ingestion_rate, data_y1,
-                                                                         index)
-            self.cluster.memory_usage_percent = self.memory_used_for_ingestion(instantaneous_data_ingestion_rate,
-                                                                               data_y1, index)
-            self.cluster.heap_usage_percent = self.heap_used_for_ingestion(instantaneous_data_ingestion_rate,
-                                                                           data_y1, index,
-                                                                           self.cluster.memory_usage_percent )
-            self.cluster.status = self.cluster_state_for_ingestion(instantaneous_data_ingestion_rate)
+            self.cluster.cpu_usage_percent = self.cpu_used_for_ingestion(
+                instantaneous_data_ingestion_rate, data_y1, index
+            )
+            self.cluster.memory_usage_percent = self.memory_used_for_ingestion(
+                instantaneous_data_ingestion_rate, data_y1, index
+            )
+            self.cluster.heap_usage_percent = self.heap_used_for_ingestion(
+                instantaneous_data_ingestion_rate,
+                data_y1,
+                index,
+                self.cluster.memory_usage_percent,
+            )
+            self.cluster.status = self.cluster_state_for_ingestion(
+                instantaneous_data_ingestion_rate
+            )
+            self.add_index_to_cluster(index, self.cluster.date_time)
             self.distribute_load(instantaneous_data_ingestion_rate)
             # Todo: simulate effect on remaining cluster parameters
             self.cluster.cluster_disk_size_used = self.disk_utilization_for_ingestion()
-            self.cluster.cluster_disk_size_used = self.disk_util_for_index_roll_over()
+            self.cluster.cluster_disk_size_used = self.disk_util_for_index_roll_over(
+                self.cluster.date_time
+            )
             # self.cluster.cluster_disk_size_used+= (constants.INITIAL_DISK_SPACE_FACTOR * self.cluster.total_disk_size_gb)
-            self.cluster.disk_usage_percent = min((self.cluster.cluster_disk_size_used/self.cluster.total_disk_size_gb) * 100, 100)
+            self.cluster.disk_usage_percent = min(
+                (self.cluster.cluster_disk_size_used / self.cluster.total_disk_size_gb)
+                * 100,
+                100,
+            )
             date_time = date_obj + timedelta(minutes=self.elapsed_time_minutes)
             self.cluster.date_time = date_time
-            self.cluster.active_primary_shards = self.cluster.primary_shards_per_index * self.cluster.index_count
-            self.cluster.active_shards = (self.cluster.primary_shards_per_index * (self.cluster.replica_shards_per_index + 1)) * self.cluster.index_count
+            self.cluster.active_primary_shards = (
+                self.cluster.primary_shards_per_index * self.cluster.index_count
+            )
+            self.cluster.active_shards = (
+                self.cluster.primary_shards_per_index
+                * (self.cluster.replica_shards_per_index + 1)
+            ) * self.cluster.index_count
             resultant_cluster_objects.append(copy.deepcopy(self.cluster))
             self.elapsed_time_minutes += self.frequency_minutes
-
+
         print("======== Size of nodes ===========")
         for node in range(len(self.cluster.nodes)):
-            print("Size of Node "+str(node)+" : ",self.cluster.nodes[node].calculate_total_node_size())
-
+            print(
+                "Size of Node " + str(node) + " : ",
+                self.cluster.nodes[node].calculate_total_node_size(),
+            )
+
         print("========= Number of Shards in nodes ========= ")
         for node in range(len(self.cluster.nodes)):
-            print("node "+str(node)+": ",len(self.cluster.nodes[node].shards_on_node))
+            print(
+                "node " + str(node) + ": ", len(self.cluster.nodes[node].shards_on_node)
+            )
 
         # print("======= Size of Indexes ========")
         # for index in range(len(self.cluster.indices)):
-        #     print("Size of index "+str(index)+" : ",self.cluster.indices[index].get_index_primary_size())
+        #     print(
+        #         "Size of index " + str(index) + " : ",
+        #         self.cluster.indices[index].get_index_primary_size(),
+        #     )
 
         print("========= Index Roll over size ========")
         print(self.cluster.rolled_index_size)
-
+
         print("========= Size of Cluster ========")
         print(self.cluster.cluster_disk_size_used)
+
+        # print("======= Index Addition List =======")
+        # print(self.index_added_list)
+
+        # print("======= Index obj ========")
+        # for index in range(len(self.cluster.indices)):
+        #     print(self.cluster.indices[index].__dict__)
+
         print("")
         return resultant_cluster_objects
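
When run() is called with resimulate=True it does not regenerate the ingestion, search, or index-addition series; it re-slices the arrays saved from the previous run starting at the tick where the node count changed. A small worked example of that offset arithmetic, with made-up numbers:

    # Worked example of the resimulation offset used in run() (values are illustrative).
    total_simulation_minutes = 180
    duration_minutes = 60
    frequency_minutes = 5

    start_time_simulate = total_simulation_minutes - duration_minutes          # 120
    start_time_minute = int(
        (start_time_simulate - (start_time_simulate % frequency_minutes))
        / frequency_minutes
    )                                                                           # 24
    simulated_data_rates = list(range(36))      # one rate per 5-minute tick
    replayed = simulated_data_rates[start_time_minute - 1 :]
    print(start_time_minute, len(replayed))     # 24 13
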
========") # for index in range(len(self.cluster.indices)): - # print("Size of index "+str(index)+" : ",self.cluster.indices[index].get_index_primary_size()) + # print( + # "Size of index " + str(index) + " : ", + # self.cluster.indices[index].get_index_primary_size(), + # ) print("========= Index Roll over size ========") print(self.cluster.rolled_index_size) - + print("========= Size of Cluster ========") print(self.cluster.cluster_disk_size_used) + + # print("======= Index Addition List =======") + # print(self.index_added_list) + + # print("======= Index obj ========") + # for index in range(len(self.cluster.indices)): + # print(self.cluster.indices[index].__dict__) + print("") return resultant_cluster_objects diff --git a/simulator/src/plotter.py b/simulator/src/plotter.py index 9d35648..950301d 100644 --- a/simulator/src/plotter.py +++ b/simulator/src/plotter.py @@ -16,7 +16,7 @@ @timeit def plot_data_points(cluster_objects, skip_data_ingestion=False, skip_search_query=False): - graph_count = 7 + graph_count = 8 data_ingestion_over_time = [] simple_search_query_over_time = [] medium_search_query_over_time = [] @@ -28,6 +28,7 @@ def plot_data_points(cluster_objects, skip_data_ingestion=False, skip_search_que cluster_status_over_time = [] nodes_over_time = [] date_time_points = [] + index_count = [] for cluster_obj in cluster_objects: date_time_points.append(cluster_obj.date_time) simple_search_query_over_time.append(cluster_obj._simple_query_rate) @@ -40,6 +41,7 @@ def plot_data_points(cluster_objects, skip_data_ingestion=False, skip_search_que disk_util_over_time.append(cluster_obj.disk_usage_percent) cluster_status_over_time.append(cluster_obj.status) nodes_over_time.append(cluster_obj.total_nodes_count) + index_count.append(cluster_obj.instantaneous_index_count) if not skip_data_ingestion: plt.subplot(graph_count, 1, 1) @@ -77,6 +79,10 @@ def plot_data_points(cluster_objects, skip_data_ingestion=False, skip_search_que plt.ylabel('Node Count', font2) plt.plot(date_time_points, nodes_over_time) + plt.subplot(graph_count, 1, 8) + plt.ylabel('Index Count', font2) + plt.plot(date_time_points, index_count) + plt.subplots_adjust(hspace=0.1) plt.xlabel('Datetime -->', font2) fig.tight_layout() diff --git a/simulator/src/schema.py b/simulator/src/schema.py index 0e35c03..f4bb418 100644 --- a/simulator/src/schema.py +++ b/simulator/src/schema.py @@ -10,7 +10,8 @@ "primary_shards_per_index": {"required": True, "type": "number"}, "replica_shards_per_index": {"required": True, "type": "number"}, "index_roll_over_size_gb": {"required": True, "type": "number"}, - "index_clean_up_age_days": {"required": True, "type": "number"}, + # "index_clean_up_age_days": {"required": True, "type": "number"}, + "index_roll_over_hours": { "required" : True, "type" : "number"}, "min_nodes_in_cluster":{"required": True, "type": "number"}, "heap_memory_factor":{"required": True, "type": "number"}, "total_disk_size_gb": {"required": True, "type": "number"}, @@ -40,6 +41,13 @@ "complex": {"required": False, "type": "number"}, }, }, + "index":{"required" : False, "type" : "dict" }, + "schema" : { + "type" : "dict", + "schema" : { + "count" : {"required" : True, "type" : "number"}, + }, + }, }, }, },