Skip to content

Commit

Permalink
add run method (#68)
Browse files Browse the repository at this point in the history
* add run method

* update tests / add rollback arg

* simplify call

* raise exception regardless

* PR review - add bulk json insert method

* remove unneeded vars

* lint

* devops context permissioning

* slack / aws orbs

* lint

* lint

* lint

* lint

* module import

* fix test creds

* fix test creds

* fix test creds

Co-authored-by: ncgl-syngenta <[email protected]>
  • Loading branch information
ncgl-syngenta and ncgl-syngenta authored Sep 3, 2021
1 parent cecac1d commit 26b7aff
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 26 deletions.
25 changes: 14 additions & 11 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
version: 2.1
orbs:
aws-cli: circleci/[email protected]
slack: circleci/[email protected]
sonarcloud: sonarsource/[email protected]
commands:
pipenv-install-dev:
Expand All @@ -20,13 +18,23 @@ commands:
- store_artifacts:
path: ./coverage/lint/report.html
python-test:
steps:
- run: sudo rm -f /etc/boto.cfg
- run: pipenv run test
steps:
- run: sudo rm -f /etc/boto.cfg
- run:
command: pipenv run test
environment:
AWS_ACCESS_KEY_ID: 0
AWS_SECRET_ACCESS_KEY: 0
AWS_DEFAULT_REGION: us-east-2
python-report:
steps:
- run: sudo rm -f /etc/boto.cfg
- run: pipenv run coverage
- run:
command: pipenv run coverage
environment:
AWS_ACCESS_KEY_ID: 0
AWS_SECRET_ACCESS_KEY: 0
AWS_DEFAULT_REGION: us-east-2
- store_test_results:
path: ./coverage/reports
- store_artifacts:
Expand Down Expand Up @@ -88,13 +96,11 @@ jobs:
- DATA_DIR=/tmp/localstack/data
steps:
- checkout
- aws-cli/setup
- pipenv-install-dev
- python-lint
- python-test
- python-report
- sonarcloud/scan
- slack-error
install-build-deploy:
docker:
- image: cimg/python:3.8
Expand All @@ -103,13 +109,11 @@ jobs:
- pipenv-install
- pypi-setup
- pypi-deploy
- slack-status
workflows:
install-build-deploy:
jobs:
- install-build-deploy:
context:
- tools-cicd
- pypi-token
- sonarcloud-token
filters:
Expand All @@ -121,7 +125,6 @@ workflows:
jobs:
- install-build-test:
context:
- tools-cicd
- pypi-token
- sonarcloud-token
filters:
Expand Down
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,7 @@ adapter.delete(s3_path='test/test-create.json')
## Contributing
If you would like to contribute please make sure to follow the established patterns and unit test your code:

### Unit Testing
To run unit tests, enter the command:
```bash
RUN_MODE=unittest python -m unittest discover
```
### Local Unit Testing

- In one tab, run `pipenv run local`
- In a second tab, run `RUN_MODE=unittest python -m unittest discover`
3 changes: 2 additions & 1 deletion syngenta_digital_dta/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ def adapter(**kwargs):
return ElasticsearchAdapter(**kwargs)
if kwargs.get('engine') == 's3':
return S3Adapter(**kwargs)
raise Exception(f'engine {kwargs.get("engine", "was not supplied, an empty engine kwarg is")} not supported; contribute to get it supported :)')
raise Exception(
f'engine {kwargs.get("engine", "was not supplied, an empty engine kwarg is")} not supported; contribute to get it supported :)')
36 changes: 34 additions & 2 deletions syngenta_digital_dta/postgres/adapter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from typing import OrderedDict

from psycopg2.extensions import AsIs

from syngenta_digital_dta.postgres import json_formatting
from syngenta_digital_dta.common import dict_merger
from syngenta_digital_dta.common import logger
from syngenta_digital_dta.common import publisher
Expand Down Expand Up @@ -99,7 +102,7 @@ def create_table(self, **kwargs):
if not kwargs['query'].lower().startswith('create table'):
self.__raise_error('TABLE_WRITE_ONLY', **kwargs)
query = kwargs.pop('query')
self.__execute(query, params={}, commit=True)
self.__execute(query, params={}, commit=True, rollback=True)

def query(self, **kwargs):
if 'params' not in kwargs:
Expand All @@ -111,6 +114,33 @@ def query(self, **kwargs):
self.__execute(query, params, **kwargs)
return self.__get_data(all=True)

def bulk_insert_json(
    self,
    json: str,
    table_name: str,
    column_map: OrderedDict,
    json_column_map: OrderedDict,
    function_map=None,
):
    """Insert every element of a JSON array payload into `table_name` with one statement.

    Args:
        json: raw JSON string. It is interpolated directly into the generated
            SQL (no bind parameters), so it must come from a trusted source.
        table_name: target table for the INSERT.
        column_map: ordered mapping of column alias -> SQL expression for
            plain (non-JSON) columns.
        json_column_map: ordered mapping of column alias -> dotted path into
            the JSON payload; all paths must share one top-level key (the
            array that gets unnested into rows).
        function_map: optional mapping of column alias -> SQL template
            (str.format style) wrapped around the generated expression.
    """
    # Build the full INSERT ... SELECT statement from the JSON payload.
    # No commit/rollback kwargs are passed, so this relies on __execute's
    # defaults — NOTE(review): confirm commit behavior against __execute.
    statement = json_formatting.insert_json_into_table(
        json=json,
        table_name=table_name,
        column_map=column_map,
        json_column_map=json_column_map,
        function_map=function_map or {}
    )

    self.__execute(query=statement, params={})

def create_index(self, table_name, index_columns):
    """Create an index on `table_name` covering `index_columns` (in order).

    The index name is derived from the column names (e.g. ['a', 'b'] ->
    index_a_b), so the statement will fail if an index with that derived
    name already exists.

    NOTE(review): table and column names are interpolated directly into the
    SQL with no quoting/escaping — safe only for trusted identifiers.
    """
    index_name = f"index_{'_'.join(index_columns)}"
    index_cols = f"({', '.join(index_columns)})"

    statement = f"CREATE INDEX {index_name} ON {table_name} {index_cols}"

    self.__execute(query=statement, params={})

def __create_join_query(self, relationship, **kwargs):
params = {
'table': AsIs(self.table),
Expand Down Expand Up @@ -223,7 +253,9 @@ def __execute(self, query, params, **kwargs):
except Exception as error:
self.__debug(query, params, True)
logger.log(level='ERROR', log={'error': error})
raise Exception('error with execution, check logs') from error
if kwargs.get('rollback'):
self.connection.rollback()
raise Exception(f'error with execution, check logs - {error}') from error

def __compose_params(self, data, columns="*", values=None):
if not values:
Expand Down
76 changes: 76 additions & 0 deletions syngenta_digital_dta/postgres/json_formatting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
def insert_json_into_table(
    json,
    table_name,
    column_map,
    json_column_map,
    function_map=None,
):
    """Build an INSERT ... SELECT statement that loads a JSON array into a table.

    Args:
        json: raw JSON string; exposed to the query through a CTE.
            NOTE(review): interpolated directly into the SQL by
            _build_json_cte — safe only for trusted input.
        table_name: target table for the INSERT.
        column_map: ordered alias -> SQL expression for plain columns.
        json_column_map: ordered alias -> dotted JSON path; every path must
            start with the same top-level key (the array to unnest).
        function_map: optional alias -> SQL template (str.format style).

    Returns:
        The complete SQL statement as a string.
    """
    # All dotted paths must share one top-level key; a mixed or empty set
    # makes this single-element unpacking raise ValueError.
    [target_key] = {v.split(".")[0] for v in json_column_map.values()}

    # BUG FIX: the derived-table parenthesis was never closed and Postgres
    # requires an alias on a subquery in FROM — both are added here.
    return (
        f"""{_build_json_cte(json)}
        {_build_insert_statement(table_name, column_map, json_column_map)}
        {_build_select_statement(column_map, json_column_map, function_map or {})}
        FROM ({_build_json_array_subquery(target_key)}) AS _json_rows
        """
    )


def _build_json_cte(json):
return f"WITH _json_cte AS (SELECT '{json}'::json AS _json)"


def _build_insert_statement(table, column_map, json_column_map):
return f"INSERT INTO {table} ({', '.join(_get_column_order(column_map, json_column_map))})"


def _build_json_array_subquery(target_key):
return f"SELECT json_array_elements(_json->'{target_key}') AS _jsondict FROM _json_cte"


def _build_select_statement(column_map, json_column_map, function_map):
lines = []

for alias, definition in column_map.items():
if alias in function_map:
result = _apply_function(alias, definition, function_map)
result = f"{result} AS {alias}"

else:
result = f"{definition} AS {alias}"

lines.append(result)

for alias, definition in json_column_map.items():
result = _parse_json_line(alias, definition)

if alias in function_map:
result = _apply_function(alias, result, function_map)

lines.append(result)

return f"SELECT {', '.join(lines)}"


def _get_column_order(column_map, json_column_map):
return [
*column_map.keys(),
*json_column_map.keys()
]


def _parse_json_line(k, v):
parts = v.split('.')

if len(parts) == 2:
statement = f"_jsondict -> '{parts[1]}' AS {k}"

elif len(parts) == 3:
statement = f"_jsondict -> '{parts[1]}' ->> '{parts[2]}' AS {k}"

return statement


def _apply_function(k, statement, function_map):
statement, alias = statement.split(' AS ')
return f"{function_map[k].format(statement)} as {alias}"
Loading

0 comments on commit 26b7aff

Please sign in to comment.