Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add initial-response support #1125

Closed
wants to merge 9 commits into from
Closed

Conversation

dayaffe
Copy link
Contributor

@dayaffe dayaffe commented Sep 13, 2023

Add support for parsing initial-response and making it available to users prior to parsing the Event Stream.
See related: https://smithy.io/2.0/spec/streaming.html#initial-messages

Issue #

#976

Description of changes

  • Services should send initial-response as the first message in response. AWSMessageDecoder will detect if the header is of :event-type = initial-response and if so set private variable self.initialResponse = message.payload to be parsed based on service-specific expected initial-response fields (codegen).
  • awaitInitialResponse converts retrieveInitialResponse function into an async function. By using this callback method we're able to wait for self.initialMessage to be populated and then return it asynchronously. This method only waits until the first message is processed as if an initial-response is present it must be the very first message.
  • retrieveInitialResponse immediately calls the completion handler with the message if initialMessage is already processed, otherwise it saves the completion handler to be called later when the message is ready
  • Update unit tests to include initial-response message decoding, encoding, and awaitInitialResponse

Ex. initial-response Usage

// Create the subscription stream
let input = SubscribeToShardInput(consumerARN: consumerARN, shardId: shard?.shardId, startingPosition: KinesisClientTypes.StartingPosition(sequenceNumber: shard?.sequenceNumberRange?.startingSequenceNumber, type: .atSequenceNumber))
let output = try await client.subscribeToShard(input: input)

// initial-response: If initialResponseMember is a non-streaming trait defined in the Kinesis model
guard let initialResponse = output.initialResponseMember?.value else {
    throw MyError.initialResponseNil
}

// Monitor the shard event stream
for try await event in output.eventStream! {
    switch event {
    case .subscribetoshardevent(let event):
        event.records?.forEach { record in
            let recordString = String(data: record.data ?? Data(), encoding: .utf8)
            recordStrings.removeAll { recordString == $0 }
        }
    case .sdkUnknown(let message):
        print("Unknown event: \(message)")
    }

    // Once all the events have been received, stop streaming
    if recordStrings.isEmpty { break }
}

Note: Will revert .package(url: smithySwiftURL, .branch("day/initial-messages")) prior to merge. I only added this to ensure CI passes.

New/existing dependencies impact assessment, if applicable

Conventional Commits

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@dayaffe dayaffe changed the title feat: add initial-response support feat: add initial-messages support Sep 14, 2023
@dayaffe dayaffe changed the title feat: add initial-messages support feat: add initial-response support Sep 20, 2023
@dayaffe
Copy link
Contributor Author

dayaffe commented Sep 20, 2023

/test

@dayaffe
Copy link
Contributor Author

dayaffe commented Sep 20, 2023

/test

1 similar comment
@dayaffe
Copy link
Contributor Author

dayaffe commented Sep 21, 2023

/test

@dayaffe
Copy link
Contributor Author

dayaffe commented Oct 11, 2023

closing in favor of #1165

@dayaffe dayaffe closed this Oct 11, 2023
@dayaffe dayaffe deleted the day/initial-messages branch November 19, 2024 18:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant