Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ddb pagination #147

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# Global rule:
* @paulcruse-syn
* @ncgl-syngenta
40 changes: 35 additions & 5 deletions syngenta_digital_dta/dynamodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import boto3
from boto3.dynamodb.conditions import Attr

import typing
from syngenta_digital_dta.common import schema_mapper
from syngenta_digital_dta.common import dict_merger
from syngenta_digital_dta.common.base_adapter import BaseAdapter
Expand All @@ -21,6 +21,7 @@ def __init__(self, **kwargs):
self.model_schema = kwargs['model_schema']
self.model_identifier = kwargs['model_identifier']
self.model_version_key = kwargs['model_version_key']
self.default_limit = kwargs.get("limit", 100)

@lru_cache(maxsize=128)
def _get_dynamo_table(self, table, endpoint=None):
Expand All @@ -38,18 +39,47 @@ def read(self, **kwargs):
return self.scan(**kwargs)
return self.get(**kwargs)

def paginate(self, func: typing.Callable, query: typing.Dict) -> typing.List[typing.Dict]:
if 'Limit' not in query:
query['Limit'] = self.default_limit

data = []
response = func(**query)
data.append(response)

query['Limit'] -= len(response['Items'])

while 'LastEvaluatedKey' in response and query['Limit'] != 0:
query['ExclusiveStartKey'] = response['LastEvaluatedKey']
response = func(**query)
data.append(response)
query['Limit'] -= len(response['Items'])

return data

@staticmethod
def __flatten_items(raw_results):
results = []
for result in raw_results:
results.extend(result.get('Items', []))
return results

def scan(self, **kwargs):
raw_results = self.paginate(func=self.table.scan, query=kwargs.get('query', {}))
if kwargs.get('raw_scan'):
return self.table.scan(**kwargs.get('query', {}))
return self.table.scan(**kwargs.get('query', {})).get('Items', [])
return raw_results
else:
return self.__flatten_items(raw_results)

def get(self, **kwargs):
return self.table.get_item(**kwargs.get('query', {})).get('Item', {})

def query(self, **kwargs):
raw_results = self.paginate(func=self.table.query, query=kwargs.get('query', {}))
if kwargs.get('raw_query'):
return self.table.query(**kwargs.get('query', {}))
return self.table.query(**kwargs.get('query', {})).get('Items', [])
return raw_results
else:
return self.__flatten_items(raw_results)

def overwrite(self, **kwargs):
overwrite_item = schema_mapper.map_to_schema(kwargs['data'], self.model_schema_file, self.model_schema)
Expand Down
5 changes: 5 additions & 0 deletions tests/syngenta_digital_dta/dynamodb/mock_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ def __init__(self, **kwargs):
'modified': '2020-10-05'
}

self.mock_pagination_data = [
{'Items': [self.mock_data], 'LastEvaluatedKey': {'somekey': 'somevalue'}},
{'Items': [self.mock_data]}
]

def setup_test_table(self):
self.clear_table()
self.create_table()
Expand Down
123 changes: 116 additions & 7 deletions tests/syngenta_digital_dta/dynamodb/test_adapter.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import unittest
import warnings

import boto3
from unittest import mock

import syngenta_digital_dta
from tests.syngenta_digital_dta.dynamodb.mock_table import MockTable
from syngenta_digital_dta.dynamodb.adapter import BatchItemException
from tests.syngenta_digital_dta.dynamodb.mock_table import MockTable


class DynamoDBAdapterTest(unittest.TestCase):
Expand All @@ -23,7 +22,8 @@ def setUp(self, *args, **keywargs):
model_schema='test-dynamo-model',
model_schema_file='tests/openapi.yml',
model_identifier='test_id',
model_version_key='modified'
model_version_key='modified',
limit=200
)

def test_init(self):
Expand Down Expand Up @@ -96,7 +96,7 @@ def test_adapter_raw_query(self):
}
)

