diff --git a/utils/kinesis_reader.py b/utils/kinesis_reader.py index c929fa8..c27454e 100755 --- a/utils/kinesis_reader.py +++ b/utils/kinesis_reader.py @@ -59,6 +59,7 @@ import boto3 import calendar +import gzip import json import time import sys @@ -112,10 +113,13 @@ def retrieve_records(iterators): for shard_id, itx in iterators.items(): resp = client.get_records(ShardIterator=itx) for rec in resp['Records']: + data = rec['Data'] + if data.startswith(b'\x1f\x8b'): + data = gzip.decompress(data) result.append({ 'SequenceNumber': rec['SequenceNumber'], 'ApproximateArrivalTimestamp': rec['ApproximateArrivalTimestamp'].astimezone(timezone.utc).isoformat(), - 'Data': rec['Data'].decode('utf-8'), + 'Data': data.decode('utf-8'), 'PartitionKey': rec['PartitionKey'] }) iterators[shard_id] = resp['NextShardIterator'] @@ -138,5 +142,3 @@ def retrieve_records(iterators): for rec in retrieve_records(iterators): print(json.dumps(rec)) time.sleep(poll_interval) - -