Skip to content

Commit

Permalink
utils/kinesis_reader: recognize and uncompress gzipped records
Browse files: browse the repository at this point in the history
  • Loading branch information
kdgregory committed Aug 28, 2021
1 parent f47eb7e commit f869c8f
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions utils/kinesis_reader.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -59,6 +59,7 @@

import boto3
import calendar
import gzip
import json
import time
import sys
Expand Down Expand Up @@ -112,10 +113,13 @@ def retrieve_records(iterators):
for shard_id, itx in iterators.items():
resp = client.get_records(ShardIterator=itx)
for rec in resp['Records']:
data = rec['Data']
if data.startswith(b'\x1f\x8b'):
data = gzip.decompress(data)
result.append({
'SequenceNumber': rec['SequenceNumber'],
'ApproximateArrivalTimestamp': rec['ApproximateArrivalTimestamp'].astimezone(timezone.utc).isoformat(),
'Data': rec['Data'].decode('utf-8'),
'Data': data.decode('utf-8'),
'PartitionKey': rec['PartitionKey']
})
iterators[shard_id] = resp['NextShardIterator']
Expand All @@ -138,5 +142,3 @@ def retrieve_records(iterators):
for rec in retrieve_records(iterators):
print(json.dumps(rec))
time.sleep(poll_interval)


0 comments on commit f869c8f

Please sign in to comment.