passed = data['Items'][0] == self.mock_table.mock_data and data.get('LastEvaluatedKey')
passed = data[0]['Items'][0] == self.mock_table.mock_data and data[0].get('LastEvaluatedKey')
self.assertTrue(passed)

def test_adapter_scan(self):
Expand All @@ -105,8 +105,7 @@ def test_adapter_scan(self):

def test_adapter_raw_scan(self):
data = self.adapter.scan(**{'raw_scan': True})
print(data)
self.assertDictEqual(data['Items'][0], self.mock_table.mock_data)
self.assertDictEqual(data[0]['Items'][0], self.mock_table.mock_data)

def test_adapter_create(self):
new_data = {
Expand Down Expand Up @@ -271,3 +270,113 @@ def test_adapter_delete(self):
}
)
self.assertDictEqual(deleted_data, {})

def test_query_pagination(self):
self.adapter.table = mock.MagicMock()
self.adapter.table.query.side_effect = self.mock_table.mock_pagination_data

data = self.adapter.query()

self.adapter.table.query.assert_has_calls(
calls=[
mock.call(Limit=200),
mock.call(Limit=199, ExclusiveStartKey={'somekey': 'somevalue'})
]
)

self.assertListEqual(
[
{
'array_number': [1, 2, 3],
'array_objects': [
{'array_number_key': 1, 'array_string_key': 'a'}
],
'created': '2020-10-05',
'modified': '2020-10-05',
'object_key': {'string_key': 'nothing'},
'test_id': 'abc123',
'test_query_id': 'def345'},
{
'array_number': [1, 2, 3],
'array_objects': [
{'array_number_key': 1, 'array_string_key': 'a'}
],
'created': '2020-10-05',
'modified': '2020-10-05',
'object_key': {'string_key': 'nothing'},
'test_id': 'abc123',
'test_query_id': 'def345'
}
],
data
)

def test_raw_query_pagination(self):
self.adapter.table = mock.MagicMock()
self.adapter.table.query.side_effect = self.mock_table.mock_pagination_data

data = self.adapter.query(raw_query=True)

self.adapter.table.query.assert_has_calls(
calls=[
mock.call(Limit=200),
mock.call(Limit=199, ExclusiveStartKey={'somekey': 'somevalue'})
]
)

self.assertListEqual(self.mock_table.mock_pagination_data, data)

def test_scan_pagination(self):
self.adapter.table = mock.MagicMock()
self.adapter.table.scan.side_effect = self.mock_table.mock_pagination_data

data = self.adapter.scan()

self.adapter.table.scan.assert_has_calls(
calls=[
mock.call(Limit=200),
mock.call(Limit=199, ExclusiveStartKey={'somekey': 'somevalue'})
]
)

self.assertListEqual(
[
{
'array_number': [1, 2, 3],
'array_objects': [
{'array_number_key': 1, 'array_string_key': 'a'}
],
'created': '2020-10-05',
'modified': '2020-10-05',
'object_key': {'string_key': 'nothing'},
'test_id': 'abc123',
'test_query_id': 'def345'},
{
'array_number': [1, 2, 3],
'array_objects': [
{'array_number_key': 1, 'array_string_key': 'a'}
],
'created': '2020-10-05',
'modified': '2020-10-05',
'object_key': {'string_key': 'nothing'},
'test_id': 'abc123',
'test_query_id': 'def345'
}
],
data
)

def test_raw_scan_pagination(self):
self.adapter.table = mock.MagicMock()
self.adapter.table.scan.side_effect = self.mock_table.mock_pagination_data

data = self.adapter.scan(raw_scan=True)

self.adapter.table.scan.assert_has_calls(
calls=[
mock.call(Limit=200),
mock.call(Limit=199, ExclusiveStartKey={'somekey': 'somevalue'})
]
)

self.assertListEqual(self.mock_table.mock_pagination_data, data)