diff --git a/pkg/dataplane/streamconsumergroup/claim.go b/pkg/dataplane/streamconsumergroup/claim.go index 7a3830e..8a8edfd 100644 --- a/pkg/dataplane/streamconsumergroup/claim.go +++ b/pkg/dataplane/streamconsumergroup/claim.go @@ -217,11 +217,13 @@ func (c *claim) fetchRecordBatch(location string) (string, error) { for receivedRecordIndex, receivedRecord := range getRecordsOutput.Records { record := v3io.StreamRecord{ - ShardID: &c.shardID, - Data: receivedRecord.Data, - ClientInfo: receivedRecord.ClientInfo, - PartitionKey: receivedRecord.PartitionKey, - SequenceNumber: receivedRecord.SequenceNumber, + ShardID: &c.shardID, + Data: receivedRecord.Data, + ClientInfo: receivedRecord.ClientInfo, + PartitionKey: receivedRecord.PartitionKey, + SequenceNumber: receivedRecord.SequenceNumber, + ArrivalTimeSec: receivedRecord.ArrivalTimeSec, + ArrivalTimeNSec: receivedRecord.ArrivalTimeNSec, } records[receivedRecordIndex] = record diff --git a/pkg/dataplane/types.go b/pkg/dataplane/types.go index 294e413..97735c7 100755 --- a/pkg/dataplane/types.go +++ b/pkg/dataplane/types.go @@ -326,11 +326,13 @@ type GetItemsOutput struct { // type StreamRecord struct { - ShardID *int - Data []byte - ClientInfo []byte - PartitionKey string - SequenceNumber uint64 + ShardID *int + Data []byte + ClientInfo []byte + PartitionKey string + SequenceNumber uint64 + ArrivalTimeSec int + ArrivalTimeNSec int } type SeekShardInputType int