Skip to content

Commit

Permalink
add initial-response support
Browse files Browse the repository at this point in the history
  • Loading branch information
dayaffe committed Sep 13, 2023
1 parent 93a2e5b commit 9d850f3
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ class KinesisTests: XCTestCase {
let input = SubscribeToShardInput(consumerARN: consumerARN, shardId: shard?.shardId, startingPosition: KinesisClientTypes.StartingPosition(sequenceNumber: shard?.sequenceNumberRange?.startingSequenceNumber, type: .atSequenceNumber))
let output = try await client.subscribeToShard(input: input)

if let initialResponse = output.dayaffe?.value {
assert(initialResponse.isEmpty)
}

// Monitor the shard event stream
for try await event in output.eventStream! {
switch event {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ extension AWSEventStream {
private var decoder: EventStreamMessageDecoder?
private var messageBuffer: [EventStream.Message] = []
private var error: Error?
private var initialMessage: Data = Data()
private var onInitialResponseReceived: ((Data?) -> Void)?
private var didProcessInitialMessage = false

private var decodedPayload = Data()
private var decodededHeaders: [EventStreamHeader] = []
Expand Down Expand Up @@ -44,8 +47,18 @@ extension AWSEventStream {
self.logger.debug("onComplete")
let message = EventStream.Message(headers: self.decodededHeaders.toHeaders(),
payload: self.decodedPayload)
self.messageBuffer.append(message)
if (message.headers.contains(EventStream.Header(name: ":event-type", value: .string("initial-response")))) {

Check warning on line 50 in Sources/Core/AWSClientRuntime/EventStream/AWSMessageDecoder.swift

View workflow job for this annotation

GitHub Actions / swiftlint

`if`, `for`, `guard`, `switch`, `while`, and `catch` statements shouldn't unnecessarily wrap their conditionals or arguments in parentheses (control_statement)

Check warning on line 50 in Sources/Core/AWSClientRuntime/EventStream/AWSMessageDecoder.swift

View workflow job for this annotation

GitHub Actions / swiftlint

Line should be 120 characters or less; currently it has 128 characters (line_length)
self.initialMessage = message.payload
self.onInitialResponseReceived?(self.initialMessage)
self.didProcessInitialMessage = true
} else {
self.messageBuffer.append(message)

if !self.didProcessInitialMessage {
self.onInitialResponseReceived?(nil) // Signal that initial-response will never come.
self.didProcessInitialMessage = true
}
}
// This could be end of the stream, hence reset the state
self.decodedPayload = Data()
self.decodededHeaders = []
Expand Down Expand Up @@ -88,6 +101,22 @@ extension AWSEventStream {
return message
}

public func awaitInitialResponse() async -> Data? {
return await withCheckedContinuation { continuation in
retrieveInitialResponse { data in
continuation.resume(returning: data)
}
}
}

public func retrieveInitialResponse(completion: @escaping (Data?) -> Void) {
if self.didProcessInitialMessage {
completion(initialMessage) // Could be nil or populated.
} else {
self.onInitialResponseReceived = completion
}
}

/// Throws an error if one has occurred.
/// This should be called before any other methods to make sure
/// that the decoder is in a valid state.
Expand Down

0 comments on commit 9d850f3

Please sign in to comment.