diff --git a/Structured Dremio Solution/Flask-api/Dockerfile b/Structured Dremio Solution/Flask-api/Dockerfile
new file mode 100644
index 0000000..a4267f4
--- /dev/null
+++ b/Structured Dremio Solution/Flask-api/Dockerfile
@@ -0,0 +1,23 @@
+# Use the official Python image from the Docker Hub
+FROM python:3.9-slim
+
+# Set the working directory in the container
+WORKDIR /
+
+# Copy the requirements file into the container
+COPY requirements.txt .
+
+# Install the dependencies
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy the rest of the application code into the container
+COPY . .
+
+# Copy the .env file into the container
+COPY api.env .
+
+# Expose the port the app runs on
+EXPOSE 5000
+
+# Command to run the api
+CMD ["python", "api.py"]
\ No newline at end of file
diff --git a/Structured Dremio Solution/Flask-api/README.md b/Structured Dremio Solution/Flask-api/README.md
new file mode 100644
index 0000000..ff6fd92
--- /dev/null
+++ b/Structured Dremio Solution/Flask-api/README.md
@@ -0,0 +1,3 @@
+Structured Dremio Solution - Flask api
+
+This folder contains the application and Docker files for the structured solution API, which allows users connected to Deakin's network via the AnyConnect VPN to run SQL queries that fetch their data from Dremio.
diff --git a/Structured Dremio Solution/Flask-api/api.py b/Structured Dremio Solution/Flask-api/api.py
new file mode 100644
index 0000000..2b3a3a0
--- /dev/null
+++ b/Structured Dremio Solution/Flask-api/api.py
@@ -0,0 +1,103 @@
+from flask import Flask, jsonify, request
+import pandas as pd
+import io
+import time
+from dotenv import load_dotenv
+import os
+import requests
+import re
+
+# Load environment variables from .env file
+load_dotenv('api.env')
+
+app = Flask(__name__)
+
+# Dremio configuration
+dremio_url = os.getenv('DREMIO_URL')
+dremio_username = os.getenv('DREMIO_USERNAME')
+dremio_password = os.getenv('DREMIO_PASSWORD')
+
+# Authenticate and get token
+def get_dremio_token():
+    auth_response = requests.post(f'{dremio_url}/apiv2/login', json={'userName': dremio_username, 'password': dremio_password})
+    auth_response.raise_for_status()
+    return auth_response.json().get('token')
+
+# Function to execute SQL query on Dremio
+def execute_dremio_query(sql):
+    token = get_dremio_token()
+    headers = {
+        'Authorization': f'_dremio{token}',
+        'Content-Type': 'application/json'
+    }
+    response = requests.post(f'{dremio_url}/api/v3/sql', headers=headers, json={'sql': sql})
+    response.raise_for_status()
+    job_id = response.json().get('id')
+    return job_id
+
+# Function to get query results from Dremio
+def get_dremio_query_results(job_id):
+    token = get_dremio_token()
+    headers = {
+        'Authorization': f'_dremio{token}',
+        'Content-Type': 'application/json'
+    }
+    # Poll the job status endpoint until the job is complete
+    while True:
+        response = requests.get(f'{dremio_url}/api/v3/job/{job_id}', headers=headers)
+        response.raise_for_status()
+        job_status = response.json().get('jobState')
+        if job_status == 'COMPLETED':
+            break
+        elif job_status in ('FAILED', 'CANCELED'):
+            raise Exception(f'Query failed with status: {job_status}')
+        # Back off between polls so we do not busy-wait against the Dremio API.
+        time.sleep(1)
+
+    # Fetch the query results
+    response = requests.get(f'{dremio_url}/api/v3/job/{job_id}/results', headers=headers)
+    response.raise_for_status()
+    return response.json()
+
+# Function to list catalog items from Dremio
+def list_dremio_catalog():
+    token = get_dremio_token()
+    headers = {
+        'Authorization': f'_dremio{token}',
+        'Content-Type': 'application/json'
+    }
+    response = requests.get(f'{dremio_url}/api/v3/catalog', headers=headers)
+    response.raise_for_status()
+    return response.json()
+
+@app.route('/dremio_query', methods=['POST'])
+def dremio_query():
+    sql = request.json.get('sql')
+    if not sql:
+        return jsonify({'error': 'SQL query is required'}), 400
+
+    # Validate that the query is a SELECT query and does not contain harmful commands
+    harmful_commands = r'\b(DROP|DELETE|INSERT|UPDATE|ALTER|CREATE|TRUNCATE|REPLACE|MERGE|EXEC|EXECUTE|GRANT|REVOKE|SET|USE|CALL|LOCK|UNLOCK|RENAME|COMMENT|COMMIT|ROLLBACK|SAVEPOINT|RELEASE)\b'
+    if not re.match(r'^\s*SELECT\b', sql.strip(), re.IGNORECASE) or re.search(harmful_commands, sql, re.IGNORECASE):
+        return jsonify({'error': 'Only SELECT queries are allowed and no harmful commands are permitted'}), 400
+
+    try:
+        job_id = execute_dremio_query(sql)
+        result = get_dremio_query_results(job_id)
+        return jsonify(result)
+    except requests.exceptions.RequestException as e:
+        return jsonify({'error': str(e)}), 500
+    except Exception as e:
+        return jsonify({'error': str(e)}), 500
+
+@app.route('/dremio_catalog', methods=['GET'])
+def dremio_catalog():
+    try:
+        catalog = list_dremio_catalog()
+        return jsonify(catalog)
+    except requests.exceptions.RequestException as e:
+        return jsonify({'error': str(e)}), 500
+
+if __name__ == '__main__':
+    port = int(os.getenv('FLASK_RUN_PORT', 5000))
+    app.run(host='0.0.0.0', port=port)
\ No newline at end of file
diff --git a/Structured Dremio Solution/Flask-api/docker-compose-flask.yml b/Structured Dremio Solution/Flask-api/docker-compose-flask.yml
new file mode 100644
index 0000000..f41a248
--- /dev/null
+++ b/Structured Dremio Solution/Flask-api/docker-compose-flask.yml
@@ -0,0 +1,18 @@
+version: '3.8'
+
+services:
+  flaskapp:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    ports:
+      - "5000:5000"
+    env_file:
+      - api.env
+    container_name: structured-solution-api
+    networks:
+      - iceberg_env
+
+networks:
+  iceberg_env:
+    external: true
\ No newline at end of file
diff --git a/Structured Dremio Solution/Flask-api/requirements.txt b/Structured Dremio Solution/Flask-api/requirements.txt
new file mode 100644
index 0000000..ca8d543
--- /dev/null
+++ b/Structured Dremio Solution/Flask-api/requirements.txt
@@ -0,0 +1,6 @@
+Flask==1.1.4
+Jinja2==2.11.3
+MarkupSafe==1.1.1
+requests
+pandas
+python-dotenv
\ No newline at end of file
diff --git a/Structured Dremio Solution/README.md b/Structured Dremio Solution/README.md
new file mode 100644
index 0000000..2b3db5b
--- /dev/null
+++ b/Structured Dremio Solution/README.md
@@ -0,0 +1 @@
+This folder is for code related to the structured Dremio solution running on the VM.
diff --git a/Structured Dremio Solution/Script/README.md b/Structured Dremio Solution/Script/README.md
new file mode 100644
index 0000000..e278edf
--- /dev/null
+++ b/Structured Dremio Solution/Script/README.md
@@ -0,0 +1,3 @@
+Structured Dremio Solution - Script
+
+This script is a working version of a pipeline that pulls CSV files from GitHub, converts them into pandas DataFrames, and then feeds them into SQLite to produce the SQL commands needed to create a table from the data. These commands are then sent to the specified Dremio URL in chunks to create a structured SQL table of the data.
diff --git a/Structured Dremio Solution/Script/pipeline.py b/Structured Dremio Solution/Script/pipeline.py
new file mode 100644
index 0000000..b0dbaa8
--- /dev/null
+++ b/Structured Dremio Solution/Script/pipeline.py
@@ -0,0 +1,239 @@
+# Ensure you have a pipeline.env file containing the Dremio connection details in your working directory before running this script.
+# When running the script using: python pipeline.py
+# Add a space, then a URL for each CSV file being uploaded to Dremio. Each subsequent URL should be separated by a space.
+
+import requests
+import pandas as pd
+import sqlite3
+import getpass
+import time
+from dotenv import load_dotenv
+import os
+import argparse
+import logging
+import sys
+from urllib.parse import urlparse
+
+# Set up logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+# Function to get environment variables with error handling
+def get_env_variable(var_name):
+    value = os.getenv(var_name)
+    if not value:
+        logging.error(f"Environment variable {var_name} is not set.")
+        sys.exit(1)
+    return value
+
+# Function to validate URLs
+def is_valid_url(url):
+    try:
+        result = urlparse(url)
+        return all([result.scheme, result.netloc])
+    except ValueError:
+        return False
+
+# Load environment variables from .env file
+load_dotenv('pipeline.env')
+
+# Set up argument parsing
+parser = argparse.ArgumentParser(description='Process CSV URLs.')
+parser.add_argument('csv_urls', nargs='+', help='List of CSV file URLs')
+args = parser.parse_args()
+
+# Get CSV URLs from command line arguments
+csv_urls = args.csv_urls
+
+# Validate CSV URLs
+validated_urls = [url for url in csv_urls if is_valid_url(url)]
+if len(validated_urls) != len(csv_urls):
+    logging.error("One or more provided URLs are invalid.")
+    sys.exit(1)
+
+# Get environment variables with error handling
+dremio_url = get_env_variable('DREMIO_URL')
+username = get_env_variable('DREMIO_USERNAME')
+password = get_env_variable('DREMIO_PASSWORD')
+source = get_env_variable('DREMIO_SOURCE')
+
+chunk_size = 50 * 1024 * 1024 # 50MB chunk size (change as needed)
+
+# Authenticate and get token
+try:
+    auth_response = requests.post(f'{dremio_url}/apiv2/login', json={'userName': username, 'password': password})
+    auth_response.raise_for_status()
+    auth_token = auth_response.json().get('token')
+except requests.exceptions.RequestException as e:
+    logging.error(f"Error occurred while making a request: {str(e)}")
+    sys.exit(1)
+
+# Headers for authenticated requests
+headers = {
+    'Authorization': f'_dremio{auth_token}',
+    'Content-Type': 'application/json'
+}
+
+# Function to filter out unsupported SQL commands
+def filter_sql_commands(commands):
+    supported_commands = [
+        "CREATE", "INSERT", "DELETE", "UPDATE", "DROP", "ALTER", "TRUNCATE", "SELECT", "VALUES"
+    ]
+    filtered_commands = []
+    for command in commands:
+        if any(command.strip().upper().startswith(cmd) for cmd in supported_commands):
+            filtered_commands.append(command)
+    return filtered_commands
+
+# Mapping of SQLite types to Dremio types
+type_mapping = {
+    "INTEGER": "INT",
+    "TEXT": "VARCHAR",
+    "REAL": "FLOAT",
+    "BLOB": "VARBINARY",
+    "BOOLEAN": "BOOLEAN",
+    "DATE": "DATE",
+    "FLOAT": "FLOAT",
+    "DECIMAL": "DECIMAL",
+    "DOUBLE": "DOUBLE",
+    "INTERVAL": "INTERVAL",
+    "BIGINT": "BIGINT",
+    "TIME": "TIME",
+    "TIMESTAMP": "TIMESTAMP"
+}
+
+def convert_sqlite_to_dremio(sql_commands):
+    converted_commands = []
+    for command in sql_commands:
+        for sqlite_type, dremio_type in type_mapping.items():
+            command = command.replace(sqlite_type, dremio_type)
+        converted_commands.append(command)
+    return converted_commands
+
+# Function to send SQL command to Dremio
+def send_sql_command(command):
+    try:
+        sql_response = requests.post(f'{dremio_url}/api/v3/sql', headers=headers, json={'sql': command})
+        sql_response.raise_for_status()
+        logging.info(f'Executed SQL command: {sql_response.status_code}')
+    except requests.exceptions.RequestException as e:
+        logging.error(f'Failed to execute SQL command: {str(e)}')
+        logging.error(f'Response content: {e.response.content if e.response is not None else "unavailable"}')
+        sys.exit(1)
+    time.sleep(5)
+
+# Function to combine INSERT commands into a single statement
+def combine_insert_commands(insert_commands):
+    if not insert_commands:
+        return None
+
+    # Extract the base part of the INSERT statement (i.e., columns part)
+    base_insert = insert_commands[0].split(' VALUES', 1)[0] # Get the INSERT part
+
+    # Combine all VALUES parts into a single statement
+    combined_values = []
+    for command in insert_commands:
+        values_part = command.split(' VALUES', 1)[1].strip().rstrip(';')
+        combined_values.append(values_part)
+
+    combined_insert = f'{base_insert} VALUES {", ".join(combined_values)};'
+    return combined_insert
+
+# Function to send accumulated SQL commands in chunks
+def send_sql_in_chunks(filtered_commands, max_chunk_size):
+    chunk = []
+    chunk_size = 0
+    for command in filtered_commands:
+        command_size = len(command.encode('utf-8'))
+        if chunk_size + command_size > max_chunk_size:
+            # Combine INSERT commands in the current chunk
+            combined_insert_command = combine_insert_commands(chunk)
+            if combined_insert_command:
+                send_sql_command(combined_insert_command)
+            # Reset chunk and chunk size
+            chunk = []
+            chunk_size = 0
+
+        chunk.append(command)
+        chunk_size += command_size
+
+    # Send any remaining commands in the last chunk
+    if chunk:
+        combined_insert_command = combine_insert_commands(chunk)
+        if combined_insert_command:
+            send_sql_command(combined_insert_command)
+
+# Function to insert data using prepared statements
+def insert_data(conn, table_name, data):
+    placeholders = ', '.join(['?' for _ in data[0]])
+    query = f"INSERT INTO {table_name} VALUES ({placeholders})"
+    cursor = conn.cursor()
+    try:
+        cursor.executemany(query, data)
+        conn.commit()
+    except sqlite3.Error as e:
+        logging.error(f"Database error occurred: {str(e)}")
+        sys.exit(1)
+
+# Download CSV files, convert to SQL, and upload to Dremio
+logging.info("Script started")
+logging.info(f"Processing {len(validated_urls)} CSV files")
+for url in validated_urls:
+    try:
+        response = requests.get(url)
+        response.raise_for_status()
+        file_name = url.split('/')[-1]
+        table_name = file_name.split('.')[0]
+
+        # Read CSV in chunks
+        for chunk in pd.read_csv(url, chunksize=1000): # Adjust the chunksize as needed for performance
+            # Convert chunk to SQL table using in-memory SQLite database
+            conn = sqlite3.connect(':memory:')
+
+            # Create table schema based on the first chunk
+            chunk.to_sql(table_name, conn, if_exists='replace', index=False)
+
+            # NOTE(review): to_sql above already inserts the chunk's rows; the
+            # former insert_data(...) call here wrote every row a second time.
+
+            # Extract SQL commands to create and populate the table
+            sql_commands = []
+            for line in conn.iterdump():
+                sql_commands.append(line)
+            conn.close()
+
+            # Convert SQLite types to Dremio types
+            sql_commands = convert_sqlite_to_dremio(sql_commands)
+
+            # Filter out unsupported SQL commands
+            filtered_commands = filter_sql_commands(sql_commands)
+
+            # Separate CREATE TABLE command from INSERT commands
+            create_table_command = None
+            insert_commands = []
+            for command in filtered_commands:
+                if command.strip().upper().startswith("CREATE TABLE"):
+                    create_table_command = command
+                else:
+                    insert_commands.append(command)
+
+            # Modify the table name to include the full path
+            full_table_path = f'"{source}"."{table_name}"'
+            if create_table_command:
+                create_table_command = create_table_command.replace(f'"{table_name}"', full_table_path)
+                # Send the CREATE TABLE command
+                send_sql_command(create_table_command)
+
+            insert_commands = [cmd.replace(f'"{table_name}"', full_table_path) for cmd in insert_commands]
+
+            # Upload INSERT commands in chunks to Dremio
+            send_sql_in_chunks(insert_commands, chunk_size) # 50MB chunk size
+    except requests.exceptions.RequestException as e:
+        logging.error(f"Error occurred while downloading CSV file: {str(e)}")
+        sys.exit(1)
+    except pd.errors.ParserError as e:
+        logging.error(f"Error occurred while parsing CSV file: {str(e)}")
+        sys.exit(1)
+    except Exception as e:
+        logging.error(f"An unexpected error occurred: {str(e)}")
+        sys.exit(1)
+logging.info("Script completed")
\ No newline at end of file