Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index Addition and Roll over due to age #337

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
348 changes: 244 additions & 104 deletions simulator/src/app.py

Large diffs are not rendered by default.

259 changes: 172 additions & 87 deletions simulator/src/cluster.py

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion simulator/src/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ index_count: 100
primary_shards_per_index: 2
replica_shards_per_index: 1
index_roll_over_size_gb: 10
index_clean_up_age_days: 20
# index_clean_up_age_days: 20
index_roll_over_hours: 12
total_disk_size_gb: 14000
simulation_frequency_minutes: 5

Expand Down Expand Up @@ -79,6 +80,8 @@ states:
simple: 110000
medium: 80000
complex: 55000
index:
count: 10
- position: 10
time_hh_mm_ss: '13_00_00'
ingestion_rate_gb_per_hr: 20
Expand Down Expand Up @@ -199,6 +202,8 @@ states:
simple: 100000
medium: 80000
complex: 50000
index:
count: 10
- position: 8
time_hh_mm_ss: '11_00_00'
ingestion_rate_gb_per_hr: 24
Expand Down Expand Up @@ -306,6 +311,8 @@ states:
searches:
simple: 50000
medium: 2000
index:
count: 10
- position: 4
time_hh_mm_ss: '06_00_00'
ingestion_rate_gb_per_hr: 3
Expand Down
67 changes: 35 additions & 32 deletions simulator/src/config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from search import Search
from search import SearchDescription
from errors import ValidationError
from index_addittion import IndexAddition


