From e768c1e4c2be5320a4292fb8dd41bd0515fc4334 Mon Sep 17 00:00:00 2001
From: Rick Newton-Rogers
Date: Wed, 13 Nov 2024 09:28:48 +0000
Subject: [PATCH] Migrate to GitHub Actions (#175)

Migrate CI to use GitHub Actions.

### Motivation:

To migrate to GitHub Actions and centralised infrastructure.

### Modifications:

Changes of note:
* Bump the minimum Swift version to 5.9 in line with CI coverage
* Adopt swift-format using rules from SwiftNIO
* Remove scripts which are no longer needed

### Result:

GitHub Actions CI

Future improvements:
* Re-enable `--warnings-as-errors`
* Investigate extending the reused workflows to allow specifying setup commands and services so that we can drop the bespoke workflows here.
* Enable API breakage checks
* Set thresholds and enable Benchmarks
* Enable documentation checking
---
 .editorconfig | 8 +
 .flake8 | 15 ++
 .github/release.yml | 14 ++
 .github/workflows/main.yml | 18 ++
 .github/workflows/pull_request.yml | 39 ++++
 .github/workflows/pull_request_label.yml | 18 ++
 .github/workflows/unit_tests.yml | 105 ++++++++++
 .licenseignore | 43 ++++
 .spi.yml | 4 +
 .swift-format | 68 +++++++
 .swiftformat | 25 ---
 .unacceptablelanguageignore | 2 +
 .yamllint.yml | 12 ++
 .../KafkaConsumerBenchmark.swift | 33 +++-
 .../Utilities.swift | 13 +-
 Benchmarks/Package.swift | 6 +-
 CONTRIBUTING.md | 4 +
 CONTRIBUTORS.txt | 1 +
 Package.swift | 8 +-
 .../KafkaConfiguration+Metrics.swift | 41 ++--
 .../KafkaConfiguration+Security.swift | 54 +++---
 .../KafkaConsumerConfiguration.swift | 13 +-
 .../KafkaProducerConfiguration.swift | 11 +-
 .../KafkaTopicConfiguration.swift | 8 +-
 .../Data/String+KafkaContiguousBytes.swift | 5 +-
 .../ForTesting/RDKafkaClient+Topic.swift | 29 +--
 Sources/Kafka/ForTesting/TestMessages.swift | 12 +-
 Sources/Kafka/KafkaConsumer.swift | 22 ++-
 Sources/Kafka/KafkaError.swift | 94 ++++++---
 Sources/Kafka/KafkaOffset.swift | 2 +-
 Sources/Kafka/KafkaProducer.swift | 16 +-
 Sources/Kafka/KafkaProducerEvent.swift | 2 +-
 Sources/Kafka/KafkaProducerMessageID.swift | 2 +-
 Sources/Kafka/RDKafka/RDKafkaClient.swift | 63 +++---
 Sources/Kafka/RDKafka/RDKafkaConfig.swift | 2 +-
 .../Kafka/RDKafka/RDKafkaTopicConfig.swift | 2 +-
 .../RDKafka/RDKafkaTopicPartitionList.swift | 14 +-
 .../Kafka/Utilities/Duration+Helpers.swift | 2 +-
 .../Data+KafkaContiguousBytes.swift | 3 +-
 Tests/IntegrationTests/KafkaTests.swift | 111 ++++++++---
 Tests/IntegrationTests/Utilities.swift | 3 +-
 Tests/KafkaTests/KafkaConsumerTests.swift | 17 +-
 Tests/KafkaTests/KafkaProducerTests.swift | 41 ++--
 Tests/KafkaTests/Utilities.swift | 6 +-
 dev/test-benchmark-thresholds.sh | 2 +-
 dev/update-benchmark-thresholds.sh | 2 +-
 docker/Dockerfile | 2 +-
 scripts/generate_contributors_list.sh | 39 ----
 scripts/soundness.sh | 183 ------------------
 49 files changed, 754 insertions(+), 485 deletions(-)
 create mode 100644 .editorconfig
 create mode 100644 .flake8
 create mode 100644 .github/release.yml
 create mode 100644 .github/workflows/main.yml
 create mode 100644 .github/workflows/pull_request.yml
 create mode 100644 .github/workflows/pull_request_label.yml
 create mode 100644 .github/workflows/unit_tests.yml
 create mode 100644 .licenseignore
 create mode 100644 .spi.yml
 create mode 100644 .swift-format
 delete mode 100644 .swiftformat
 create mode 100644 .unacceptablelanguageignore
 create mode 100644 .yamllint.yml
 delete mode 100755 scripts/generate_contributors_list.sh
 delete mode 100755 scripts/soundness.sh

diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 00000000..08891d83
--- /dev/null
+++ 
b/.editorconfig @@ -0,0 +1,8 @@ +root = true + +[*] +indent_style = space +indent_size = 4 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true \ No newline at end of file diff --git a/.flake8 b/.flake8 new file mode 100644 index 00000000..4e052d7d --- /dev/null +++ b/.flake8 @@ -0,0 +1,15 @@ +[flake8] + +ignore = + # These are needed to make our license headers pass the linting + E265, + E266, + +# 10% larger than the standard 80 character limit. Conforms to the black +# standard and Bugbear's B950. +max-line-length = 88 + +# Custom rules: +exclude = + Sources/Crdkafka/ + Sources/COpenSSL/ diff --git a/.github/release.yml b/.github/release.yml new file mode 100644 index 00000000..f96b5149 --- /dev/null +++ b/.github/release.yml @@ -0,0 +1,14 @@ +changelog: + categories: + - title: SemVer Major + labels: + - ⚠️ semver/major + - title: SemVer Minor + labels: + - semver/minor + - title: SemVer Patch + labels: + - semver/patch + - title: Other Changes + labels: + - semver/none diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 00000000..b2f403c3 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,18 @@ +name: Main + +on: + push: + branches: [main] + schedule: + - cron: "0 8,20 * * *" + +jobs: + unit-tests: + name: Unit tests + uses: ./.github/workflows/unit_tests.yml + with: + linux_5_9_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -strict-concurrency=complete" + linux_5_10_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -strict-concurrency=complete" + linux_6_0_arguments_override: "--explicit-target-dependency-import-check error" + linux_nightly_6_0_arguments_override: "--explicit-target-dependency-import-check error" + linux_nightly_main_arguments_override: "--explicit-target-dependency-import-check error" diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml new file mode 100644 index 00000000..2c6a87a4 --- /dev/null +++ b/.github/workflows/pull_request.yml @@ -0,0 +1,39 @@ +name: PR + +on: + pull_request: + types: [opened, reopened, synchronize] + +jobs: + soundness: + name: Soundness + uses: swiftlang/github-workflows/.github/workflows/soundness.yml@main + with: + license_header_check_project_name: "swift-kafka-client" + api_breakage_check_enabled: false # requires libsasl2-dev + docs_check_enabled: false # requires libsasl2-dev + + unit-tests: + name: Unit tests + uses: ./.github/workflows/unit_tests.yml + with: + linux_5_9_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -strict-concurrency=complete" + linux_5_10_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -strict-concurrency=complete" + linux_6_0_arguments_override: "--explicit-target-dependency-import-check error" + linux_nightly_6_0_arguments_override: "--explicit-target-dependency-import-check error" + linux_nightly_main_arguments_override: "--explicit-target-dependency-import-check error" + + cxx-interop: + name: Cxx interop + uses: apple/swift-nio/.github/workflows/swift_matrix.yml@main + with: + name: "Cxx interop" + matrix_linux_command: "apt-get update -y -q && apt-get install -y -q jq && apt-get -y install libsasl2-dev && curl -s https://raw.githubusercontent.com/apple/swift-nio/main/scripts/check-cxx-interop-compatibility.sh | bash" + matrix_linux_5_9_enabled: true + matrix_linux_5_10_enabled: true + matrix_linux_6_0_enabled: true + matrix_linux_nightly_6_0_enabled: true + 
matrix_linux_nightly_main_enabled: true + matrix_windows_6_0_enabled: false + matrix_windows_nightly_6_0_enabled: false + matrix_windows_nightly_main_enabled: false diff --git a/.github/workflows/pull_request_label.yml b/.github/workflows/pull_request_label.yml new file mode 100644 index 00000000..8fd47c13 --- /dev/null +++ b/.github/workflows/pull_request_label.yml @@ -0,0 +1,18 @@ +name: PR label + +on: + pull_request: + types: [labeled, unlabeled, opened, reopened, synchronize] + +jobs: + semver-label-check: + name: Semantic version label check + runs-on: ubuntu-latest + timeout-minutes: 1 + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + persist-credentials: false + - name: Check for Semantic Version label + uses: apple/swift-nio/.github/actions/pull_request_semver_label_checker@main diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml new file mode 100644 index 00000000..3300a33b --- /dev/null +++ b/.github/workflows/unit_tests.yml @@ -0,0 +1,105 @@ +name: Unit tests + +on: + workflow_call: + inputs: + linux_5_9_enabled: + type: boolean + description: "Boolean to enable the Linux 5.9 Swift version matrix job. Defaults to true." + default: true + linux_5_9_arguments_override: + type: string + description: "The arguments passed to swift test in the Linux 5.9 Swift version matrix job." + default: "" + linux_5_10_enabled: + type: boolean + description: "Boolean to enable the Linux 5.10 Swift version matrix job. Defaults to true." + default: true + linux_5_10_arguments_override: + type: string + description: "The arguments passed to swift test in the Linux 5.10 Swift version matrix job." + default: "" + linux_6_0_enabled: + type: boolean + description: "Boolean to enable the Linux 6.0 Swift version matrix job. Defaults to true." + default: true + linux_6_0_arguments_override: + type: string + description: "The arguments passed to swift test in the Linux 6.0 Swift version matrix job." + default: "" + linux_nightly_6_0_enabled: + type: boolean + description: "Boolean to enable the Linux nightly 6.0 Swift version matrix job. Defaults to true." + default: true + linux_nightly_6_0_arguments_override: + type: string + description: "The arguments passed to swift test in the Linux nightly 6.0 Swift version matrix job." + default: "" + linux_nightly_main_enabled: + type: boolean + description: "Boolean to enable the Linux nightly main Swift version matrix job. Defaults to true." + default: true + linux_nightly_main_arguments_override: + type: string + description: "The arguments passed to swift test in the Linux nightly main Swift version matrix job." 
+ default: "" + +jobs: + unit-tests: + name: Unit tests + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + # We are specifying only the major and minor of the docker images to automatically pick up the latest patch release + swift: + - image: "swift:5.9-jammy" + swift_version: "5.9" + enabled: ${{ inputs.linux_5_9_enabled }} + - image: "swift:5.10-jammy" + swift_version: "5.10" + enabled: ${{ inputs.linux_5_10_enabled }} + - image: "swift:6.0-jammy" + swift_version: "6.0" + enabled: ${{ inputs.linux_6_0_enabled }} + - image: "swiftlang/swift:nightly-6.0-jammy" + swift_version: "nightly-6.0" + enabled: ${{ inputs.linux_nightly_6_0_enabled }} + - image: "swiftlang/swift:nightly-main-jammy" + swift_version: "nightly-main" + enabled: ${{ inputs.linux_nightly_main_enabled }} + steps: + - name: Checkout repository + if: ${{ matrix.swift.enabled }} + uses: actions/checkout@v4 + with: + persist-credentials: false + submodules: true + - name: Mark the workspace as safe + if: ${{ matrix.swift.enabled }} + # https://github.com/actions/checkout/issues/766 + run: git config --global --add safe.directory ${GITHUB_WORKSPACE} + - name: Run matrix job + if: ${{ matrix.swift.enabled }} + env: + SWIFT_VERSION: ${{ matrix.swift.swift_version }} + COMMAND: "swift test" + COMMAND_OVERRIDE_5_9: "swift test ${{ inputs.linux_5_9_arguments_override }}" + COMMAND_OVERRIDE_5_10: "swift test ${{ inputs.linux_5_10_arguments_override }}" + COMMAND_OVERRIDE_6_0: "swift test ${{ inputs.linux_6_0_arguments_override }}" + COMMAND_OVERRIDE_NIGHTLY_6_0: "swift test ${{ inputs.linux_nightly_6_0_arguments_override }}" + COMMAND_OVERRIDE_NIGHTLY_MAIN: "swift test ${{ inputs.linux_nightly_main_arguments_override }}" + run: | + apt-get -qq update && apt-get -qq -y install curl && apt-get -y install libsasl2-dev + curl -s https://raw.githubusercontent.com/apple/swift-nio/main/scripts/check-matrix-job.sh | bash + container: + image: ${{ matrix.swift.image }} + services: + zookeeper: + image: ubuntu/zookeeper + kafka: + image: ubuntu/kafka + env: + ZOOKEEPER_HOST: zookeeper + env: + KAFKA_HOST: kafka diff --git a/.licenseignore b/.licenseignore new file mode 100644 index 00000000..e368a0d1 --- /dev/null +++ b/.licenseignore @@ -0,0 +1,43 @@ +.dockerignore +.gitignore +**/.gitignore +.licenseignore +.gitattributes +.gitmodules +.git-blame-ignore-revs +.mailfilter +.mailmap +.spi.yml +.swift-format +.swiftformatignore +.editorconfig +.yamlignore +.github/* +.yamllint.yml +.flake8 +*.md +*.txt +*.yml +*.yaml +*.json +Package.swift +**/Package.swift +Package@-*.swift +**/Package@-*.swift +Package.resolved +**/Package.resolved +Makefile +*.modulemap +**/*.modulemap +**/*.docc/* +*.xcprivacy +**/*.xcprivacy +*.symlink +**/*.symlink +Dockerfile +**/Dockerfile +Snippets/* +dev/git.commit.template +.unacceptablelanguageignore +Sources/Crdkafka/* +Sources/COpenSSL/* diff --git a/.spi.yml b/.spi.yml new file mode 100644 index 00000000..1d5f396c --- /dev/null +++ b/.spi.yml @@ -0,0 +1,4 @@ +version: 1 +builder: + configs: + - documentation_targets: [Kafka, KafkaFoundationCompat] diff --git a/.swift-format b/.swift-format new file mode 100644 index 00000000..7e8ae739 --- /dev/null +++ b/.swift-format @@ -0,0 +1,68 @@ +{ + "version" : 1, + "indentation" : { + "spaces" : 4 + }, + "tabWidth" : 4, + "fileScopedDeclarationPrivacy" : { + "accessLevel" : "private" + }, + "spacesAroundRangeFormationOperators" : false, + "indentConditionalCompilationBlocks" : false, + "indentSwitchCaseLabels" : false, + 
"lineBreakAroundMultilineExpressionChainComponents" : false, + "lineBreakBeforeControlFlowKeywords" : false, + "lineBreakBeforeEachArgument" : true, + "lineBreakBeforeEachGenericRequirement" : true, + "lineLength" : 120, + "maximumBlankLines" : 1, + "respectsExistingLineBreaks" : true, + "prioritizeKeepingFunctionOutputTogether" : true, + "noAssignmentInExpressions" : { + "allowedFunctions" : [ + "XCTAssertNoThrow", + "XCTAssertThrowsError" + ] + }, + "rules" : { + "AllPublicDeclarationsHaveDocumentation" : false, + "AlwaysUseLiteralForEmptyCollectionInit" : false, + "AlwaysUseLowerCamelCase" : false, + "AmbiguousTrailingClosureOverload" : true, + "BeginDocumentationCommentWithOneLineSummary" : false, + "DoNotUseSemicolons" : true, + "DontRepeatTypeInStaticProperties" : true, + "FileScopedDeclarationPrivacy" : true, + "FullyIndirectEnum" : true, + "GroupNumericLiterals" : true, + "IdentifiersMustBeASCII" : true, + "NeverForceUnwrap" : false, + "NeverUseForceTry" : false, + "NeverUseImplicitlyUnwrappedOptionals" : false, + "NoAccessLevelOnExtensionDeclaration" : true, + "NoAssignmentInExpressions" : true, + "NoBlockComments" : true, + "NoCasesWithOnlyFallthrough" : true, + "NoEmptyTrailingClosureParentheses" : true, + "NoLabelsInCasePatterns" : true, + "NoLeadingUnderscores" : false, + "NoParensAroundConditions" : true, + "NoVoidReturnOnFunctionSignature" : true, + "OmitExplicitReturns" : true, + "OneCasePerLine" : true, + "OneVariableDeclarationPerLine" : true, + "OnlyOneTrailingClosureArgument" : true, + "OrderedImports" : true, + "ReplaceForEachWithForLoop" : true, + "ReturnVoidInsteadOfEmptyTuple" : true, + "UseEarlyExits" : false, + "UseExplicitNilCheckInConditions" : false, + "UseLetInEveryBoundCaseVariable" : false, + "UseShorthandTypeNames" : true, + "UseSingleLinePropertyGetter" : false, + "UseSynthesizedInitializer" : false, + "UseTripleSlashForDocumentationComments" : true, + "UseWhereClausesInForLoops" : false, + "ValidateDocumentationComments" : false + } +} diff --git a/.swiftformat b/.swiftformat deleted file mode 100644 index 4f7cf860..00000000 --- a/.swiftformat +++ /dev/null @@ -1,25 +0,0 @@ -# file options - ---swiftversion 5.4 ---exclude .build - -# format options - ---self insert ---patternlet inline ---ranges nospace ---stripunusedargs unnamed-only ---ifdef no-indent ---extensionacl on-declarations ---disable typeSugar # https://github.com/nicklockwood/SwiftFormat/issues/636 ---disable andOperator ---disable wrapMultilineStatementBraces ---disable enumNamespaces ---disable redundantExtensionACL ---disable redundantReturn ---disable preferKeyPath ---disable sortedSwitchCases ---disable hoistTry ---disable hoistAwait - -# rules diff --git a/.unacceptablelanguageignore b/.unacceptablelanguageignore new file mode 100644 index 00000000..cb6d6d2d --- /dev/null +++ b/.unacceptablelanguageignore @@ -0,0 +1,2 @@ +Sources/Crdkafka/* +Sources/COpenSSL/* diff --git a/.yamllint.yml b/.yamllint.yml new file mode 100644 index 00000000..d4cca2ed --- /dev/null +++ b/.yamllint.yml @@ -0,0 +1,12 @@ +extends: default + +rules: + line-length: false + document-start: false + truthy: + check-keys: false # Otherwise we get a false positive on GitHub action's `on` key + +# Custom ignores +ignore: | + Sources/Crdkafka/ + Sources/COpenSSL/ diff --git a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift index ec0bff9a..9c49a2c3 100644 --- 
a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift @@ -15,12 +15,13 @@ import Benchmark import Crdkafka import Dispatch -import struct Foundation.Date -import struct Foundation.UUID import Kafka import Logging import ServiceLifecycle +import struct Foundation.Date +import struct Foundation.UUID + let benchmarks = { var uniqueTestTopic: String! let messageCount: UInt = 1000 @@ -80,7 +81,11 @@ let benchmarks = { logger: .perfLogger ) - let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .perfLogger) + let serviceGroupConfiguration = ServiceGroupConfiguration( + services: [consumer], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: .perfLogger + ) let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) try await withThrowingTaskGroup(of: Void.self) { group in @@ -119,7 +124,9 @@ let benchmarks = { let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 - benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + benchLog( + "All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec" + ) } // Wait for second Consumer Task to complete @@ -149,7 +156,11 @@ let benchmarks = { logger: .perfLogger ) - let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .perfLogger) + let serviceGroupConfiguration = ServiceGroupConfiguration( + services: [consumer], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: .perfLogger + ) let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) try await withThrowingTaskGroup(of: Void.self) { group in @@ -190,7 +201,9 @@ let benchmarks = { let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 - benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + benchLog( + "All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec" + ) } // Wait for second Consumer Task to complete @@ -265,7 +278,9 @@ let benchmarks = { let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 - benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + benchLog( + "All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec" + ) } Benchmark("librdkafka_with_offset_commit_messages_\(messageCount)") { benchmark in @@ -340,6 +355,8 @@ let benchmarks = { let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 - benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + benchLog( + "All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec" + ) } } diff --git a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/Utilities.swift b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/Utilities.swift index 304dc1fb..a6ebef68 100644 --- 
a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/Utilities.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/Utilities.swift @@ -13,18 +13,20 @@ //===----------------------------------------------------------------------===// import Benchmark -import class Foundation.ProcessInfo -import struct Foundation.UUID import Kafka @_spi(Internal) import Kafka import Logging import ServiceLifecycle +import class Foundation.ProcessInfo +import struct Foundation.UUID + let brokerAddress = KafkaConfiguration.BrokerAddress( host: ProcessInfo.processInfo.environment["KAFKA_HOST"] ?? "localhost", port: 9092 ) +// swift-format-ignore: DontRepeatTypeInStaticProperties extension Logger { static let perfLogger = { var logger = Logger(label: "perf logger") @@ -76,7 +78,11 @@ func prepareTopic(messagesCount: UInt, partitions: Int32 = -1, logger: Logger = let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: producerConfig, logger: logger) - let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer], gracefulShutdownSignals: [.sigterm, .sigint], logger: logger) + let serviceGroupConfiguration = ServiceGroupConfiguration( + services: [producer], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger + ) let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) try await withThrowingTaskGroup(of: Void.self) { group in @@ -107,6 +113,7 @@ func prepareTopic(messagesCount: UInt, partitions: Int32 = -1, logger: Logger = return uniqueTestTopic } +// swift-format-ignore: AmbiguousTrailingClosureOverload extension Benchmark { @discardableResult func withMeasurement(_ body: () throws -> T) rethrows -> T { diff --git a/Benchmarks/Package.swift b/Benchmarks/Package.swift index 4ea81f8c..4301f8de 100644 --- a/Benchmarks/Package.swift +++ b/Benchmarks/Package.swift @@ -18,7 +18,7 @@ import PackageDescription let package = Package( name: "benchmarks", platforms: [ - .macOS(.v13), + .macOS(.v13) ], dependencies: [ .package(path: "../"), @@ -33,7 +33,7 @@ let package = Package( ], path: "Benchmarks/SwiftKafkaConsumerBenchmarks", plugins: [ - .plugin(name: "BenchmarkPlugin", package: "package-benchmark"), + .plugin(name: "BenchmarkPlugin", package: "package-benchmark") ] ), .executableTarget( @@ -44,7 +44,7 @@ let package = Package( ], path: "Benchmarks/SwiftKafkaProducerBenchmarks", plugins: [ - .plugin(name: "BenchmarkPlugin", package: "package-benchmark"), + .plugin(name: "BenchmarkPlugin", package: "package-benchmark") ] ), ] diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4d23ad56..d58c702e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -63,6 +63,10 @@ We require that your commit messages match our template. The easiest way to do t git config commit.template dev/git.commit.template +### Run CI checks locally + +You can run the GitHub Actions workflows locally using [act](https://github.com/nektos/act). For detailed steps on how to do this please see [https://github.com/swiftlang/github-workflows?tab=readme-ov-file#running-workflows-locally](https://github.com/swiftlang/github-workflows?tab=readme-ov-file#running-workflows-locally). + ## How to contribute your work Please open a pull request at https://github.com/swift-server/swift-kafka-client. Make sure the CI passes, and then wait for code review. diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index 210e5270..4b20e9b2 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -15,6 +15,7 @@ needs to be listed here. 
- Felix Schlegel - Franz Busch - FranzBusch +- Rick Newton-Rogers - SHILPEE GUPTA <78029920+shilpeegupta14@users.noreply.github.com> - Yim Lee - blindspotbounty <127803250+blindspotbounty@users.noreply.github.com> diff --git a/Package.swift b/Package.swift index c13229ec..b13b6858 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version: 5.7 +// swift-tools-version: 5.9 //===----------------------------------------------------------------------===// // // This source file is part of the swift-kafka-client open source project @@ -69,12 +69,12 @@ let package = Package( .headerSearchPath("./custom/config/dummy"), .headerSearchPath("./custom/include"), .headerSearchPath("./librdkafka/src"), - .define("_GNU_SOURCE", to: "1"), // Fix build error for Swift 5.9 onwards + .define("_GNU_SOURCE", to: "1"), // Fix build error for Swift 5.9 onwards ], linkerSettings: [ .linkedLibrary("curl"), .linkedLibrary("sasl2"), - .linkedLibrary("z"), // zlib + .linkedLibrary("z"), // zlib ] ), .target( @@ -90,7 +90,7 @@ let package = Package( .target( name: "KafkaFoundationCompat", dependencies: [ - "Kafka", + "Kafka" ] ), .testTarget( diff --git a/Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift b/Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift index e9878c99..839f8fc8 100644 --- a/Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift +++ b/Sources/Kafka/Configuration/KafkaConfiguration+Metrics.swift @@ -20,14 +20,11 @@ extension KafkaConfiguration { /// Configuration for the consumer metrics emitted by `SwiftKafka`. public struct ConsumerMetrics: Sendable { internal var enabled: Bool { - self.updateInterval != nil && - (self.queuedOperation != nil || - self.totalKafkaBrokerRequests != nil || - self.totalKafkaBrokerBytesSent != nil || - self.totalKafkaBrokerResponses != nil || - self.totalKafkaBrokerResponsesSize != nil || - self.totalKafkaBrokerMessagesBytesRecieved != nil || - self.topicsInMetadataCache != nil) + self.updateInterval != nil + && (self.queuedOperation != nil || self.totalKafkaBrokerRequests != nil + || self.totalKafkaBrokerBytesSent != nil || self.totalKafkaBrokerResponses != nil + || self.totalKafkaBrokerResponsesSize != nil || self.totalKafkaBrokerMessagesBytesRecieved != nil + || self.topicsInMetadataCache != nil) } /// Update interval for statistics. @@ -55,7 +52,8 @@ extension KafkaConfiguration { private static func record(_ value: T?, to: Gauge?) { guard let value, - let to else { + let to + else { return } to.record(value) @@ -70,7 +68,10 @@ extension KafkaConfiguration { Self.record(rdKafkaStatistics.totalKafkaBrokerResponsesSize, to: self.totalKafkaBrokerResponsesSize) Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesRecieved, to: self.totalKafkaBrokerMessagesRecieved) - Self.record(rdKafkaStatistics.totalKafkaBrokerMessagesBytesRecieved, to: self.totalKafkaBrokerMessagesBytesRecieved) + Self.record( + rdKafkaStatistics.totalKafkaBrokerMessagesBytesRecieved, + to: self.totalKafkaBrokerMessagesBytesRecieved + ) Self.record(rdKafkaStatistics.topicsInMetadataCache, to: self.topicsInMetadataCache) } @@ -79,17 +80,12 @@ extension KafkaConfiguration { /// Configuration for the producer metrics emitted by `SwiftKafka`. 
public struct ProducerMetrics: Sendable { internal var enabled: Bool { - self.updateInterval != nil && - (self.queuedOperation != nil || - self.queuedProducerMessages != nil || - self.queuedProducerMessagesSize != nil || - self.totalKafkaBrokerRequests != nil || - self.totalKafkaBrokerBytesSent != nil || - self.totalKafkaBrokerResponses != nil || - self.totalKafkaBrokerResponsesSize != nil || - self.totalKafkaBrokerMessagesSent != nil || - self.totalKafkaBrokerMessagesBytesSent != nil || - self.topicsInMetadataCache != nil) + self.updateInterval != nil + && (self.queuedOperation != nil || self.queuedProducerMessages != nil + || self.queuedProducerMessagesSize != nil || self.totalKafkaBrokerRequests != nil + || self.totalKafkaBrokerBytesSent != nil || self.totalKafkaBrokerResponses != nil + || self.totalKafkaBrokerResponsesSize != nil || self.totalKafkaBrokerMessagesSent != nil + || self.totalKafkaBrokerMessagesBytesSent != nil || self.topicsInMetadataCache != nil) } /// Update interval for statistics. @@ -121,7 +117,8 @@ extension KafkaConfiguration { private static func record(_ value: T?, to: Gauge?) { guard let value, - let to else { + let to + else { return } to.record(value) diff --git a/Sources/Kafka/Configuration/KafkaConfiguration+Security.swift b/Sources/Kafka/Configuration/KafkaConfiguration+Security.swift index e8457dd3..0dd44667 100644 --- a/Sources/Kafka/Configuration/KafkaConfiguration+Security.swift +++ b/Sources/Kafka/Configuration/KafkaConfiguration+Security.swift @@ -29,14 +29,14 @@ extension KafkaConfiguration { /// Read certificate chain from a file. public static func file(location: String) -> LeafAndIntermediates { - return LeafAndIntermediates( + LeafAndIntermediates( _internal: .file(location: location) ) } /// Read X.509 certificate from String. public static func pem(_ pem: String) -> LeafAndIntermediates { - return LeafAndIntermediates( + LeafAndIntermediates( _internal: .pem(pem) ) } @@ -56,14 +56,14 @@ extension KafkaConfiguration { /// File or directory path to root certificate(s) for verifying the broker's key. public static func file(location: String) -> TrustRoots { - return TrustRoots( + TrustRoots( _internal: .file(location: location) ) } /// Trust roots certificate String for verifying the broker's key. public static func pem(_ pem: String) -> TrustRoots { - return TrustRoots( + TrustRoots( _internal: .pem(pem) ) } @@ -81,14 +81,14 @@ extension KafkaConfiguration { /// A key located in a file at the given `location`. public static func file(location: String) -> Location { - return Location( + Location( _internal: .file(location: location) ) } /// A key String (PEM format). public static func pem(_ pem: String) -> Location { - return Location( + Location( _internal: .pem(pem) ) } @@ -139,7 +139,7 @@ extension KafkaConfiguration { privateKey: PrivateKey, certificates: LeafAndIntermediates ) -> ClientIdentity { - return .init( + .init( _internal: .keyPair( privateKey: privateKey, certificates: certificates @@ -152,7 +152,7 @@ extension KafkaConfiguration { /// - Parameters: /// - keyStore: The client's keystore (PKCS#12) used for authentication. public static func keyStore(keyStore: KeyStore) -> ClientIdentity { - return .init(_internal: .keyStore(keyStore: keyStore)) + .init(_internal: .keyStore(keyStore: keyStore)) } } @@ -180,7 +180,7 @@ extension KafkaConfiguration { trustRoots: TrustRoots = .probe, certificateRevocationListPath: String? 
= nil ) -> BrokerVerification { - return .init( + .init( _internal: .verify( trustRoots: trustRoots, certificateRevocationListPath: certificateRevocationListPath @@ -213,14 +213,14 @@ extension KafkaConfiguration { break case .keyPair(let privateKey, let certificate): switch privateKey.key._internal { - case .file(location: let location): + case .file(let location): resultDict["ssl.key.location"] = location case .pem(let pem): resultDict["ssl.key.pem"] = pem } resultDict["ssl.key.password"] = privateKey.password switch certificate._internal { - case .file(location: let location): + case .file(let location): resultDict["ssl.key.location"] = location resultDict["ssl.certificate.location"] = location case .pem(let pem): @@ -240,7 +240,7 @@ extension KafkaConfiguration { switch trustRoots._internal { case .probe: resultDict["ssl.ca.location"] = "probe" - case .file(location: let location): + case .file(let location): resultDict["ssl.ca.location"] = location case .pem(let pem): resultDict["ssl.ca.pem"] = pem @@ -269,9 +269,9 @@ extension KafkaConfiguration { /// %{config.prop.name} is replaced by corresponding config object value. /// Default: `kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}"`. public var kinitCommand: String = """ - kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || \ - kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}" - """ + kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || \ + kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}" + """ /// Path to Kerberos keytab file. /// This configuration property is only used as a variable in ``KafkaConfiguration/SASLMechanism/KerberosConfiguration/kinitCommand`` /// as ... -t "%{sasl.kerberos.keytab}". @@ -338,7 +338,7 @@ extension KafkaConfiguration { /// In addition, SASL extensions can be communicated to the broker via `extension_NAME=value`. /// For example: `principal=admin extension_traceId=123` public static func `default`(configuration: String? = nil) -> OAuthBearerMethod { - return OAuthBearerMethod(_internal: .default(configuration: configuration)) + OAuthBearerMethod(_internal: .default(configuration: configuration)) } /// OpenID Connect (OIDC). @@ -367,7 +367,7 @@ extension KafkaConfiguration { scope: String? = nil, extensions: String? = nil ) -> OAuthBearerMethod { - return OAuthBearerMethod( + OAuthBearerMethod( _internal: .oidc( configuration: configuration, clientID: clientID, @@ -392,35 +392,35 @@ extension KafkaConfiguration { /// Use the GSSAPI mechanism. public static func gssapi(kerberosConfiguration: KerberosConfiguration) -> SASLMechanism { - return SASLMechanism( + SASLMechanism( _internal: .gssapi(kerberosConfiguration: kerberosConfiguration) ) } /// Use the PLAIN mechanism. public static func plain(username: String, password: String) -> SASLMechanism { - return SASLMechanism( + SASLMechanism( _internal: .plain(username: username, password: password) ) } /// Use the SCRAM-SHA-256 mechanism. public static func scramSHA256(username: String, password: String) -> SASLMechanism { - return SASLMechanism( + SASLMechanism( _internal: .scramSHA256(username: username, password: password) ) } /// Use the SCRAM-SHA-512 mechanism. public static func scramSHA512(username: String, password: String) -> SASLMechanism { - return SASLMechanism( + SASLMechanism( _internal: .scramSHA512(username: username, password: password) ) } /// Use the OAUTHBEARER mechanism. 
public static func oAuthBearer(method: OAuthBearerMethod) -> SASLMechanism { - return SASLMechanism( + SASLMechanism( _internal: .oAuthBearer(method: method) ) } @@ -437,7 +437,9 @@ extension KafkaConfiguration { resultDict["sasl.kerberos.principal"] = kerberosConfiguration.principal resultDict["sasl.kerberos.kinit.cmd"] = kerberosConfiguration.kinitCommand resultDict["sasl.kerberos.keytab"] = kerberosConfiguration.keytab - resultDict["sasl.kerberos.min.time.before.relogin"] = String(kerberosConfiguration.minTimeBeforeRelogin.rawValue) + resultDict["sasl.kerberos.min.time.before.relogin"] = String( + kerberosConfiguration.minTimeBeforeRelogin.rawValue + ) case .plain(let username, let password): resultDict["sasl.mechanism"] = "PLAIN" resultDict["sasl.username"] = username @@ -498,14 +500,14 @@ extension KafkaConfiguration { /// Use the Transport Layer Security (TLS) protocol. public static func tls(configuration: TLSConfiguration = TLSConfiguration()) -> SecurityProtocol { - return SecurityProtocol( + SecurityProtocol( _internal: .tls(configuration: configuration) ) } /// Use the Simple Authentication and Security Layer (SASL). public static func saslPlaintext(mechanism: SASLMechanism) -> SecurityProtocol { - return SecurityProtocol( + SecurityProtocol( _internal: .saslPlaintext(mechanism: mechanism) ) } @@ -515,7 +517,7 @@ extension KafkaConfiguration { saslMechanism: SASLMechanism, tlsConfiguration: TLSConfiguration = TLSConfiguration() ) -> SecurityProtocol { - return SecurityProtocol( + SecurityProtocol( _internal: .saslTLS(saslMechanism: saslMechanism, tlsConfiguration: tlsConfiguration) ) } diff --git a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift index 2d76e2da..b9bf3833 100644 --- a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift @@ -49,7 +49,7 @@ public struct KafkaConsumerConfiguration { topic: String, offset: KafkaOffset = .end ) -> ConsumptionStrategy { - return .init(consumptionStrategy: .partition(groupID: groupID, topic: topic, partition: partition, offset: offset)) + .init(consumptionStrategy: .partition(groupID: groupID, topic: topic, partition: partition, offset: offset)) } /// A consumption strategy based on consumer group membership. @@ -59,7 +59,7 @@ public struct KafkaConsumerConfiguration { /// - id: The ID of the consumer group to join. /// - topics: An array of topic names to consume from. 
public static func group(id groupID: String, topics: [String]) -> ConsumptionStrategy { - return .init(consumptionStrategy: .group(groupID: groupID, topics: topics)) + .init(consumptionStrategy: .group(groupID: groupID, topics: topics)) } } @@ -231,7 +231,7 @@ extension KafkaConsumerConfiguration { var resultDict: [String: String] = [:] switch self.consumptionStrategy._internal { - case .partition(groupID: let groupID, topic: _, partition: _, offset: _): + case .partition(let groupID, topic: _, partition: _, offset: _): if let groupID = groupID { resultDict["group.id"] = groupID } else { @@ -242,7 +242,7 @@ extension KafkaConsumerConfiguration { resultDict["group.id"] = UUID().uuidString } - case .group(groupID: let groupID, topics: _): + case .group(let groupID, topics: _): resultDict["group.id"] = groupID } @@ -264,7 +264,7 @@ extension KafkaConsumerConfiguration { resultDict["topic.metadata.refresh.fast.interval.ms"] = String(topicMetadata.refreshFastInterval.inMilliseconds) resultDict["topic.metadata.refresh.sparse"] = String(topicMetadata.isSparseRefreshingEnabled) resultDict["topic.metadata.propagation.max.ms"] = String(topicMetadata.maximumPropagation.inMilliseconds) - resultDict["topic.blacklist"] = topicDenylist.joined(separator: ",") + resultDict["topic.blacklist"] = topicDenylist.joined(separator: ",") // ignore-unacceptable-language if !debugOptions.isEmpty { resultDict["debug"] = debugOptions.map(\.description).joined(separator: ",") } @@ -281,7 +281,8 @@ extension KafkaConsumerConfiguration { resultDict["reconnect.backoff.max.ms"] = String(reconnect.maximumBackoff.inMilliseconds) if self.metrics.enabled, - let updateInterval = self.metrics.updateInterval { + let updateInterval = self.metrics.updateInterval + { resultDict["statistics.interval.ms"] = String(updateInterval.inMilliseconds) } diff --git a/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift b/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift index 2a345b02..cfa64394 100644 --- a/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift +++ b/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift @@ -60,7 +60,7 @@ public struct KafkaProducerConfiguration { } public static func maximumLimit(_ value: Int) -> MessageLimit { - return .init(rawValue: value) + .init(rawValue: value) } /// No limit for the maximum number of messages allowed on the producer queue. 
@@ -196,10 +196,12 @@ extension KafkaProducerConfiguration { resultDict["max.in.flight.requests.per.connection"] = String(self.maximumInFlightRequestsPerConnection) resultDict["metadata.max.age.ms"] = String(self.maximumMetadataAge.inMilliseconds) resultDict["topic.metadata.refresh.interval.ms"] = String(self.topicMetadata.refreshInterval.rawValue) - resultDict["topic.metadata.refresh.fast.interval.ms"] = String(self.topicMetadata.refreshFastInterval.inMilliseconds) + resultDict["topic.metadata.refresh.fast.interval.ms"] = String( + self.topicMetadata.refreshFastInterval.inMilliseconds + ) resultDict["topic.metadata.refresh.sparse"] = String(self.topicMetadata.isSparseRefreshingEnabled) resultDict["topic.metadata.propagation.max.ms"] = String(self.topicMetadata.maximumPropagation.inMilliseconds) - resultDict["topic.blacklist"] = self.topicDenylist.joined(separator: ",") + resultDict["topic.blacklist"] = self.topicDenylist.joined(separator: ",") // ignore-unacceptable-language if !self.debugOptions.isEmpty { resultDict["debug"] = self.debugOptions.map(\.description).joined(separator: ",") } @@ -216,7 +218,8 @@ extension KafkaProducerConfiguration { resultDict["reconnect.backoff.max.ms"] = String(self.reconnect.maximumBackoff.inMilliseconds) if self.metrics.enabled, - let updateInterval = self.metrics.updateInterval { + let updateInterval = self.metrics.updateInterval + { resultDict["statistics.interval.ms"] = String(updateInterval.inMilliseconds) } diff --git a/Sources/Kafka/Configuration/KafkaTopicConfiguration.swift b/Sources/Kafka/Configuration/KafkaTopicConfiguration.swift index 1b400836..10570c53 100644 --- a/Sources/Kafka/Configuration/KafkaTopicConfiguration.swift +++ b/Sources/Kafka/Configuration/KafkaTopicConfiguration.swift @@ -23,7 +23,7 @@ public struct KafkaTopicConfiguration { } public static func atLeast(_ value: Int) -> RequiredAcknowledgments { - return .init(rawValue: value) + .init(rawValue: value) } /// Broker will block until the message is committed by all in-sync replicas (ISRs). @@ -114,7 +114,7 @@ public struct KafkaTopicConfiguration { } public static func level(_ value: Int) -> Level { - return .init(rawValue: value) + .init(rawValue: value) } /// Codec-dependent default compression level. @@ -126,7 +126,7 @@ public struct KafkaTopicConfiguration { private enum _Codec: Sendable, Hashable, CustomStringConvertible { case none case gzip(compressionLevel: Level) - case snappy // only compression level is 0 + case snappy // only compression level is 0 case lz4(compressionLevel: Level) case zstd(compressionLevel: Level) case inherit @@ -206,7 +206,7 @@ public struct KafkaTopicConfiguration { /// zstd compression. public func zstd(compressionLevel: Level) -> Codec { - return Codec(_internal: .zstd(compressionLevel: compressionLevel)) + Codec(_internal: .zstd(compressionLevel: compressionLevel)) } /// Inherit global `compression.codec` configuration. 
diff --git a/Sources/Kafka/Data/String+KafkaContiguousBytes.swift b/Sources/Kafka/Data/String+KafkaContiguousBytes.swift index 1e455f26..433281e8 100644 --- a/Sources/Kafka/Data/String+KafkaContiguousBytes.swift +++ b/Sources/Kafka/Data/String+KafkaContiguousBytes.swift @@ -18,7 +18,10 @@ extension String: KafkaContiguousBytes { public func withUnsafeBytes(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R { if let read = try self.utf8.withContiguousStorageIfAvailable({ unsafePointer in // Fast Path - let unsafeRawBufferPointer = UnsafeRawBufferPointer(start: unsafePointer.baseAddress, count: self.utf8.count) + let unsafeRawBufferPointer = UnsafeRawBufferPointer( + start: unsafePointer.baseAddress, + count: self.utf8.count + ) return try body(unsafeRawBufferPointer) }) { return read diff --git a/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift b/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift index 58f2453d..2eb7b153 100644 --- a/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift +++ b/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift @@ -13,9 +13,10 @@ //===----------------------------------------------------------------------===// import Crdkafka -import struct Foundation.UUID import Logging +import struct Foundation.UUID + @_spi(Internal) extension RDKafkaClient { /// Create a topic with a unique name (`UUID`). @@ -30,13 +31,15 @@ extension RDKafkaClient { let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) defer { errorChars.deallocate() } - guard let newTopic = rd_kafka_NewTopic_new( - uniqueTopicName, - partitions, - -1, // use default replication_factor - errorChars, - RDKafkaClient.stringSize - ) else { + guard + let newTopic = rd_kafka_NewTopic_new( + uniqueTopicName, + partitions, + -1, // use default replication_factor + errorChars, + RDKafkaClient.stringSize + ) + else { let errorString = String(cString: errorChars) throw KafkaError.topicCreation(reason: errorString) } @@ -66,7 +69,9 @@ extension RDKafkaClient { } guard let topicsResultEvent = rd_kafka_event_CreateTopics_result(resultEvent) else { - throw KafkaError.topicCreation(reason: "Received event that is not of type rd_kafka_CreateTopics_result_t") + throw KafkaError.topicCreation( + reason: "Received event that is not of type rd_kafka_CreateTopics_result_t" + ) } var resultTopicCount = 0 @@ -126,7 +131,9 @@ extension RDKafkaClient { } guard let topicsResultEvent = rd_kafka_event_DeleteTopics_result(resultEvent) else { - throw KafkaError.topicDeletion(reason: "Received event that is not of type rd_kafka_DeleteTopics_result_t") + throw KafkaError.topicDeletion( + reason: "Received event that is not of type rd_kafka_DeleteTopics_result_t" + ) } var resultTopicCount = 0 @@ -152,6 +159,6 @@ extension RDKafkaClient { } public static func makeClientForTopics(config: KafkaConsumerConfiguration, logger: Logger) throws -> RDKafkaClient { - return try Self.makeClient(type: .consumer, configDictionary: config.dictionary, events: [], logger: logger) + try Self.makeClient(type: .consumer, configDictionary: config.dictionary, events: [], logger: logger) } } diff --git a/Sources/Kafka/ForTesting/TestMessages.swift b/Sources/Kafka/ForTesting/TestMessages.swift index f9df6224..ee12963b 100644 --- a/Sources/Kafka/ForTesting/TestMessages.swift +++ b/Sources/Kafka/ForTesting/TestMessages.swift @@ -11,9 +11,10 @@ // SPDX-License-Identifier: Apache-2.0 // //===----------------------------------------------------------------------===// -import struct Foundation.Date import NIOCore +import struct 
Foundation.Date + @_spi(Internal) public enum _TestMessagesError: Error { case deliveryReportsIdsIncorrect @@ -27,7 +28,7 @@ public func _createTestMessages( headers: [KafkaHeader] = [], count: UInt ) -> [KafkaProducerMessage] { - return Array(0..= messages.count { @@ -96,8 +97,9 @@ public func _sendAndAcknowledgeMessages( } for message in messages { guard acknowledgedMessages.contains(where: { $0.topic == message.topic }), - acknowledgedMessages.contains(where: { $0.key == ByteBuffer(string: message.key!) }), - acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) }) else { + acknowledgedMessages.contains(where: { $0.key == ByteBuffer(string: message.key!) }), + acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) }) + else { throw _TestMessagesError.deliveryReportsIncorrect } } diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index 2fdead66..f44b2444 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -26,11 +26,11 @@ internal struct KafkaConsumerEventsDelegate: Sendable { extension KafkaConsumerEventsDelegate: NIOAsyncSequenceProducerDelegate { func produceMore() { - return // No back pressure + return // No back pressure } func didTerminate() { - return // We have to call poll for events anyway, nothing to do here + return // We have to call poll for events anyway, nothing to do here } } @@ -53,7 +53,7 @@ public struct KafkaConsumerEvents: Sendable, AsyncSequence { } public func makeAsyncIterator() -> AsyncIterator { - return AsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator()) + AsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator()) } } @@ -73,7 +73,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { private let stateMachineHolder: MachineHolder let pollInterval: Duration - private final class MachineHolder: Sendable { // only for deinit + private final class MachineHolder: Sendable { // only for deinit let stateMachine: LockedMachine init(stateMachine: LockedMachine) { @@ -99,12 +99,12 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { switch action { case .poll(let client): - if let message = try client.consumerPoll() { // non-blocking call + if let message = try client.consumerPoll() { // non-blocking call return message } try await Task.sleep(for: self.pollInterval) case .suspendPollLoop: - try await Task.sleep(for: self.pollInterval) // not started yet + try await Task.sleep(for: self.pollInterval) // not started yet case .terminatePollLoop: return nil } @@ -114,7 +114,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { } public func makeAsyncIterator() -> AsyncIterator { - return AsyncIterator( + AsyncIterator( stateMachine: self.stateMachine, pollInterval: self.pollInterval ) @@ -321,9 +321,9 @@ public final class KafkaConsumer: Sendable, Service { private func _run() async throws { switch self.configuration.consumptionStrategy._internal { - case .partition(groupID: _, topic: let topic, partition: let partition, offset: let offset): + case .partition(groupID: _, let topic, let partition, let offset): try self.assign(topic: topic, partition: partition, offset: offset) - case .group(groupID: _, topics: let topics): + case .group(groupID: _, let topics): try self.subscribe(topics: topics) } try await self.eventRunLoop() @@ -480,7 +480,9 @@ extension KafkaConsumer { client: RDKafkaClient ) { guard case .uninitialized = self.state else { - fatalError("\(#function) 
can only be invoked in state .uninitialized, but was invoked in state \(self.state)") + fatalError( + "\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)" + ) } self.state = .initializing( client: client diff --git a/Sources/Kafka/KafkaError.swift b/Sources/Kafka/KafkaError.swift index ddaf119a..7c269e15 100644 --- a/Sources/Kafka/KafkaError.swift +++ b/Sources/Kafka/KafkaError.swift @@ -56,82 +56,122 @@ public struct KafkaError: Error, CustomStringConvertible, @unchecked Sendable { } static func rdKafkaError( - wrapping error: rd_kafka_resp_err_t, file: String = #fileID, line: UInt = #line + wrapping error: rd_kafka_resp_err_t, + file: String = #fileID, + line: UInt = #line ) -> KafkaError { let errorMessage = String(cString: rd_kafka_err2str(error)) return KafkaError( backing: .init( - code: .underlying, reason: errorMessage, file: file, line: line + code: .underlying, + reason: errorMessage, + file: file, + line: line ) ) } static func config( - reason: String, file: String = #fileID, line: UInt = #line + reason: String, + file: String = #fileID, + line: UInt = #line ) -> KafkaError { - return KafkaError( + KafkaError( backing: .init( - code: .config, reason: reason, file: file, line: line + code: .config, + reason: reason, + file: file, + line: line ) ) } static func topicConfig( - reason: String, file: String = #fileID, line: UInt = #line + reason: String, + file: String = #fileID, + line: UInt = #line ) -> KafkaError { - return KafkaError( + KafkaError( backing: .init( - code: .topicConfig, reason: reason, file: file, line: line + code: .topicConfig, + reason: reason, + file: file, + line: line ) ) } static func client( - reason: String, file: String = #fileID, line: UInt = #line + reason: String, + file: String = #fileID, + line: UInt = #line ) -> KafkaError { - return KafkaError( + KafkaError( backing: .init( - code: .connectionFailed, reason: reason, file: file, line: line + code: .connectionFailed, + reason: reason, + file: file, + line: line ) ) } static func connectionClosed( - reason: String, file: String = #fileID, line: UInt = #line + reason: String, + file: String = #fileID, + line: UInt = #line ) -> KafkaError { - return KafkaError( + KafkaError( backing: .init( - code: .shutdown, reason: reason, file: file, line: line + code: .shutdown, + reason: reason, + file: file, + line: line ) ) } static func messageConsumption( - reason: String, file: String = #fileID, line: UInt = #line + reason: String, + file: String = #fileID, + line: UInt = #line ) -> KafkaError { - return KafkaError( + KafkaError( backing: .init( - code: .messageConsumptionFailed, reason: reason, file: file, line: line + code: .messageConsumptionFailed, + reason: reason, + file: file, + line: line ) ) } static func topicCreation( - reason: String, file: String = #fileID, line: UInt = #line + reason: String, + file: String = #fileID, + line: UInt = #line ) -> KafkaError { - return KafkaError( + KafkaError( backing: .init( - code: .topicCreationFailed, reason: reason, file: file, line: line + code: .topicCreationFailed, + reason: reason, + file: file, + line: line ) ) } static func topicDeletion( - reason: String, file: String = #fileID, line: UInt = #line + reason: String, + file: String = #fileID, + line: UInt = #line ) -> KafkaError { - return KafkaError( + KafkaError( backing: .init( - code: .topicDeletionFailed, reason: reason, file: file, line: line + code: .topicDeletionFailed, + reason: reason, + file: file, + line: line ) ) } @@ -179,7 +219,7 @@ extension 
KafkaError { public static let topicDeletionFailed = ErrorCode(.topicDeletion) public var description: String { - return String(describing: self.backingCode) + String(describing: self.backingCode) } } } @@ -210,7 +250,7 @@ extension KafkaError { // Only the error code matters for equality. static func == (lhs: Backing, rhs: Backing) -> Bool { - return lhs.code == rhs.code + lhs.code == rhs.code } func hash(into hasher: inout Hasher) { @@ -218,7 +258,7 @@ extension KafkaError { } fileprivate func copy() -> Backing { - return Backing(code: self.code, reason: self.reason, file: self.file, line: self.line) + Backing(code: self.code, reason: self.reason, file: self.file, line: self.line) } } } @@ -227,7 +267,7 @@ extension KafkaError { extension KafkaError: Hashable { public static func == (lhs: KafkaError, rhs: KafkaError) -> Bool { - return lhs.backing == rhs.backing + lhs.backing == rhs.backing } public func hash(into hasher: inout Hasher) { diff --git a/Sources/Kafka/KafkaOffset.swift b/Sources/Kafka/KafkaOffset.swift index 17fd9ec4..e4ba9822 100644 --- a/Sources/Kafka/KafkaOffset.swift +++ b/Sources/Kafka/KafkaOffset.swift @@ -35,7 +35,7 @@ public struct KafkaOffset: RawRepresentable { /// Example: Current end offset is at `12345` and `count = 200`. /// This means start reading offset from offset `12345 - 200 = 12145`. public static func tail(_ count: Int) -> KafkaOffset { - return KafkaOffset(rawValue: Int(RD_KAFKA_OFFSET_TAIL_BASE) - count) + KafkaOffset(rawValue: Int(RD_KAFKA_OFFSET_TAIL_BASE) - count) } } diff --git a/Sources/Kafka/KafkaProducer.swift b/Sources/Kafka/KafkaProducer.swift index de953312..1f2e8d07 100644 --- a/Sources/Kafka/KafkaProducer.swift +++ b/Sources/Kafka/KafkaProducer.swift @@ -27,7 +27,7 @@ internal struct KafkaProducerCloseOnTerminate: Sendable { extension KafkaProducerCloseOnTerminate: NIOAsyncSequenceProducerDelegate { func produceMore() { - return // No back pressure + return // No back pressure } func didTerminate() { @@ -60,7 +60,7 @@ public struct KafkaProducerEvents: Sendable, AsyncSequence { } public func makeAsyncIterator() -> AsyncIterator { - return AsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator()) + AsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator()) } } @@ -116,7 +116,7 @@ public final class KafkaProducer: Service, Sendable { ) throws { let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) - var subscribedEvents: [RDKafkaEvent] = [.log] // No .deliveryReport here! + var subscribedEvents: [RDKafkaEvent] = [.log] // No .deliveryReport here! if configuration.metrics.enabled { subscribedEvents.append(.statistics) @@ -238,7 +238,7 @@ public final class KafkaProducer: Service, Sendable { 0...Int(Int32.max) ~= self.configuration.flushTimeoutMilliseconds, "Flush timeout outside of valid range \(0...Int32.max)" ) - defer { // we should finish source indefinetely of exception in client.flush() + defer { // we should finish source indefinetely of exception in client.flush() source?.finish() } try await client.flush(timeoutMilliseconds: Int32(self.configuration.flushTimeoutMilliseconds)) @@ -336,7 +336,9 @@ extension KafkaProducer { source: Producer.Source? 
) { guard case .uninitialized = self.state else { - fatalError("\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)") + fatalError( + "\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)" + ) } self.state = .started( client: client, @@ -419,7 +421,9 @@ extension KafkaProducer { topicHandles: topicHandles ) case .eventConsumptionFinished: - throw KafkaError.connectionClosed(reason: "Sequence consuming events was abruptly terminated, producer closed") + throw KafkaError.connectionClosed( + reason: "Sequence consuming events was abruptly terminated, producer closed" + ) case .finishing: throw KafkaError.connectionClosed(reason: "Producer in the process of finishing") case .finished: diff --git a/Sources/Kafka/KafkaProducerEvent.swift b/Sources/Kafka/KafkaProducerEvent.swift index a4a27262..e461ebe3 100644 --- a/Sources/Kafka/KafkaProducerEvent.swift +++ b/Sources/Kafka/KafkaProducerEvent.swift @@ -21,7 +21,7 @@ public enum KafkaProducerEvent: Sendable, Hashable { internal init(_ event: RDKafkaClient.KafkaEvent) { switch event { - case .deliveryReport(results: let results): + case .deliveryReport(let results): self = .deliveryReports(results) case .statistics: fatalError("Cannot cast \(event) to KafkaProducerEvent") diff --git a/Sources/Kafka/KafkaProducerMessageID.swift b/Sources/Kafka/KafkaProducerMessageID.swift index 9dde24ce..5800bc08 100644 --- a/Sources/Kafka/KafkaProducerMessageID.swift +++ b/Sources/Kafka/KafkaProducerMessageID.swift @@ -27,7 +27,7 @@ public struct KafkaProducerMessageID { extension KafkaProducerMessageID: CustomStringConvertible { public var description: String { - return String(self.rawValue) + String(self.rawValue) } } diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 9e62f945..06906ff0 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -14,9 +14,10 @@ import Crdkafka import Dispatch -import class Foundation.JSONDecoder import Logging +import class Foundation.JSONDecoder + /// Base class for ``KafkaProducer`` and ``KafkaConsumer``, /// which is used to handle the connection to the Kafka ecosystem. @_spi(Internal) @@ -74,12 +75,14 @@ public final class RDKafkaClient: Sendable { defer { errorChars.deallocate() } let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER - guard let handle = rd_kafka_new( - clientType, - rdConfig, - errorChars, - RDKafkaClient.stringSize - ) else { + guard + let handle = rd_kafka_new( + clientType, + rdConfig, + errorChars, + RDKafkaClient.stringSize + ) + else { // rd_kafka_new only frees the rd_kafka_conf_t upon success rd_kafka_conf_destroy(rdConfig) @@ -113,7 +116,7 @@ public final class RDKafkaClient: Sendable { topic: message.topic, topicConfiguration: topicConfiguration ) { topicHandle in - return try Self.withMessageKeyAndValueBuffer(for: message) { keyBuffer, valueBuffer in + try Self.withMessageKeyAndValueBuffer(for: message) { keyBuffer, valueBuffer in if message.headers.isEmpty { // No message headers set, normal produce method can be used. 
rd_kafka_produce( @@ -224,12 +227,12 @@ public final class RDKafkaClient: Sendable { @discardableResult private static func withMessageKeyAndValueBuffer( for message: KafkaProducerMessage, - _ body: (UnsafeRawBufferPointer?, UnsafeRawBufferPointer) throws -> T // (keyBuffer, valueBuffer) + _ body: (UnsafeRawBufferPointer?, UnsafeRawBufferPointer) throws -> T // (keyBuffer, valueBuffer) ) rethrows -> T { - return try message.value.withUnsafeBytes { valueBuffer in + try message.value.withUnsafeBytes { valueBuffer in if let key = message.key { return try key.withUnsafeBytes { keyBuffer in - return try body(keyBuffer, valueBuffer) + try body(keyBuffer, valueBuffer) } } else { return try body(nil, valueBuffer) @@ -328,7 +331,7 @@ public final class RDKafkaClient: Sendable { // Finished reading events, return early return events default: - break // Ignored Event + break // Ignored Event } } @@ -382,29 +385,35 @@ public final class RDKafkaClient: Sendable { if let faculty, let buffer { // Mapping according to https://en.wikipedia.org/wiki/Syslog switch level { - case 0...2: /* Emergency, Alert, Critical */ + case 0...2: // Emergency, Alert, Critical self.logger.critical( - Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty) + Logger.Message(stringLiteral: String(cString: buffer)), + source: String(cString: faculty) ) - case 3: /* Error */ + case 3: // Error self.logger.error( - Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty) + Logger.Message(stringLiteral: String(cString: buffer)), + source: String(cString: faculty) ) - case 4: /* Warning */ + case 4: // Warning self.logger.warning( - Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty) + Logger.Message(stringLiteral: String(cString: buffer)), + source: String(cString: faculty) ) - case 5: /* Notice */ + case 5: // Notice self.logger.notice( - Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty) + Logger.Message(stringLiteral: String(cString: buffer)), + source: String(cString: faculty) ) - case 6: /* Informational */ + case 6: // Informational self.logger.info( - Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty) + Logger.Message(stringLiteral: String(cString: buffer)), + source: String(cString: faculty) ) - default: /* Debug */ + default: // Debug self.logger.debug( - Logger.Message(stringLiteral: String(cString: buffer)), source: String(cString: faculty) + Logger.Message(stringLiteral: String(cString: buffer)), + source: String(cString: faculty) ) } } @@ -507,10 +516,10 @@ public final class RDKafkaClient: Sendable { ) let error = changesList.withListPointer { listPointer in - return rd_kafka_commit( + rd_kafka_commit( self.kafkaHandle, listPointer, - 1 // async = true + 1 // async = true ) } @@ -604,6 +613,6 @@ public final class RDKafkaClient: Sendable { /// - Parameter body: The closure will use the Kafka handle pointer. @discardableResult func withKafkaHandlePointer(_ body: (OpaquePointer) throws -> T) rethrows -> T { - return try body(self.kafkaHandle) + try body(self.kafkaHandle) } } diff --git a/Sources/Kafka/RDKafka/RDKafkaConfig.swift b/Sources/Kafka/RDKafka/RDKafkaConfig.swift index c4933737..f8f37fac 100644 --- a/Sources/Kafka/RDKafka/RDKafkaConfig.swift +++ b/Sources/Kafka/RDKafka/RDKafkaConfig.swift @@ -23,7 +23,7 @@ struct RDKafkaConfig { /// - Throws: A ``KafkaError`` if setting a config value failed. 
static func createFrom(configDictionary: [String: String]) throws -> OpaquePointer { let configPointer: OpaquePointer = rd_kafka_conf_new() - try configDictionary.forEach { key, value in + for (key, value) in configDictionary { try Self.set(configPointer: configPointer, key: key, value: value) } diff --git a/Sources/Kafka/RDKafka/RDKafkaTopicConfig.swift b/Sources/Kafka/RDKafka/RDKafkaTopicConfig.swift index 100e734e..59cff496 100644 --- a/Sources/Kafka/RDKafka/RDKafkaTopicConfig.swift +++ b/Sources/Kafka/RDKafka/RDKafkaTopicConfig.swift @@ -22,7 +22,7 @@ struct RDKafkaTopicConfig { /// - Throws: A ``KafkaError`` if setting a config value failed. static func createFrom(topicConfiguration: KafkaTopicConfiguration) throws -> OpaquePointer { let configPointer: OpaquePointer = rd_kafka_topic_conf_new() - try topicConfiguration.dictionary.forEach { key, value in + for (key, value) in topicConfiguration.dictionary { try Self.set(configPointer: configPointer, key: key, value: value) } diff --git a/Sources/Kafka/RDKafka/RDKafkaTopicPartitionList.swift b/Sources/Kafka/RDKafka/RDKafkaTopicPartitionList.swift index fc663599..b02d9357 100644 --- a/Sources/Kafka/RDKafka/RDKafkaTopicPartitionList.swift +++ b/Sources/Kafka/RDKafka/RDKafkaTopicPartitionList.swift @@ -50,11 +50,13 @@ final class RDKafkaTopicPartitionList { "Partition ID outside of valid range \(0...Int32.max)" ) - guard let partitionPointer = rd_kafka_topic_partition_list_add( - self._internal, - topic, - Int32(partition.rawValue) - ) else { + guard + let partitionPointer = rd_kafka_topic_partition_list_add( + self._internal, + topic, + Int32(partition.rawValue) + ) + else { fatalError("rd_kafka_topic_partition_list_add returned invalid pointer") } partitionPointer.pointee.offset = offset @@ -65,6 +67,6 @@ final class RDKafkaTopicPartitionList { /// - Parameter body: The closure will use the pointer. 
@discardableResult func withListPointer(_ body: (UnsafeMutablePointer) throws -> T) rethrows -> T { - return try body(self._internal) + try body(self._internal) } } diff --git a/Sources/Kafka/Utilities/Duration+Helpers.swift b/Sources/Kafka/Utilities/Duration+Helpers.swift index d964aafc..36754b17 100644 --- a/Sources/Kafka/Utilities/Duration+Helpers.swift +++ b/Sources/Kafka/Utilities/Duration+Helpers.swift @@ -20,6 +20,6 @@ extension Duration { } internal var canBeRepresentedAsMilliseconds: Bool { - return self.inMilliseconds > 0 + self.inMilliseconds > 0 } } diff --git a/Sources/KafkaFoundationCompat/Data+KafkaContiguousBytes.swift b/Sources/KafkaFoundationCompat/Data+KafkaContiguousBytes.swift index 27455c66..e119a724 100644 --- a/Sources/KafkaFoundationCompat/Data+KafkaContiguousBytes.swift +++ b/Sources/KafkaFoundationCompat/Data+KafkaContiguousBytes.swift @@ -12,7 +12,8 @@ // //===----------------------------------------------------------------------===// -import struct Foundation.Data import Kafka +import struct Foundation.Data + extension Data: KafkaContiguousBytes {} diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift index d23942c5..2777bb6c 100644 --- a/Tests/IntegrationTests/KafkaTests.swift +++ b/Tests/IntegrationTests/KafkaTests.swift @@ -13,13 +13,15 @@ //===----------------------------------------------------------------------===// import Atomics -import struct Foundation.UUID -@testable import Kafka @_spi(Internal) import Kafka import NIOCore import ServiceLifecycle import XCTest +import struct Foundation.UUID + +@testable import Kafka + // For testing locally on Mac, do the following: // // 1. Install Kafka and Zookeeper using homebrew @@ -90,15 +92,18 @@ final class KafkaTests: XCTestCase { self.uniqueTestTopic = nil } - func testProduceAndConsumeWithConsumerGroup() async throws { + func testProduceAndConsumeWithConsumerGrouptestProduceAndConsumeWithConsumerGroup() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) - let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + configuration: self.producerConfig, + logger: .kafkaTest + ) var consumerConfig = KafkaConsumerConfiguration( consumptionStrategy: .group(id: "subscription-test-group-id", topics: [self.uniqueTestTopic]), bootstrapBrokerAddresses: [self.bootstrapBrokerAddress] ) - consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning + consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning consumerConfig.broker.addressFamily = .v4 let consumer = try KafkaConsumer( @@ -154,7 +159,10 @@ final class KafkaTests: XCTestCase { func testProduceAndConsumeWithAssignedTopicPartition() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) - let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + configuration: self.producerConfig, + logger: .kafkaTest + ) var consumerConfig = KafkaConsumerConfiguration( consumptionStrategy: .partition( @@ -164,7 +172,7 @@ final class KafkaTests: XCTestCase { ), bootstrapBrokerAddresses: [self.bootstrapBrokerAddress] ) - consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning + consumerConfig.autoOffsetReset = .beginning 
// Always read topics from beginning consumerConfig.broker.addressFamily = .v4 let consumer = try KafkaConsumer( @@ -220,14 +228,17 @@ final class KafkaTests: XCTestCase { func testProduceAndConsumeWithScheduleCommit() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) - let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + configuration: self.producerConfig, + logger: .kafkaTest + ) var consumerConfig = KafkaConsumerConfiguration( consumptionStrategy: .group(id: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]), bootstrapBrokerAddresses: [self.bootstrapBrokerAddress] ) consumerConfig.isAutoCommitEnabled = false - consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning + consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning consumerConfig.broker.addressFamily = .v4 let consumer = try KafkaConsumer( @@ -278,14 +289,17 @@ final class KafkaTests: XCTestCase { func testProduceAndConsumeWithCommit() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) - let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + configuration: self.producerConfig, + logger: .kafkaTest + ) var consumerConfig = KafkaConsumerConfiguration( consumptionStrategy: .group(id: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]), bootstrapBrokerAddresses: [self.bootstrapBrokerAddress] ) consumerConfig.isAutoCommitEnabled = false - consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning + consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning consumerConfig.broker.addressFamily = .v4 let consumer = try KafkaConsumer( @@ -344,14 +358,17 @@ final class KafkaTests: XCTestCase { count: 10 ) - let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + configuration: self.producerConfig, + logger: .kafkaTest + ) var consumerConfig = KafkaConsumerConfiguration( consumptionStrategy: .group(id: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]), bootstrapBrokerAddresses: [self.bootstrapBrokerAddress] ) consumerConfig.isAutoCommitEnabled = false - consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning + consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning consumerConfig.broker.addressFamily = .v4 let consumer = try KafkaConsumer( @@ -409,7 +426,10 @@ final class KafkaTests: XCTestCase { func testNoNewConsumerMessagesAfterGracefulShutdown() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 2) - let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + configuration: self.producerConfig, + logger: .kafkaTest + ) let uniqueGroupID = UUID().uuidString @@ -420,7 +440,7 @@ final class KafkaTests: XCTestCase { ), bootstrapBrokerAddresses: [self.bootstrapBrokerAddress] ) - consumerConfig.autoOffsetReset = .beginning // Read topic from beginning + consumerConfig.autoOffsetReset = .beginning // 
Read topic from beginning let consumer = try KafkaConsumer( configuration: consumerConfig, @@ -473,7 +493,10 @@ final class KafkaTests: XCTestCase { func testCommittedOffsetsAreCorrect() async throws { let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) let firstConsumerOffset = testMessages.count / 2 - let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithEvents( + configuration: self.producerConfig, + logger: .kafkaTest + ) // Important: both consumer must have the same group.id let uniqueGroupID = UUID().uuidString @@ -487,7 +510,7 @@ final class KafkaTests: XCTestCase { ), bootstrapBrokerAddresses: [self.bootstrapBrokerAddress] ) - consumer1Config.autoOffsetReset = .beginning // Read topic from beginning + consumer1Config.autoOffsetReset = .beginning // Read topic from beginning consumer1Config.broker.addressFamily = .v4 let consumer1 = try KafkaConsumer( @@ -591,8 +614,14 @@ final class KafkaTests: XCTestCase { for (index, consumedMessage) in consumedMessages.enumerated() { XCTAssertEqual(testMessages[firstConsumerOffset + index].topic, consumedMessage.topic) - XCTAssertEqual(ByteBuffer(string: testMessages[firstConsumerOffset + index].key!), consumedMessage.key) - XCTAssertEqual(ByteBuffer(string: testMessages[firstConsumerOffset + index].value), consumedMessage.value) + XCTAssertEqual( + ByteBuffer(string: testMessages[firstConsumerOffset + index].key!), + consumedMessage.key + ) + XCTAssertEqual( + ByteBuffer(string: testMessages[firstConsumerOffset + index].value), + consumedMessage.value + ) } } @@ -626,9 +655,16 @@ final class KafkaTests: XCTestCase { let numOfMessages: UInt = 1000 let testMessages = Self.createTestMessages(topic: uniqueTestTopic, count: numOfMessages) - let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: producerConfig, logger: .kafkaTest) + let (producer, acks) = try KafkaProducer.makeProducerWithEvents( + configuration: producerConfig, + logger: .kafkaTest + ) - let producerServiceGroupConfiguration = ServiceGroupConfiguration(services: [producer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .kafkaTest) + let producerServiceGroupConfiguration = ServiceGroupConfiguration( + services: [producer], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: .kafkaTest + ) let producerServiceGroup = ServiceGroup(configuration: producerServiceGroupConfiguration) try await withThrowingTaskGroup(of: Void.self) { group in @@ -691,10 +727,18 @@ final class KafkaTests: XCTestCase { logger: .kafkaTest ) - let serviceGroupConfiguration1 = ServiceGroupConfiguration(services: [consumer1], gracefulShutdownSignals: [.sigterm, .sigint], logger: .kafkaTest) + let serviceGroupConfiguration1 = ServiceGroupConfiguration( + services: [consumer1], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: .kafkaTest + ) let serviceGroup1 = ServiceGroup(configuration: serviceGroupConfiguration1) - let serviceGroupConfiguration2 = ServiceGroupConfiguration(services: [consumer2], gracefulShutdownSignals: [.sigterm, .sigint], logger: .kafkaTest) + let serviceGroupConfiguration2 = ServiceGroupConfiguration( + services: [consumer2], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: .kafkaTest + ) let serviceGroup2 = ServiceGroup(configuration: serviceGroupConfiguration2) let sharedCtr = ManagedAtomic(0) @@ -706,7 +750,7 @@ final class KafkaTests: XCTestCase { } // Run Task for 2nd consumer 
group.addTask { - try await Task.sleep(for: .seconds(20)) // wait a bit that first consumer would form a queue + try await Task.sleep(for: .seconds(20)) // wait a bit that first consumer would form a queue try await serviceGroup2.run() } @@ -716,8 +760,8 @@ final class KafkaTests: XCTestCase { for try await record in consumer1.messages { sharedCtr.wrappingIncrement(ordering: .relaxed) - try consumer1.scheduleCommit(record) // commit time to time - try await Task.sleep(for: .milliseconds(100)) // don't read all messages before 2nd consumer + try consumer1.scheduleCommit(record) // commit time to time + try await Task.sleep(for: .milliseconds(100)) // don't read all messages before 2nd consumer } } @@ -727,7 +771,7 @@ final class KafkaTests: XCTestCase { for try await record in consumer2.messages { sharedCtr.wrappingIncrement(ordering: .relaxed) - try consumer2.scheduleCommit(record) // commit time to time + try consumer2.scheduleCommit(record) // commit time to time } } @@ -736,10 +780,10 @@ final class KafkaTests: XCTestCase { while true { let currentCtr = sharedCtr.load(ordering: .relaxed) guard currentCtr >= numOfMessages else { - try await Task.sleep(for: .seconds(5)) // wait if new messages come here + try await Task.sleep(for: .seconds(5)) // wait if new messages come here continue } - try await Task.sleep(for: .seconds(5)) // wait for extra messages + try await Task.sleep(for: .seconds(5)) // wait for extra messages await serviceGroup1.triggerGracefulShutdown() await serviceGroup2.triggerGracefulShutdown() break @@ -763,7 +807,7 @@ final class KafkaTests: XCTestCase { headers: [KafkaHeader] = [], count: UInt ) -> [KafkaProducerMessage] { - return _createTestMessages(topic: topic, headers: headers, count: count) + _createTestMessages(topic: topic, headers: headers, count: count) } private static func sendAndAcknowledgeMessages( @@ -772,6 +816,11 @@ final class KafkaTests: XCTestCase { messages: [KafkaProducerMessage], skipConsistencyCheck: Bool = false ) async throws { - return try await _sendAndAcknowledgeMessages(producer: producer, events: events, messages: messages, skipConsistencyCheck: skipConsistencyCheck) + try await _sendAndAcknowledgeMessages( + producer: producer, + events: events, + messages: messages, + skipConsistencyCheck: skipConsistencyCheck + ) } } diff --git a/Tests/IntegrationTests/Utilities.swift b/Tests/IntegrationTests/Utilities.swift index 94dbf374..aaa6ff3c 100644 --- a/Tests/IntegrationTests/Utilities.swift +++ b/Tests/IntegrationTests/Utilities.swift @@ -12,9 +12,10 @@ // //===----------------------------------------------------------------------===// -import struct Foundation.UUID import Logging +import struct Foundation.UUID + extension Logger { static var kafkaTest: Logger { var logger = Logger(label: "kafka.test") diff --git a/Tests/KafkaTests/KafkaConsumerTests.swift b/Tests/KafkaTests/KafkaConsumerTests.swift index 1137b0d3..2f3e59e7 100644 --- a/Tests/KafkaTests/KafkaConsumerTests.swift +++ b/Tests/KafkaTests/KafkaConsumerTests.swift @@ -12,15 +12,17 @@ // //===----------------------------------------------------------------------===// -@testable import CoreMetrics // for MetricsSystem.bootstrapInternal -import struct Foundation.UUID -@testable import Kafka import Logging import Metrics import MetricsTestKit import ServiceLifecycle import XCTest +import struct Foundation.UUID + +@testable import CoreMetrics // for MetricsSystem.bootstrapInternal +@testable import Kafka + // For testing locally on Mac, do the following: // // 1. 
Install Kafka and Zookeeper using homebrew @@ -82,15 +84,14 @@ final class KafkaConsumerTests: XCTestCase { let recordedEvents = recorder.recordedEvents let expectedLogs: [(level: Logger.Level, source: String, message: String)] = [ - (Logger.Level.debug, "MEMBERID", uniqueGroupID), + (Logger.Level.debug, "MEMBERID", uniqueGroupID) ] for expectedLog in expectedLogs { XCTAssertTrue( recordedEvents.contains(where: { event in - event.level == expectedLog.level && - event.source == expectedLog.source && - event.message.description.contains(expectedLog.message) + event.level == expectedLog.level && event.source == expectedLog.source + && event.message.description.contains(expectedLog.message) }), "Expected log \(expectedLog) but was not found" ) @@ -135,7 +136,7 @@ final class KafkaConsumerTests: XCTestCase { bootstrapBrokerAddresses: [] ) - _ = try KafkaConsumer(configuration: config, logger: .kafkaTest) // deinit called before run + _ = try KafkaConsumer(configuration: config, logger: .kafkaTest) // deinit called before run _ = try KafkaConsumer.makeConsumerWithEvents(configuration: config, logger: .kafkaTest) } diff --git a/Tests/KafkaTests/KafkaProducerTests.swift b/Tests/KafkaTests/KafkaProducerTests.swift index 14b83303..16d67152 100644 --- a/Tests/KafkaTests/KafkaProducerTests.swift +++ b/Tests/KafkaTests/KafkaProducerTests.swift @@ -12,8 +12,6 @@ // //===----------------------------------------------------------------------===// -@testable import CoreMetrics // for MetricsSystem.bootstrapInternal -@testable import Kafka import Logging import Metrics import MetricsTestKit @@ -21,6 +19,9 @@ import NIOCore import ServiceLifecycle import XCTest +@testable import CoreMetrics // for MetricsSystem.bootstrapInternal +@testable import Kafka + // For testing locally on Mac, do the following: // // 1. 
Install Kafka and Zookeeper using homebrew @@ -66,7 +67,10 @@ final class KafkaProducerTests: XCTestCase { } func testSend() async throws { - let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.config, logger: .kafkaTest) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + configuration: self.config, + logger: .kafkaTest + ) let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer], logger: .kafkaTest) let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) @@ -95,7 +99,7 @@ final class KafkaProducerTests: XCTestCase { receivedDeliveryReports.insert(deliveryReport) } default: - break // Ignore any other events + break // Ignore any other events } if receivedDeliveryReports.count >= 1 { @@ -121,7 +125,10 @@ final class KafkaProducerTests: XCTestCase { } func testSendEmptyMessage() async throws { - let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.config, logger: .kafkaTest) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + configuration: self.config, + logger: .kafkaTest + ) let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer], logger: .kafkaTest) let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) @@ -149,7 +156,7 @@ final class KafkaProducerTests: XCTestCase { receivedDeliveryReports.insert(deliveryReport) } default: - break // Ignore any other events + break // Ignore any other events } if receivedDeliveryReports.count >= 1 { @@ -174,7 +181,10 @@ final class KafkaProducerTests: XCTestCase { } func testSendTwoTopics() async throws { - let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.config, logger: .kafkaTest) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + configuration: self.config, + logger: .kafkaTest + ) let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer], logger: .kafkaTest) let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) @@ -210,7 +220,7 @@ final class KafkaProducerTests: XCTestCase { receivedDeliveryReports.insert(deliveryReport) } default: - break // Ignore any other events + break // Ignore any other events } if receivedDeliveryReports.count >= 2 { @@ -270,7 +280,8 @@ final class KafkaProducerTests: XCTestCase { let recordedEvents = recorder.recordedEvents XCTAssertEqual(1, recordedEvents.count) - let expectedMessage = "[thrd:app]: No `bootstrap.servers` configured: client will not be able to connect to Kafka cluster" + let expectedMessage = + "[thrd:app]: No `bootstrap.servers` configured: client will not be able to connect to Kafka cluster" let expectedLevel = Logger.Level.notice let expectedSource = "CONFWARN" @@ -281,7 +292,10 @@ final class KafkaProducerTests: XCTestCase { } func testSendFailsAfterTerminatingAcknowledgementSequence() async throws { - let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.config, logger: .kafkaTest) + let (producer, events) = try KafkaProducer.makeProducerWithEvents( + configuration: self.config, + logger: .kafkaTest + ) let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer], logger: .kafkaTest) let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) @@ -381,8 +395,11 @@ final class KafkaProducerTests: XCTestCase { func testProducerConstructDeinit() async throws { let config = KafkaProducerConfiguration(bootstrapBrokerAddresses: []) - _ = try 
KafkaProducer(configuration: config, logger: .kafkaTest) // deinit called before run - _ = try KafkaProducer.makeProducerWithEvents(configuration: config, logger: .kafkaTest) // deinit called before run + // deinit called before run + _ = try KafkaProducer(configuration: config, logger: .kafkaTest) + + // deinit called before run + _ = try KafkaProducer.makeProducerWithEvents(configuration: config, logger: .kafkaTest) } func testProducerEventsReadCancelledBeforeRun() async throws { diff --git a/Tests/KafkaTests/Utilities.swift b/Tests/KafkaTests/Utilities.swift index f7fbfbf8..f52a0903 100644 --- a/Tests/KafkaTests/Utilities.swift +++ b/Tests/KafkaTests/Utilities.swift @@ -66,7 +66,7 @@ internal struct MockLogHandler: LogHandler { var logLevel: Logger.Level { get { // get from config unless set - return self._logLevel ?? .debug + self._logLevel ?? .debug } set { self._logLevel = newValue @@ -82,7 +82,7 @@ internal struct MockLogHandler: LogHandler { public var metadata: Logger.Metadata { get { - return self._metadata + self._metadata } set { self._metadata = newValue @@ -91,7 +91,7 @@ internal struct MockLogHandler: LogHandler { subscript(metadataKey metadataKey: Logger.Metadata.Key) -> Logger.Metadata.Value? { get { - return self._metadata[metadataKey] + self._metadata[metadataKey] } set { self._metadata[metadataKey] = newValue diff --git a/dev/test-benchmark-thresholds.sh b/dev/test-benchmark-thresholds.sh index 731c3e97..2874d1df 100644 --- a/dev/test-benchmark-thresholds.sh +++ b/dev/test-benchmark-thresholds.sh @@ -13,7 +13,7 @@ ## ##===----------------------------------------------------------------------===## -cd Benchmarks +cd Benchmarks || exit swift package --disable-sandbox benchmark baseline update PR --no-progress git checkout main swift package --disable-sandbox benchmark baseline update main --no-progress diff --git a/dev/update-benchmark-thresholds.sh b/dev/update-benchmark-thresholds.sh index 9dc2c850..be8bf886 100755 --- a/dev/update-benchmark-thresholds.sh +++ b/dev/update-benchmark-thresholds.sh @@ -24,5 +24,5 @@ for f in 57 58 59 510 -nightly; do docker_file=$(if [[ "$f" == "-nightly" ]]; then f=main; fi && ls "$target_repo/docker/docker-compose."*"$f"*".yaml") - docker-compose -f docker/docker-compose.yaml -f $docker_file run update-benchmark-baseline + docker-compose -f docker/docker-compose.yaml -f "$docker_file" run update-benchmark-baseline done diff --git a/docker/Dockerfile b/docker/Dockerfile index 1f4de780..322781d9 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,4 +1,4 @@ -ARG swift_version=5.7 +ARG swift_version=5.9 ARG ubuntu_version=jammy ARG base_image=swift:$swift_version-$ubuntu_version FROM $base_image diff --git a/scripts/generate_contributors_list.sh b/scripts/generate_contributors_list.sh deleted file mode 100755 index b6f2405f..00000000 --- a/scripts/generate_contributors_list.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -##===----------------------------------------------------------------------===## -## -## This source file is part of the swift-kafka-client open source project -## -## Copyright (c) 2022 Apple Inc. 
and the swift-kafka-client project authors -## Licensed under Apache License v2.0 -## -## See LICENSE.txt for license information -## See CONTRIBUTORS.txt for the list of swift-kafka-client project authors -## -## SPDX-License-Identifier: Apache-2.0 -## -##===----------------------------------------------------------------------===## - -set -eu -here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -contributors=$( cd "$here"/.. && git shortlog -es | cut -f2 | sed 's/^/- /' ) - -cat > "$here/../CONTRIBUTORS.txt" <<- EOF - For the purpose of tracking copyright, this is the list of individuals and - organizations who have contributed source code to swift-kafka-client. - - For employees of an organization/company where the copyright of work done - by employees of that company is held by the company itself, only the company - needs to be listed here. - - ## COPYRIGHT HOLDERS - - - Apple Inc. (all contributors with '@apple.com') - - ### Contributors - - $contributors - - **Updating this list** - - Please do not edit this file manually. It is generated using \`./scripts/generate_contributors_list.sh\`. If a name is misspelled or appearing multiple times: add an entry in \`./.mailmap\` -EOF diff --git a/scripts/soundness.sh b/scripts/soundness.sh deleted file mode 100755 index e093d15b..00000000 --- a/scripts/soundness.sh +++ /dev/null @@ -1,183 +0,0 @@ -#!/bin/bash -##===----------------------------------------------------------------------===## -## -## This source file is part of the swift-kafka-client open source project -## -## Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors -## Licensed under Apache License v2.0 -## -## See LICENSE.txt for license information -## See CONTRIBUTORS.txt for the list of swift-kafka-client project authors -## -## SPDX-License-Identifier: Apache-2.0 -## -##===----------------------------------------------------------------------===## - -set -eu -here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" - -function replace_acceptable_years() { - # this needs to replace all acceptable forms with 'YEARS' - sed -e 's/20[12][789012]-20[12][890123]/YEARS/' -e 's/20[12][890123]/YEARS/' -} - -printf "=> Checking for unacceptable language... " -# This greps for unacceptable terminology. The square bracket[s] are so that -# "git grep" doesn't find the lines that greps :). -unacceptable_terms=( - -e blacklis[t] - -e whitelis[t] - -e slav[e] - -e sanit[y] -) - -# We have to exclude the code of conduct because it gives examples of unacceptable language. -# We have to exclude *Config.swift files as they need to map to Kafka terminology -# which is considered unacceptable by us. -exclude_files=( - CODE_OF_CONDUCT.md - *Configuration.swift -) -for word in "${exclude_files[@]}"; do - exclude_files+=(":(exclude)$word") -done -exclude_files_str=$(printf " %s" "${exclude_files[@]}") - -if git grep --color=never -i "${unacceptable_terms[@]}" -- . $exclude_files_str > /dev/null; then - printf "\033[0;31mUnacceptable language found.\033[0m\n" - git grep -i "${unacceptable_terms[@]}" -- . $exclude_files_str - exit 1 -fi -printf "\033[0;32mokay.\033[0m\n" - -printf "=> Checking format... " -FIRST_OUT="$(git status --porcelain)" -swiftformat . > /dev/null 2>&1 -SECOND_OUT="$(git status --porcelain)" -if [[ "$FIRST_OUT" != "$SECOND_OUT" ]]; then - printf "\033[0;31mformatting issues!\033[0m\n" - git --no-pager diff - exit 1 -else - printf "\033[0;32mokay.\033[0m\n" -fi - -printf "=> Checking license headers... 
" -tmp=$(mktemp /tmp/.swift-kafka-client-soundness_XXXXXX) - -for language in swift-or-c bash dtrace python; do - declare -a matching_files - declare -a exceptions - expections=( ) - matching_files=( -name '*' ) - case "$language" in - swift-or-c) - # we don't add our own headers to the librdkafka submodule - exceptions=( -path '*Sources/Crdkafka/*' -o -name Package.swift ) - matching_files=( -name '*.swift' -o -name '*.c' -o -name '*.h' ) - cat > "$tmp" <<"EOF" -//===----------------------------------------------------------------------===// -// -// This source file is part of the swift-kafka-client open source project -// -// Copyright (c) YEARS Apple Inc. and the swift-kafka-client project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// -EOF - ;; - bash) - matching_files=( -name '*.sh' ) - cat > "$tmp" <<"EOF" -#!/bin/bash -##===----------------------------------------------------------------------===## -## -## This source file is part of the swift-kafka-client open source project -## -## Copyright (c) YEARS Apple Inc. and the swift-kafka-client project authors -## Licensed under Apache License v2.0 -## -## See LICENSE.txt for license information -## See CONTRIBUTORS.txt for the list of swift-kafka-client project authors -## -## SPDX-License-Identifier: Apache-2.0 -## -##===----------------------------------------------------------------------===## -EOF - ;; - python) - matching_files=( -name '*.py' ) - cat > "$tmp" <<"EOF" -#!/usr/bin/env python3 -##===----------------------------------------------------------------------===## -## -## This source file is part of the swift-kafka-client open source project -## -## Copyright (c) YEARS Apple Inc. and the swift-kafka-client project authors -## Licensed under Apache License v2.0 -## -## See LICENSE.txt for license information -## See CONTRIBUTORS.txt for the list of swift-kafka-client project authors -## -## SPDX-License-Identifier: Apache-2.0 -## -##===----------------------------------------------------------------------===## -EOF - ;; - dtrace) - matching_files=( -name '*.d' ) - cat > "$tmp" <<"EOF" -#!/usr/sbin/dtrace -q -s -/*===----------------------------------------------------------------------===* - * - * This source file is part of the swift-kafka-client open source project - * - * Copyright (c) YEARS Apple Inc. and the swift-kafka-client project authors - * Licensed under Apache License v2.0 - * - * See LICENSE.txt for license information - * See CONTRIBUTORS.txt for the list of swift-kafka-client project authors - * - * SPDX-License-Identifier: Apache-2.0 - * - *===----------------------------------------------------------------------===*/ -EOF - ;; - *) - echo >&2 "ERROR: unknown language '$language'" - ;; - esac - - expected_lines=$(cat "$tmp" | wc -l) - expected_sha=$(cat "$tmp" | shasum) - - ( - cd "$here/.." - { - find . \ - \( \! -path './.build/*' -a \ - \( "${matching_files[@]}" \) -a \ - \( \! 
\( "${exceptions[@]}" \) \) \) - - if [[ "$language" = bash ]]; then - # add everything with a shell shebang too - git grep --full-name -l '#!/bin/bash' - git grep --full-name -l '#!/bin/sh' - fi - } | while read line; do - if [[ "$(cat "$line" | replace_acceptable_years | head -n $expected_lines | shasum)" != "$expected_sha" ]]; then - printf "\033[0;31mmissing headers in file '$line'!\033[0m\n" - diff -u <(cat "$line" | replace_acceptable_years | head -n $expected_lines) "$tmp" - exit 1 - fi - done - printf "\033[0;32mokay.\033[0m\n" - ) -done - -rm "$tmp"