From 9c84967f533cc151ae9224fb2ebace3b1b31768b Mon Sep 17 00:00:00 2001 From: Nick Davis <62664046+nickdavis2001@users.noreply.github.com> Date: Tue, 21 Nov 2023 17:00:41 +0000 Subject: [PATCH] Uml 3111 script for monthly reports (#2441) * refactored in prep to add polling functionality * basic polling is working, and athena functions WIP * athena flag now working * athena query now runs * show results of athena query * put s3 path in dictionary ready for this to be used for location * drop database first so we don't have tables with old data in * add ddl file * query all 4 tables * sql result now prints put * export stats table also * temp (WIP) describe table * ddl files, refactoring, stats table commented for now until working * stats table too * take in date range, with default * now runs 4 queries * tidy up output * tidy up output further * date substitution in sql string * refactor, tidy up, update README * refactoring * fix * add polling to query * print query output tidily * fix csv output * results * gitignore results files * run black to format python properly * get all results using token * maxresults and filename * fix dates --- .gitignore | 3 +- query/README.md | 147 +--------- query/dynamodb_export.py | 401 +++++++++++++++++++++------- query/results/README | 1 + query/tables/actor_codes.ddl | 8 + query/tables/actor_users.ddl | 10 + query/tables/stats.ddl | 24 ++ query/tables/user_lpa_actor_map.ddl | 12 + query/tables/viewer_activity.ddl | 10 + query/tables/viewer_codes.ddl | 13 + 10 files changed, 395 insertions(+), 234 deletions(-) create mode 100644 query/results/README create mode 100644 query/tables/actor_codes.ddl create mode 100644 query/tables/actor_users.ddl create mode 100644 query/tables/stats.ddl create mode 100644 query/tables/user_lpa_actor_map.ddl create mode 100644 query/tables/viewer_activity.ddl create mode 100644 query/tables/viewer_codes.ddl diff --git a/.gitignore b/.gitignore index 4e8e054dc9..5a0c204ade 100644 --- a/.gitignore +++ b/.gitignore @@ -16,10 +16,11 @@ scripts/getStatsResultDemo.json scripts/analysis_scripts/*.csv query/s3_objects/** +query/results/*.csv terraform/**/modules/**.terraform.lock.hcl .structurizr docs/diagrams/dsl/**/workspace.json *.pem -tests/vendor \ No newline at end of file +tests/vendor diff --git a/query/README.md b/query/README.md index f5fad954b6..d8775f93c9 100644 --- a/query/README.md +++ b/query/README.md @@ -1,4 +1,4 @@ -# Queries using DynamoDB data +# Queries using DynamoDB data and Athena Returns plaintext or json data of accounts matching either an LPA ID or a user's email address. @@ -15,157 +15,36 @@ Install pip modules pip install -r ./requirements.txt ``` -## Export Dynamodb data +## Export Dynamodb data, load Athena -Run the following script to request a DynanmoDB export to S3 +Run the following script to request a DynanmoDB export to S3, drop and re-creation of Athena database and tables, and query against Athena ```shell aws-vault exec identity -- python ./dynamodb_export.py --environment demo - -DynamoDB Table ARN: arn:aws:dynamodb:eu-west-1:367815980639:table/demo-ActorCodes -S3 Bucket Name: use-a-lpa-dynamodb-exports-development - IN_PROGRESS s3://use-a-lpa-dynamodb-exports-development/demo-ActorCodes/AWSDynamoDB/01617269934602-162d6355/data/ - - -DynamoDB Table ARN: arn:aws:dynamodb:eu-west-1:367815980639:table/demo-ActorUsers -S3 Bucket Name: use-a-lpa-dynamodb-exports-development - IN_PROGRESS s3://use-a-lpa-dynamodb-exports-development/demo-ActorUsers/AWSDynamoDB/01617269934827-b65efa47/data/ - - -DynamoDB Table ARN: arn:aws:dynamodb:eu-west-1:367815980639:table/demo-ViewerCodes -S3 Bucket Name: use-a-lpa-dynamodb-exports-development - IN_PROGRESS s3://use-a-lpa-dynamodb-exports-development/demo-ViewerCodes/AWSDynamoDB/01617269935076-45fe3523/data/ - - -DynamoDB Table ARN: arn:aws:dynamodb:eu-west-1:367815980639:table/demo-ViewerActivity -S3 Bucket Name: use-a-lpa-dynamodb-exports-development - IN_PROGRESS s3://use-a-lpa-dynamodb-exports-development/demo-ViewerActivity/AWSDynamoDB/01617269935310-6968000e/data/ - - -DynamoDB Table ARN: arn:aws:dynamodb:eu-west-1:367815980639:table/demo-UserLpaActorMap -S3 Bucket Name: use-a-lpa-dynamodb-exports-development - IN_PROGRESS s3://use-a-lpa-dynamodb-exports-development/demo-UserLpaActorMap/AWSDynamoDB/01617269935547-9e1e9b04/data/ ``` -You can check sthe status of the last export by running the command again with the `--check_exports` flag +## Script options +TODO TODO +You can check the status of the last dynamo export by running the command again with the `--check_exports` flag ```shell aws-vault exec identity -- python ./dynamodb_export.py --environment demo --check_exports -DynamoDB Table ARN: arn:aws:dynamodb:eu-west-1:367815980639:table/demo-ActorCodes -S3 Bucket Name: use-a-lpa-dynamodb-exports-development - COMPLETED s3://use-a-lpa-dynamodb-exports-development/demo-ActorCodes/AWSDynamoDB/01617269934602-162d6355/data/ - +Queries will be run for date range 2023-11-01 to 2023-11-30 +Waiting for DynamoDb export to be complete ( if run with Athena only option, this is just checking the previous export is complete ) +. -DynamoDB Table ARN: arn:aws:dynamodb:eu-west-1:367815980639:table/demo-ActorUsers -S3 Bucket Name: use-a-lpa-dynamodb-exports-development - COMPLETED s3://use-a-lpa-dynamodb-exports-development/demo-ActorUsers/AWSDynamoDB/01617269934827-b65efa47/data/ +DynamoDB export is complete - -DynamoDB Table ARN: arn:aws:dynamodb:eu-west-1:367815980639:table/demo-ViewerCodes -S3 Bucket Name: use-a-lpa-dynamodb-exports-development - COMPLETED s3://use-a-lpa-dynamodb-exports-development/demo-ViewerCodes/AWSDynamoDB/01617269935076-45fe3523/data/ - - -DynamoDB Table ARN: arn:aws:dynamodb:eu-west-1:367815980639:table/demo-ViewerActivity -S3 Bucket Name: use-a-lpa-dynamodb-exports-development - COMPLETED s3://use-a-lpa-dynamodb-exports-development/demo-ViewerActivity/AWSDynamoDB/01617269935310-6968000e/data/ - - -DynamoDB Table ARN: arn:aws:dynamodb:eu-west-1:367815980639:table/demo-UserLpaActorMap -S3 Bucket Name: use-a-lpa-dynamodb-exports-development - COMPLETED s3://use-a-lpa-dynamodb-exports-development/demo-UserLpaActorMap/AWSDynamoDB/01617269935547-9e1e9b04/data/ ``` ## AWS Athena -We can use AWS Athena to create a database and tables of the exported DynamoDB data so that we can use SQL to further explore and query our data. - -### Getting started - -See the getting started guide and follow Step 1: Creating a Database here - -After this you are able to write and run queries. Queries can be used to create tables. - -### Creating tables - -Here are some example SQL statements for creating tables from each DynamoDB Export - -Note: - -- the location for each export can be copied from the out put of the dynamodb_export.p script -- These queries create tables if they don't already exists. If the query is changed to add some new data, either delete and recreate the table or use an UPDATE query. - -Examples: - -Creating the viewer activity Table - -```SQL -CREATE EXTERNAL TABLE IF NOT EXISTS viewer_activity ( - Item struct , - ViewedBy:struct, - Viewed:struct> -) -ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' -WITH SERDEPROPERTIES ( - 'serialization.format' = '1' ) -LOCATION 's3://use-a-lpa-dynamodb-exports-development/demo-ViewerActivity/AWSDynamoDB/01616672353743-e52c5c67/data/' -TBLPROPERTIES ('has_encrypted_data'='true'); -``` - -Creating the viewer codes Table - -```SQL -CREATE EXTERNAL TABLE IF NOT EXISTS viewer_codes ( - Item struct , - Added:struct, - Expires:struct, - Organisation:struct, - SiriusUid:struct, - UserLpaActor:struct> -) -ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' -WITH SERDEPROPERTIES ( - 'serialization.format' = '1' ) -LOCATION 's3://use-a-lpa-dynamodb-exports-development/demo-ViewerCodes/AWSDynamoDB/01616672353584-6ff1f666/data/' -TBLPROPERTIES ('has_encrypted_data'='true'); -``` - -Creating the actor users Table - -```SQL -CREATE EXTERNAL TABLE IF NOT EXISTS actor_users ( - Item struct , - Email:struct, - LastLogin:struct> -) -ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' -WITH SERDEPROPERTIES ( - 'serialization.format' = '1' ) -LOCATION 's3://use-a-lpa-dynamodb-exports-development/demo-ViewerCodes/AWSDynamoDB/01616672353584-6ff1f666/data/' -TBLPROPERTIES ('has_encrypted_data'='true'); -``` - -Creating the user-lpa-actor map Table - -```SQL -CREATE EXTERNAL TABLE IF NOT EXISTS user_lpa_actor_map ( - Item struct , - ActorId:struct, - Added:struct, - SiriusUid:struct, - UserId:struct> -) -ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' -WITH SERDEPROPERTIES ( - 'serialization.format' = '1' ) -LOCATION 's3://use-a-lpa-dynamodb-exports-development/demo-ViewerCodes/AWSDynamoDB/01616672353584-6ff1f666/data/' -TBLPROPERTIES ('has_encrypted_data'='true'); -``` +The idea going foraward is for the dynamdb_export script to provide regularly run Athena queries. For ad-hoc queries that are one-shot or aren't yet automated, we can access Athena via the AWS Console, and run SQL queries against the ual database that this script creates. -### Querying the newly created tables +### Querying the Athena tables -After creating tables, you can run queries. Here is an example Select Query for Athena. +Here is an example Select Query for Athena which can be run in the AWS console. ```SQL -- issues SELECT query diff --git a/query/dynamodb_export.py b/query/dynamodb_export.py index cc87a52068..55ff39220c 100644 --- a/query/dynamodb_export.py +++ b/query/dynamodb_export.py @@ -1,162 +1,365 @@ -import datetime +from datetime import datetime +import calendar import argparse import boto3 +import re +import csv +from time import sleep -class DynamoDBExporter: - aws_dynamodb_client = '' - aws_kms_client = '' - environment_details = '' - export_time = '' +class DynamoDBExporterAndQuerier: + athena_database_name = "ual" + aws_dynamodb_client = "" + aws_kms_client = "" + environment_details = "" + export_time = "" def __init__(self, environment): + self.tables = { + "Stats": None, + "ActorCodes": None, + "ActorUsers": None, + "ViewerCodes": None, + "ViewerActivity": None, + "UserLpaActorMap": None, + } + + self.table_ddl_files = { + "tables/stats.ddl": "Stats", + "tables/actor_codes.ddl": "ActorCodes", + "tables/actor_users.ddl": "ActorUsers", + "tables/viewer_codes.ddl": "ViewerCodes", + "tables/viewer_activity.ddl": "ViewerActivity", + "tables/user_lpa_actor_map.ddl": "UserLpaActorMap", + } + self.environment_details = self.set_environment_details(environment) aws_iam_session = self.set_iam_role_session() - self.aws_dynamodb_client = self.get_aws_client( - 'dynamodb', - aws_iam_session) + self.aws_dynamodb_client = self.get_aws_client("dynamodb", aws_iam_session) - self.aws_kms_client = self.get_aws_client( - 'kms', - aws_iam_session) + self.aws_kms_client = self.get_aws_client("kms", aws_iam_session) + + self.aws_athena_client = self.get_aws_client("athena", aws_iam_session) self.kms_key_id = self.get_kms_key_id( - 'dynamodb-exports-{}'.format( - self.environment_details['account_name']) - ) + "dynamodb-exports-{}".format(self.environment_details["account_name"]) + ) + self.export_bucket_name = "use-a-lpa-dynamodb-exports-{}".format( + self.environment_details["account_name"] + ) + + def set_date_range(self, start, end): + self.start_date = start + self.end_date = end + print( + f"Queries will be run for date range {self.start_date} to {self.end_date}" + ) + + def set_default_date_range(self): + today = datetime.today() + days_in_mo = calendar.monthrange(today.year, today.month) + self.start_date = f"{today.year}-{today.month}-01" + self.end_date = f"{today.year}-{today.month}-{days_in_mo[1]}" + print( + f"Queries will be run for date range {self.start_date} to {self.end_date}" + ) @staticmethod def get_aws_client(client_type, aws_iam_session, region="eu-west-1"): client = boto3.client( client_type, region_name=region, - aws_access_key_id=aws_iam_session['Credentials']['AccessKeyId'], - aws_secret_access_key=aws_iam_session['Credentials']['SecretAccessKey'], - aws_session_token=aws_iam_session['Credentials']['SessionToken']) + aws_access_key_id=aws_iam_session["Credentials"]["AccessKeyId"], + aws_secret_access_key=aws_iam_session["Credentials"]["SecretAccessKey"], + aws_session_token=aws_iam_session["Credentials"]["SessionToken"], + ) return client @staticmethod def set_environment_details(environment): aws_account_ids = { - 'production': "690083044361", - 'preproduction': "888228022356", - 'development': "367815980639", + "production": "690083044361", + "preproduction": "888228022356", + "development": "367815980639", } - aws_account_id = aws_account_ids.get( - environment, "367815980639") + aws_account_id = aws_account_ids.get(environment, "367815980639") if environment in aws_account_ids.keys(): account_name = environment else: - account_name = 'development' + account_name = "development" response = { - 'name': environment.lower(), - 'account_name': account_name.lower(), - 'account_id': aws_account_id, + "name": environment.lower(), + "account_name": account_name.lower(), + "account_id": aws_account_id, } return response def set_iam_role_session(self): - if self.environment_details['name'] == "production": - role_arn = 'arn:aws:iam::{}:role/db-analysis'.format( - self.environment_details['account_id']) + if self.environment_details["name"] == "production": + role_arn = "arn:aws:iam::{}:role/db-analysis".format( + self.environment_details["account_id"] + ) else: - role_arn = 'arn:aws:iam::{}:role/operator'.format( - self.environment_details['account_id']) + role_arn = "arn:aws:iam::{}:role/operator".format( + self.environment_details["account_id"] + ) sts = boto3.client( - 'sts', - region_name='eu-west-1', + "sts", + region_name="eu-west-1", ) session = sts.assume_role( RoleArn=role_arn, - RoleSessionName='exporting_dynamodb_tables_to_s3', - DurationSeconds=900 + RoleSessionName="exporting_dynamodb_tables_to_s3", + DurationSeconds=900, ) return session def get_kms_key_id(self, kms_key_alias): response = self.aws_kms_client.describe_key( - KeyId='alias/{}'.format(kms_key_alias), - ) - return response['KeyMetadata']['KeyId'] - - def export_table_to_point_in_time(self, check_only): - tables = [ - "ActorCodes", - "ActorUsers", - "ViewerCodes", - "ViewerActivity", - "UserLpaActorMap", - ] - - for table in tables: - table_arn = self.get_table_arn('{}-{}'.format( - self.environment_details['name'], - table) - ) - bucket_name = 'use-a-lpa-dynamodb-exports-{}'.format( - self.environment_details['account_name']) - s3_prefix = '{}-{}'.format( - self.environment_details['name'], - table) - print('\n') - print('DynamoDB Table ARN:',table_arn) - print('S3 Bucket Name:', bucket_name) - - if check_only == False: - print("exporting tables") - response = self.aws_dynamodb_client.export_table_to_point_in_time( - TableArn=table_arn, - S3Bucket=bucket_name, - S3BucketOwner=self.environment_details['account_id'], - S3Prefix=s3_prefix, - S3SseAlgorithm='KMS', - S3SseKmsKeyId=self.kms_key_id, - ExportFormat='DYNAMODB_JSON' - ) - - response = self.aws_dynamodb_client.list_exports( + KeyId="alias/{}".format(kms_key_alias), + ) + return response["KeyMetadata"]["KeyId"] + + def check_dynamo_export_status(self): + overallCompleted = False + print( + "Waiting for DynamoDb export to be complete ( if run with Athena only option, this is just checking the previous export is complete )" + ) + while not overallCompleted: + print(".", end="", flush=True) + # assume all tables are completed until we encounter one that is not + tablesCompleted = True + for table in self.tables.keys(): + if not self.get_dynamo_export_status(table): + # we encountered an inconmplete table so they are not all complete + tablesCompleted = False + overallCompleted = tablesCompleted + sleep(10) + print("\n") + print("DynamoDB export is complete") + + def export_all_dynamo_tables(self): + for table in self.tables.keys(): + table_arn = self.get_table_arn( + "{}-{}".format(self.environment_details["name"], table) + ) + s3_prefix = "{}-{}".format(self.environment_details["name"], table) + + self.export_dynamo_table(table_arn, self.export_bucket_name, s3_prefix) + + def export_dynamo_table(self, table_arn, bucket_name, s3_prefix): + print(f"exporting {table_arn} dynamoDb table") + response = self.aws_dynamodb_client.export_table_to_point_in_time( TableArn=table_arn, - MaxResults=1 + S3Bucket=bucket_name, + S3BucketOwner=self.environment_details["account_id"], + S3Prefix=s3_prefix, + S3SseAlgorithm="KMS", + S3SseKmsKeyId=self.kms_key_id, + ExportFormat="DYNAMODB_JSON", + ) + + def get_dynamo_export_status(self, table): + table_arn = self.get_table_arn( + "{}-{}".format(self.environment_details["name"], table) + ) + s3_prefix = "{}-{}".format(self.environment_details["name"], table) + + response = self.aws_dynamodb_client.list_exports( + TableArn=table_arn, MaxResults=1 + ) + completed = True + for export in response["ExportSummaries"]: + export_arn_hash = export["ExportArn"].rsplit("/", 1)[-1] + s3_path = "s3://{}/{}/AWSDynamoDB/{}/data/".format( + self.export_bucket_name, s3_prefix, export_arn_hash ) - for export in response['ExportSummaries']: - export_arn_hash = export['ExportArn'].rsplit('/', 1)[-1] - s3_path = 's3://{}/{}/AWSDynamoDB/{}/data/'.format( - bucket_name, - s3_prefix, - export_arn_hash - ) - print('\t', export['ExportStatus'], s3_path) + if export["ExportStatus"] != "COMPLETED": + completed = False + + self.tables[table] = s3_path + + return completed def get_table_arn(self, table_name): - response = self.aws_dynamodb_client.describe_table( - TableName=table_name + response = self.aws_dynamodb_client.describe_table(TableName=table_name) + + return response["Table"]["TableArn"] + + def drop_athena_database(self): + query = f"DROP DATABASE {self.athena_database_name} CASCADE;" + self.run_athena_query(query, quiet=True) + + def create_athena_database(self): + query = f"CREATE DATABASE IF NOT EXISTS {self.athena_database_name};" + self.run_athena_query(query, quiet=True) + + def create_athena_tables(self): + print("Re-creating Athena database and loading Athena tables") + self.drop_athena_database() + self.create_athena_database() + + for table_ddl in self.table_ddl_files.keys(): + exported_s3_location = self.tables[self.table_ddl_files[table_ddl]] + self.create_athena_table(table_ddl, exported_s3_location) + + def create_athena_table(self, table_ddl, s3_location): + with open(table_ddl) as ddl: + rawQuery = ddl.read() + searchStr = "'s3(.*)'" + query = re.sub(searchStr, f"'{s3_location}'", rawQuery, flags=re.M) + self.run_athena_query(query, quiet=True) + + def run_athena_query(self, query, outputFileName=None, quiet=False): + if quiet != True: + print("\n") + print("Running Athena query : ") + print(query) + + response = self.aws_athena_client.start_query_execution( + QueryString=query, + QueryExecutionContext={"Database": self.athena_database_name}, + ResultConfiguration={"OutputLocation": f"s3://{self.export_bucket_name}/"}, + ) + + query_execution_id = response["QueryExecutionId"] + while True: + finish_state = self.aws_athena_client.get_query_execution( + QueryExecutionId=query_execution_id + )["QueryExecution"]["Status"]["State"] + if finish_state == "RUNNING" or finish_state == "QUEUED": + sleep(10) + else: + break + + assert finish_state == "SUCCEEDED", f"query state is {finish_state}" + + response = self.aws_athena_client.get_query_results( + QueryExecutionId=query_execution_id, MaxResults=500 + ) + + results = response["ResultSet"]["Rows"] + + while "NextToken" in response: + response = self.aws_athena_client.get_query_results( + QueryExecutionId=query_execution_id, MaxResults=500, + NextToken=response["NextToken"]) + results.extend(response["ResultSet"]["Rows"]) + + if outputFileName: + self.output_athena_results(results, outputFileName) + + def output_athena_results(self, results, outputFileName): + with open(f"results/{outputFileName}.csv", "w", newline="") as outFile: + wr = csv.writer(outFile, quoting=csv.QUOTE_ALL) + for row in results: + outputRow = "" + csvRow = [] + for field in row["Data"]: + cell = (list)(field.values()) + outputRow = f"{outputRow} | {cell[0]}" + csvRow.append(cell[0]) + + print(outputRow) + wr.writerow(csvRow) + + def get_expired_viewed_access_codes(self): + sql_string = f'SELECT distinct va.item.viewerCode.s as ViewedCode, va.item.viewedby.s as Organisation FROM "ual"."viewer_activity" as va, "ual"."viewer_codes" as vc WHERE va.item.viewerCode = vc.item.viewerCode AND date_add(\'day\', -30, vc.item.expires.s) BETWEEN date(\'{self.start_date}\') AND date(\'{self.end_date}\') ORDER by Organisation;' + self.run_athena_query( + sql_string, outputFileName="ExpiredViewedAccessCodes" + ) + + def get_expired_unviewed_access_codes(self): + sql_string = f'SELECT vc.item.viewerCode.s as ViewerCode, vc.item.organisation.s as Organisation FROM "ual"."viewer_codes" as vc WHERE vc.item.viewerCode.s not in (SELECT va.item.viewerCode.s FROM "ual"."viewer_activity" as va) AND date_add(\'day\', -30, vc.item.expires.s) BETWEEN date(\'{self.start_date}\') AND date(\'{self.end_date}\') ORDER BY vc.item.viewerCode.s' + self.run_athena_query( + sql_string, outputFileName="ExpiredUnviewedAccessCodes" ) - return response['Table']['TableArn'] + def get_count_of_viewed_access_codes(self): + sql_string = f"SELECT COUNT(*) FROM \"ual\".\"viewer_activity\" WHERE Item.Viewed.S BETWEEN date(\'{self.start_date}\') AND date(\'{self.end_date}\');" + self.run_athena_query( + sql_string, + outputFileName="CountofViewedAccessCodes", + ) + def get_count_of_expired_access_codes(self): + sql_string = f"SELECT COUNT(*) FROM \"viewer_codes\" as vc WHERE date_add('day', -30, vc.item.expires.s) BETWEEN date(\'{self.start_date}\') AND date(\'{self.end_date}\');" + self.run_athena_query( + sql_string, + outputFileName="CountofExpiredAccessCodes", + ) def main(): - parser = argparse.ArgumentParser( - description="Exports DynamoDB tables to S3.") - parser.add_argument("--environment", - default="demo", - help="The environment to export DynamoDB data for") - parser.add_argument('--check_exports', dest='check_only', action='store_const', - const=True, default=False, - help='Output json data instead of plaintext to terminal') + parser = argparse.ArgumentParser(description="Exports DynamoDB tables to S3.") + parser.add_argument( + "--environment", + default="demo", + help="The environment to export DynamoDB data for", + ) + parser.add_argument( + "--check_exports", + dest="check_only", + action="store_const", + const=True, + default=False, + help="Output json data instead of plaintext to terminal", + ) + parser.add_argument( + "--reload_athena_and_query", + dest="reload_athena_and_query_flag", + action="store_const", + const=True, + default=False, + help="Reload Athena and run query, assuming DynamoDb export has already run", + ) + parser.add_argument( + "--athena_query_only", + dest="athena_query_only_flag", + action="store_const", + const=True, + default=False, + help="Only run the Athena query, assuming that DynamoDb export and load into Athena has already run", + ) + parser.add_argument( + "--start_date", default="", help="Start date in the form YYYY-MM-DD" + ) + parser.add_argument( + "--end_date", default="", help="End date in the form YYYY-MM-DD" + ) args = parser.parse_args() - work = DynamoDBExporter( - args.environment) - work.export_table_to_point_in_time(args.check_only) + work = DynamoDBExporterAndQuerier(args.environment) + + if args.start_date and args.end_date: + work.set_date_range(args.start_date, args.end_date) + else: + work.set_default_date_range() + + if args.check_only: + work.check_dynamo_export_status() + return + + # do the DynamoDb export, unless we've specified just Athena load and query, or just athena query + if not args.reload_athena_and_query_flag and not args.athena_query_only_flag: + work.export_all_dynamo_tables() + + # create the Athena tables, unless we've specified query only + if not args.athena_query_only_flag: + work.check_dynamo_export_status() + work.create_athena_tables() + + work.get_expired_viewed_access_codes() + work.get_expired_unviewed_access_codes() + work.get_count_of_viewed_access_codes() + work.get_count_of_expired_access_codes() if __name__ == "__main__": diff --git a/query/results/README b/query/results/README new file mode 100644 index 0000000000..cad1227dd2 --- /dev/null +++ b/query/results/README @@ -0,0 +1 @@ +here to ensure results dir exists diff --git a/query/tables/actor_codes.ddl b/query/tables/actor_codes.ddl new file mode 100644 index 0000000000..2cc6dbf115 --- /dev/null +++ b/query/tables/actor_codes.ddl @@ -0,0 +1,8 @@ +CREATE EXTERNAL TABLE IF NOT EXISTS actor_codes ( + Item struct > +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ( + 'serialization.format' = '1' ) +LOCATION 's3://use-a-lpa-dynamodb-exports-development/demo-ActorCodes/AWSDynamoDB/01616672353584-6ff1f666/data/' +TBLPROPERTIES ('has_encrypted_data'='true'); diff --git a/query/tables/actor_users.ddl b/query/tables/actor_users.ddl new file mode 100644 index 0000000000..5a2ba761c0 --- /dev/null +++ b/query/tables/actor_users.ddl @@ -0,0 +1,10 @@ +CREATE EXTERNAL TABLE IF NOT EXISTS actor_users ( + Item struct , + Email:struct, + LastLogin:struct> +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ( + 'serialization.format' = '1' ) +LOCATION 's3://use-a-lpa-dynamodb-exports-development/demo-ActorUsers/AWSDynamoDB/01699371668200-7b5d140b/data/' +TBLPROPERTIES ('has_encrypted_data'='true'); diff --git a/query/tables/stats.ddl b/query/tables/stats.ddl new file mode 100644 index 0000000000..356952ea1b --- /dev/null +++ b/query/tables/stats.ddl @@ -0,0 +1,24 @@ +CREATE EXTERNAL TABLE IF NOT EXISTS stats ( + Item struct , + AccountActivatedEvent:struct, + AccountCreatedEvent:struct, + AccountDeletedEvent:struct, + AddedLpaTypeHwEvent:struct, + AddedLpaTypePfaEvent:struct, + DownloadSummaryEvent:struct, + FullMatchKeyRequestSuccessLpaTypeHwEvent:struct, + FullMatchKeyRequestSuccessLpaTypePfaEvent:struct, + LpaRemovedEvent:struct, + LpasAdded:struct, + OlderLpaNeedsCleansingEvent:struct, + UserAbroadAddressRequestSuccessEvent:struct, + ViewLpaShareCodeCancelledEvent:struct, + ViewLpaShareCodeExpiredEvent:struct, + ViewerCodesCreated:struct, + ViewerCodesViewed:struct> +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ( + 'serialization.format' = '1' ) +LOCATION 's3://use-a-lpa-dynamodb-exports-development/demo-stats/AWSDynamoDB/01699371668200-7b5d140b/data/' +TBLPROPERTIES ('has_encrypted_data'='true'); diff --git a/query/tables/user_lpa_actor_map.ddl b/query/tables/user_lpa_actor_map.ddl new file mode 100644 index 0000000000..8f26c0228f --- /dev/null +++ b/query/tables/user_lpa_actor_map.ddl @@ -0,0 +1,12 @@ +CREATE EXTERNAL TABLE IF NOT EXISTS user_lpa_actor_map ( + Item struct , + ActorId:struct, + Added:struct, + SiriusUid:struct, + UserId:struct> +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ( + 'serialization.format' = '1' ) +LOCATION 's3://use-a-lpa-dynamodb-exports-development/demo-ViewerCodes/AWSDynamoDB/01616672353584-6ff1f666/data/' +TBLPROPERTIES ('has_encrypted_data'='true'); diff --git a/query/tables/viewer_activity.ddl b/query/tables/viewer_activity.ddl new file mode 100644 index 0000000000..ddfb1c0c4b --- /dev/null +++ b/query/tables/viewer_activity.ddl @@ -0,0 +1,10 @@ +CREATE EXTERNAL TABLE IF NOT EXISTS viewer_activity ( + Item struct , + ViewedBy:struct, + Viewed:struct> +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ( + 'serialization.format' = '1' ) +LOCATION 's3://use-a-lpa-dynamodb-exports-development/demo-ViewerActivity/AWSDynamoDB/01616672353743-e52c5c67/data/' +TBLPROPERTIES ('has_encrypted_data'='true'); diff --git a/query/tables/viewer_codes.ddl b/query/tables/viewer_codes.ddl new file mode 100644 index 0000000000..99411d1c66 --- /dev/null +++ b/query/tables/viewer_codes.ddl @@ -0,0 +1,13 @@ +CREATE EXTERNAL TABLE IF NOT EXISTS viewer_codes ( + Item struct , + Added:struct, + Expires:struct, + Organisation:struct, + SiriusUid:struct, + UserLpaActor:struct> +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ( + 'serialization.format' = '1' ) +LOCATION 's3://use-a-lpa-dynamodb-exports-development/demo-ViewerCodes/AWSDynamoDB/01616672353584-6ff1f666/data/' +TBLPROPERTIES ('has_encrypted_data'='true');