class Config:
Expand All @@ -20,12 +21,12 @@ class Config:
"""

def __init__(
self,
stats: dict,
states: list[dict],
search_description: dict[dict],
simulation_frequency_minutes: int,
randomness_percentage: int,
self,
stats: dict,
states: list[dict],
search_description: dict[dict],
simulation_frequency_minutes: int,
randomness_percentage: int,
):
"""
Initialise the Config object
Expand All @@ -37,41 +38,41 @@ def __init__(
"""
self.cluster = Cluster(**stats)
self.simulation_frequency_minutes = simulation_frequency_minutes
# all_states = [
# State(position=state["position"],
# time_hh_mm_ss=state["time_hh_mm_ss"],
# ingestion_rate_gb_per_hr=state["ingestion_rate_gb_per_hr"])
# for state in states
# ]
all_states = []
day_state=[]
day_state = []
for state in states:
for position in state["pattern"]:
day_state.append(State(position=position['position'],
day_state.append(
State(
position=position["position"],
time_hh_mm_ss=position["time_hh_mm_ss"],
ingestion_rate_gb_per_hr=position["ingestion_rate_gb_per_hr"]))
ingestion_rate_gb_per_hr=position["ingestion_rate_gb_per_hr"],
)
)
all_states.append(copy.deepcopy(day_state))
day_state.clear()
self.data_function = DataIngestion(all_states, randomness_percentage)
self.search_description = {search_type:
SearchDescription(search_stat=SearchStat(**specs), search_type=search_type)
for search_type, specs in search_description.items()
}
# self.searches = Search([
# SearchState(position=state["position"],
# time_hh_mm_ss=state["time_hh_mm_ss"],
# searches=state["searches"])
# for state in states
# ])
search=[]
self.search_description = {
search_type: SearchDescription(
search_stat=SearchStat(**specs), search_type=search_type
)
for search_type, specs in search_description.items()
}
search = []
for state in states:
searches_day =[]
searches_day = []
for position in state["pattern"]:
searches_day.append(SearchState(position=position['position'],
time_hh_mm_ss=position["time_hh_mm_ss"],
searches=position['searches']))
search.append(searches_day)
searches_day.append(
SearchState(
position=position["position"],
time_hh_mm_ss=position["time_hh_mm_ss"],
searches=position["searches"],
)
)
search.append(searches_day)
self.searches = Search(search)
self.index_addition = IndexAddition(states)


def get_source_code_dir():
"""
Expand All @@ -80,6 +81,7 @@ def get_source_code_dir():
"""
return Path(__file__).parent.resolve()


def validate_config(all_configs: dict):
"""
Validate dictionary of configs (read from config file) against the defined schema
Expand All @@ -97,6 +99,7 @@ def validate_config(all_configs: dict):
validator = Validator(schema)
return validator.validate(all_configs, schema), validator.errors


def parse_config(config_file_path: str):
"""
Read and parse the config file into objects,
Expand All @@ -119,7 +122,7 @@ def parse_config(config_file_path: str):

# Perform Validation of the config file
is_valid, errors = validate_config(all_configs)

if not is_valid:
raise ValidationError("Error validating config file - " + str(errors))

Expand Down
7 changes: 5 additions & 2 deletions simulator/src/index.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from shard import Shard

import datetime

class Index:
"""
Expand All @@ -12,7 +12,8 @@ def __init__(
self,
primary_shards_count: int,
replica_shards_count: int,
index_id: int
index_id: int,
time: datetime.datetime
):
"""
Initialize the index object
Expand All @@ -25,6 +26,8 @@ def __init__(
self.index_id = index_id
self.rolled_over = False
self.index_size = 0
self.created_at = time
self.time_elapsed_last_roll_over = time
self.shards = self.initialize_shards(primary_shards_count, replica_shards_count)


Expand Down
83 changes: 83 additions & 0 deletions simulator/src/index_addittion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from scipy.interpolate import InterpolatedUnivariateSpline
import numpy as np
import math

class Index:
    """
    Plain value holder for a single index-addition entry taken from the
    configuration.
    """

    def __init__(
        self,
        index_count: int,
        primary_count: int,
        replica_count: int,
    ) -> None:
        """
        Store the three counts exactly as given; no validation is performed.

        :param index_count: index count of the entry
        :param primary_count: primary count of the entry
            (presumably primary shards per index - confirm against the config schema)
        :param replica_count: replica count of the entry
            (presumably replica shards per index - confirm against the config schema)
        """
        (
            self.index_count,
            self.primary_count,
            self.replica_count,
        ) = (index_count, primary_count, replica_count)

class IndexAddition:
    """
    Holds the per-day state patterns read from the configuration and
    aggregates the index additions they describe into a cumulative index count.
    """

    def __init__(self, states: list[dict]):
        """
        :param states: list of day entries; each entry carries a 'pattern' list
            of positions. A position has a 'time_hh_mm_ss' string and may have
            an 'index' mapping whose 'count' is the number of indices added.
        """
        self.states = states

    def aggregate_index_addition(
        self,
        initial_index_count,
        start_time_hh_mm_ss: str,
        frequency_minutes: int,
    ):
        """
        Produces the cumulative index count over time and returns it as a flat
        list of sampled points, one run of points per configured day.

        For every day, the positions whose hour is not earlier than the start
        hour become (minute-of-day, cumulative count) knots. The knots are
        joined by a degree-1 spline and sampled on a fixed grid. Each sample is
        rounded up, floored at ``initial_index_count`` and capped at the
        running total reached so far.

        :param initial_index_count: index count before any addition is applied
        :param start_time_hh_mm_ss: start time in hh_mm_ss in 24-hour format.
        :param frequency_minutes: gap between the resultant points
        :return: list of int containing resultant index aggregation points;
            each day contributes int(duration_of_day / frequency_minutes) points
        """
        index_addition = []
        if not self.states:
            # Nothing to aggregate; also skips the grid computation below.
            return index_addition

        # Parse the start time once instead of once per position.
        time_parts = start_time_hh_mm_ss.split("_")
        start_hour = int(time_parts[0])
        start_minutes = int(time_parts[1])

        # Minutes remaining in the day: whole hours after the (rounded-up)
        # start hour plus the leftover minutes of the partial first hour.
        rounded_start_hour = start_hour + 1 if start_minutes > 0 else start_hour
        duration_of_day = (24 - rounded_start_hour) * 60 + (60 - start_minutes) % 60

        # The sample grid depends only on the start time and the frequency,
        # so it is built once and reused for every day.
        intervals = int(duration_of_day / frequency_minutes)
        if start_time_hh_mm_ss == "00_00_00":
            x = np.linspace(0, 24 * 60, intervals)
        else:
            # NOTE(review): the lower bound is the start *hour* while the knots
            # are in *minutes*, and the upper bound is the remaining duration
            # rather than the end of the day. Looks like it should be
            # linspace(start_hour * 60, 24 * 60, ...) - confirm before changing;
            # kept as-is here to preserve current output.
            x = np.linspace(start_hour, duration_of_day, intervals)

        total_index_count = initial_index_count
        for day in self.states:
            time_of_day = []
            index_count_list = []
            for position in day["pattern"]:
                position_hour = int(position["time_hh_mm_ss"].split("_")[0])
                if position_hour < start_hour:
                    # Positions before the start hour contribute no knot.
                    continue
                time_of_day.append(position_hour * 60)

                index_addition_rate = position.get("index", 0)
                if index_addition_rate != 0:
                    total_index_count += index_addition_rate.get("count")
                index_count_list.append(total_index_count)

            # k=1 gives straight-line segments between consecutive knots.
            spline = InterpolatedUnivariateSpline(
                time_of_day, index_count_list, k=1
            )

            # Never report fewer indices than the initial count, nor more
            # than have actually been added up to the end of this day.
            index_addition.extend(
                min(
                    int(math.ceil(max(value, initial_index_count))),
                    total_index_count,
                )
                for value in spline(x)
            )

        return index_addition
Loading