Feat/mass ingestion #8

Open · wants to merge 15 commits into main
6 changes: 5 additions & 1 deletion .gitignore
@@ -28,4 +28,8 @@ scripts/output_vars.sh
terraform/teardown_details.txt

testing/data/__pycache__/*
testing/data/__pycache__/generate_data.cpython-312.pyc
testing/data/__pycache__/generate_data.cpython-312.pyc

testing/load_data.yaml
testing/schema-mapping/tables.yaml
kafka_topics.txt
34 changes: 28 additions & 6 deletions README.md
@@ -19,7 +19,11 @@ Deploy an EC2 kafka instance programmatically using terraform. The EC2 instance

### Input Variables

Set your AWS account using `aws configure`. Retrieve the output IP addresses from 1/ your provisioned SingleStore workspace cluster and 2/ any other IP addresses you would like Kafka to connect with. Run the following command to populate your environment variables:
Set your AWS account using `aws configure`. Retrieve the IP addresses of 1/ your provisioned SingleStore workspace cluster and 2/ any other hosts you would like Kafka to connect with.

If you'd like to map sample MySQL data, create a `schema-mapping/mysql-schema.sql` file containing the `CREATE TABLE` statements for your tables.

Then, run the following command to populate your environment variables:

```bash
bash scripts/var_gen.sh
@@ -44,6 +48,23 @@ export EC2_PUBLIC_IP="<outputted public IP>"
bash scripts/load_kafka.sh
```

If you would like to automate the Kafka data loading, create a `testing/load_data.yaml` with the following format:

```yaml
streaming:
- topic_name: "topic_2"
record_count: 1000
dataset: vehicle_data
- topic_name: "topic_2"
record_count: 500
dataset: log_data
- topic_name: "topic_3"
record_count: 2000
dataset: user_data
```

Note: if `schema-mapping/mysql-schema.sql` exists, this file is generated automatically when you run the `var_gen.sh` script.
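
For orientation, here is a minimal sketch of how the `streaming` entries could be consumed. It assumes the `kafka-python` package and a hypothetical `generate_record(dataset)` helper standing in for the repo's own generator under `testing/data/`; in this project the actual loading is driven by `scripts/load_kafka.sh`, so treat this purely as an illustration of the file format.

```python
# Hedged sketch only: illustrates consuming testing/load_data.yaml, not the repo's loader.
import json
import yaml
from kafka import KafkaProducer  # assumes the kafka-python package is installed


def generate_record(dataset):
    # Hypothetical stand-in for the repo's data generator (testing/data/generate_data.py).
    return {"dataset": dataset, "value": 42}


with open("testing/load_data.yaml") as f:
    config = yaml.safe_load(f)

producer = KafkaProducer(
    bootstrap_servers="<EC2_PUBLIC_IP>:9092",  # placeholder broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Send record_count generated records to each topic listed in the file.
for entry in config["streaming"]:
    for _ in range(entry["record_count"]):
        producer.send(entry["topic_name"], generate_record(entry["dataset"]))

producer.flush()
```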

### SingleStore Ingestion

Load the notebook `testing/ec2-kafka-s2-notebook.ipynb` into SingleStore Helios.
@@ -66,8 +87,9 @@ Once you are finished using the project, delete the notebook and the associated

### Code Layout

| Path | Description |
| :--------- | :------------------------------------------------------------- |
| terraform/ | Terraform source code. |
| scripts/ | shell scripts to build, deploy, and interact with the project. |
| testing/ | Example kafka ingestion. |
| Path | Description |
| :-------------- | :------------------------------------------------------------- |
| terraform/ | Terraform source code. |
| scripts/ | Shell scripts to build, deploy, and interact with the project. |
| testing/ | Example Kafka ingestion. |
| schema-mapping/ | Maps SQL table definitions to sample data-generation entries. |
6 changes: 5 additions & 1 deletion features.md
@@ -2,10 +2,14 @@

In Progress:

- feat - vish: can add connection details into notebook and make it work from local
- feat: combining load_data.yaml with data.yaml
- feat: substituting EC2_PUBLIC_IP on deploy in notebook
- feat: mysql schema mapping included with var_gen.sh

**Future scope:**

- feat: Terraform provider for Confluent Kafka
- feat: can add connection details into notebook and make it work from local
- feat: add in dictionary representation to include tagging / description for IP addresses
- feat: let the user pass table DDL, generate a custom data-generation function from it, and pass that function to a streaming script that streams data to the created Kafka topic
- feat: stopping and starting an instance
157 changes: 104 additions & 53 deletions iac-ec2-kafka.drawio

Large diffs are not rendered by default.

99 changes: 99 additions & 0 deletions schema-mapping/map-data.py
@@ -0,0 +1,99 @@
import os
import re
import yaml

# Input/output locations, resolved relative to this script's directory.
sql_file_name = "mysql-schema.sql"
yaml_file_name = "../testing/data/data.yaml"
kafka_topic_file_name = "kafka_topics.txt"
load_data_file_name = "../testing/load_data.yaml"
script_dir = os.path.dirname(os.path.abspath(__file__))
sql_file_path = os.path.join(script_dir, sql_file_name)
yaml_file_path = os.path.join(script_dir, yaml_file_name)
kafka_file_path = os.path.join(script_dir, kafka_topic_file_name)
load_data_file_path = os.path.join(script_dir, load_data_file_name)
RECORD_COUNT = 1000

def topic_config(table_names):
    # One Kafka topic per table, named "<table>_topic", with 4 partitions each.
    return [{'name': table + '_topic', 'partitions': 4} for table in table_names]

# Extract the CREATE TABLE statements from the SQL schema file
def extract_create_statements(file_path):
    with open(file_path, 'r') as sql_file:
        sql_content = sql_file.read()
    create_table_pattern = r'CREATE TABLE.*?;(?:\s|$)'
    create_statements = re.findall(create_table_pattern, sql_content, re.DOTALL | re.IGNORECASE)
    return create_statements


def write_yaml(input_dict, file_path):
    with open(file_path, 'w') as yaml_file:
        yaml.dump(input_dict, yaml_file, default_flow_style=False)

def generate_attr_def(attr, attr_type):
    # Map a MySQL column type to a data-generation definition for data.yaml.
    definition = {}
    if attr_type.startswith("INT") or attr_type.startswith("DECIMAL") or attr_type.startswith("FLOAT"):
        definition["type"] = "int"
        definition["min"] = 1
        definition["max"] = 100
    elif attr_type.startswith("VARCHAR") or attr_type.startswith("TEXT") or attr_type.startswith("ENUM"):
        definition["type"] = "choice"
        definition["values"] = ["choice 1", "choice 2", "choice 3"]
    elif attr_type.startswith("DATE"):
        definition["type"] = "date"
    elif attr_type.startswith("BOOLEAN"):
        definition["type"] = "int"
        definition["min"] = 0
        definition["max"] = 1
    elif attr_type.startswith("TIMESTAMP") or attr_type.startswith("TIME"):
        definition["type"] = "timestamp"
    else:
        definition["type"] = attr_type
    return definition

# TODO: Substitution of ' to " and removing initial " with '
def generate_table_names(tables_dict):
    # Write the quoted topic list consumed by the Kafka setup scripts.
    table_names = tables_dict["data"].keys()
    topics = topic_config(table_names)
    with open(kafka_file_path, 'w') as file:
        file.write("\"")
        file.write(str(topics))
        file.write("\"")
    return table_names

def create_streaming_file(table_names):
    # Build testing/load_data.yaml with one streaming entry per table.
    streaming_list = []
    for name in table_names:
        streaming_list.append({
            "topic_name": name + "_topic",
            "record_count": RECORD_COUNT,
            "dataset": name,
        })
    write_yaml({"streaming": streaming_list}, load_data_file_path)

def write_tables_dict(create_table_statements):
    # Parse each CREATE TABLE statement into per-column definitions and write data.yaml.
    tables_dict = {"data": {}}
    for table_statement in create_table_statements:
        table_input = {}
        split_table = table_statement.split("\n")
        name = split_table[0].lower().split(" ")[2]
        for line in split_table[1:-2]:
            words = line.lstrip().split(" ")
            attr = words[0]
            if attr != "FOREIGN" and attr != "PRIMARY":
                attr_type = words[1]
                table_input[attr] = generate_attr_def(attr, attr_type)
                print(attr, attr_type)
        tables_dict["data"][name] = table_input
        print("")
    write_yaml(tables_dict, yaml_file_path)
    return tables_dict


if __name__ == "__main__":
    create_table_statements = extract_create_statements(sql_file_path)
    tables_dict = write_tables_dict(create_table_statements)
    table_names = generate_table_names(tables_dict)
    create_streaming_file(table_names)
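
To make the mapping concrete, here is an illustrative walk-through; the sample table and values below are made up, not files from the repo. Given a `schema-mapping/mysql-schema.sql` containing the `CREATE TABLE` shown, the script would write the corresponding definitions to `testing/data/data.yaml`, a matching streaming entry to `testing/load_data.yaml`, and the quoted topic list to `kafka_topics.txt`.

```python
# Illustrative only: a sample CREATE TABLE and the structures map-data.py would derive
# from it, following the parsing rules above.
sample_sql = """CREATE TABLE vehicle_data (
    vehicle_id INT NOT NULL,
    model VARCHAR(64),
    registered DATE,
    PRIMARY KEY (vehicle_id)
);
"""

# Expected ../testing/data/data.yaml content (yaml.dump of this dict):
expected_data_yaml = {
    "data": {
        "vehicle_data": {
            "vehicle_id": {"type": "int", "min": 1, "max": 100},                    # INT -> int range
            "model": {"type": "choice", "values": ["choice 1", "choice 2", "choice 3"]},  # VARCHAR -> choice
            "registered": {"type": "date"},                                          # DATE -> date
        }
    }
}

# Expected ../testing/load_data.yaml content, one streaming entry per table:
expected_load_data_yaml = {
    "streaming": [
        {"topic_name": "vehicle_data_topic", "record_count": 1000, "dataset": "vehicle_data"}
    ]
}

# kafka_topics.txt would hold: "[{'name': 'vehicle_data_topic', 'partitions': 4}]"
print(expected_data_yaml)
print(expected_load_data_yaml)
```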


