From e73abfdc52fda410c2512c0a35c4830b067a14a2 Mon Sep 17 00:00:00 2001 From: Eric Rosenberg Date: Fri, 13 Dec 2024 14:53:21 +0000 Subject: [PATCH 1/4] Responsiveness under Working Conditions Implementation of https://datatracker.ietf.org/doc/draft-ietf-ippm-responsiveness/ (draft 5) with flexible download and upload handlers to suit other use cases as well. --- .spi.yml | 2 +- Package.swift | 42 ++++ README.md | 2 + .../HTTPDrippingDownloadHandler.swift | 228 ++++++++++++++++++ .../HTTPReceiveDiscardHandler.swift | 90 +++++++ .../ResponsivenessConfig.swift | 45 ++++ .../SimpleResponsivenessRequestMux.swift | 141 +++++++++++ .../HTTPResponsivenessServer.swift | 67 +++++ .../HTTPDrippingDownloadHandlerTests.swift | 133 ++++++++++ .../HTTPResponsivenessTests.swift | 172 +++++++++++++ 10 files changed, 921 insertions(+), 1 deletion(-) create mode 100644 Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift create mode 100644 Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift create mode 100644 Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift create mode 100644 Sources/NIOHTTPResponsiveness/SimpleResponsivenessRequestMux.swift create mode 100644 Sources/NIOHTTPResponsivenessServer/HTTPResponsivenessServer.swift create mode 100644 Tests/NIOHTTPResponsivenessTests/HTTPDrippingDownloadHandlerTests.swift create mode 100644 Tests/NIOHTTPResponsivenessTests/HTTPResponsivenessTests.swift diff --git a/.spi.yml b/.spi.yml index 38cb5aa9..4ef114be 100644 --- a/.spi.yml +++ b/.spi.yml @@ -1,4 +1,4 @@ version: 1 builder: configs: - - documentation_targets: [NIOExtras, NIOHTTPCompression, NIOSOCKS, NIOHTTPTypes, NIOHTTPTypesHTTP1, NIOHTTPTypesHTTP2, NIOResumableUpload] + - documentation_targets: [NIOExtras, NIOHTTPCompression, NIOSOCKS, NIOHTTPTypes, NIOHTTPTypesHTTP1, NIOHTTPTypesHTTP2, NIOResumableUpload, NIOHTTPResponsiveness] diff --git a/Package.swift b/Package.swift index 72ffc7e5..1452bc42 100644 --- a/Package.swift +++ b/Package.swift @@ -197,6 +197,44 @@ var targets: [PackageDescription.Target] = [ .product(name: "NIOEmbedded", package: "swift-nio"), ] ), + .target( + name: "NIOHTTPResponsiveness", + dependencies: [ + "NIOHTTPTypes", + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "HTTPTypes", package: "swift-http-types"), + ], + swiftSettings: [ + .enableExperimentalFeature("StrictConcurrency") + ] + ), + .testTarget( + name: "NIOHTTPResponsivenessTests", + dependencies: [ + "NIOHTTPResponsiveness", + "NIOHTTPTypes", + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOEmbedded", package: "swift-nio"), + .product(name: "HTTPTypes", package: "swift-http-types"), + ], + swiftSettings: [ + .enableExperimentalFeature("StrictConcurrency") + ] + ), + .executableTarget( + name: "NIOHTTPResponsivenessServer", + dependencies: [ + "NIOHTTPResponsiveness", + "NIOHTTPTypesHTTP1", + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOPosix", package: "swift-nio"), + .product(name: "NIOHTTP1", package: "swift-nio"), + .product(name: "ArgumentParser", package: "swift-argument-parser"), + ], + swiftSettings: [ + .enableExperimentalFeature("StrictConcurrency") + ] + ), ] let package = Package( @@ -209,6 +247,7 @@ let package = Package( .library(name: "NIOHTTPTypesHTTP1", targets: ["NIOHTTPTypesHTTP1"]), .library(name: "NIOHTTPTypesHTTP2", targets: ["NIOHTTPTypesHTTP2"]), .library(name: "NIOResumableUpload", targets: ["NIOResumableUpload"]), + .library(name: "NIOHTTPResponsiveness", targets: ["NIOHTTPResponsiveness"]), ], dependencies: [ .package(url: "https://github.com/apple/swift-nio.git", from: "2.67.0"), @@ -216,6 +255,9 @@ let package = Package( .package(url: "https://github.com/apple/swift-http-types.git", from: "1.3.0"), .package(url: "https://github.com/apple/swift-http-structured-headers.git", from: "1.1.0"), .package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0"), + .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.27.0"), + .package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.4.0"), + ], targets: targets ) diff --git a/README.md b/README.md index a400b17b..9ea0ed5f 100644 --- a/README.md +++ b/README.md @@ -59,3 +59,5 @@ On the [`nio-extras-0.1`](https://github.com/apple/swift-nio-extras/tree/nio-ext - [`HTTP2FramePayloadToHTTPClientCodec`](Sources/NIOHTTPTypesHTTP2/HTTP2ToHTTPCodec.swift) A `ChannelHandler` that translates HTTP/2 concepts into shared HTTP types for the client side. - [`HTTP2FramePayloadToHTTPServerCodec`](Sources/NIOHTTPTypesHTTP2/HTTP2ToHTTPCodec.swift) A `ChannelHandler` that translates HTTP/2 concepts into shared HTTP types for the server side. - [`HTTPResumableUploadHandler`](Sources/NIOResumableUpload/HTTPResumableUploadHandler.swift) A `ChannelHandler` that translates HTTP resumable uploads to regular uploads. +- [`HTTPDrippingDownloadHandler`](Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift) A `ChannelHandler` that sends a configurable stream of zeroes to a client. +- [`HTTPReceiveDiscardHandler`](Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift) A `ChannelHandler` that receives arbitrary bytes from a client and discards them. diff --git a/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift b/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift new file mode 100644 index 00000000..29a8a576 --- /dev/null +++ b/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift @@ -0,0 +1,228 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOCore +import NIOHTTPTypes + +/// HTTP request handler sending a configurable stream of zeroes +public final class HTTPDrippingDownloadHandler: ChannelDuplexHandler { + public typealias InboundIn = HTTPRequestPart + public typealias OutboundOut = HTTPResponsePart + public typealias OutboundIn = Never + + // Predefine buffer to reuse over and over again when sending chunks to requester. NIO allows + // us to give it reference counted buffers. Reusing like this allows us to avoid allocations. + static let downloadBodyChunk = ByteBuffer(repeating: 0, count: 65536) + + private var frequency: TimeAmount + private var size: Int + private var count: Int + private var delay: TimeAmount + private var code: HTTPResponse.Status + + private enum Phase { + /// We haven't gotten the request head - nothing to respond to + case waitingOnHead + /// We got the request head and are delaying the response + case delayingResponse + /// We're dripping response chunks to the peer, tracking how many chunks we have left + case dripping(DrippingState) + /// We either sent everything to the client or the request ended short + case done + } + + private struct DrippingState { + var chunksLeft: Int + var currentChunkBytesLeft: Int + } + + private var phase = Phase.waitingOnHead + private var scheduled: Scheduled? + private var pendingRead = false + private var pendingWrite = false + private var activelyWritingChunk = false + + /// Initializes an `HTTPDrippingDownloadHandler`. + /// - Parameters: + /// - count: How many chunks should be sent. Note that the underlying HTTP + /// stack may split or combine these chunks into data frames as + /// they see fit. + /// - size: How large each chunk should be + /// - frequency: How much time to wait between sending each chunk + /// - delay: How much time to wait before sending the first chunk + /// - code: What HTTP status code to send + public init( + count: Int = 0, + size: Int = 0, + frequency: TimeAmount = .zero, + delay: TimeAmount = .zero, + code: HTTPResponse.Status = .ok + ) { + self.frequency = frequency + self.size = size + self.count = count + self.delay = delay + self.code = code + } + + public convenience init?(queryArgsString: Substring.UTF8View) { + self.init() + + let pairs = queryArgsString.split(separator: UInt8(ascii: "&")) + for pair in pairs { + var pairParts = pair.split(separator: UInt8(ascii: "="), maxSplits: 1).makeIterator() + guard let first = pairParts.next(), let second = pairParts.next() else { + continue + } + + guard let secondNum = Int(Substring(second)) else { + return nil + } + + switch Substring(first) { + case "frequency": + self.frequency = .seconds(Int64(secondNum)) + case "size": + self.size = secondNum + case "count": + self.count = secondNum + case "delay": + self.delay = .seconds(Int64(secondNum)) + case "code": + self.code = HTTPResponse.Status(code: secondNum) + default: + continue + } + } + } + + public func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let part = self.unwrapInboundIn(data) + + switch part { + case .head: + self.phase = .delayingResponse + + if self.delay == TimeAmount.zero { + // If no delay, we might as well start responding immediately + self.onResponseDelayCompleted(context: context) + } else { + let this = NIOLoopBound(self, eventLoop: context.eventLoop) + let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop) + self.scheduled = context.eventLoop.scheduleTask(in: self.delay) { + this.value.onResponseDelayCompleted(context: loopBoundContext.value) + } + } + case .body, .end: + return + } + } + + private func onResponseDelayCompleted(context: ChannelHandlerContext) { + guard case .delayingResponse = self.phase else { + return + } + + var head = HTTPResponse(status: self.code) + + // If the length isn't too big, let's include a content length header + if case (let contentLength, false) = self.size.multipliedReportingOverflow(by: self.count) { + head.headerFields = HTTPFields(dictionaryLiteral: (.contentLength, "\(contentLength)")) + } + + context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil) + self.phase = .dripping( + DrippingState( + chunksLeft: self.count, + currentChunkBytesLeft: self.size + ) + ) + + self.writeChunk(context: context) + } + + public func channelInactive(context: ChannelHandlerContext) { + self.phase = .done + self.scheduled?.cancel() + } + + public func channelWritabilityChanged(context: ChannelHandlerContext) { + if case .dripping = self.phase, self.pendingWrite && context.channel.isWritable { + self.writeChunk(context: context) + } + } + + private func writeChunk(context: ChannelHandlerContext) { + // Make sure we don't accidentally reenter + if self.activelyWritingChunk { + return + } + self.activelyWritingChunk = true + defer { + self.activelyWritingChunk = false + } + + // If we're not dripping the response body, there's nothing to do + guard case .dripping(var drippingState) = self.phase else { + return + } + + // If we've sent all chunks, send end and be done + if drippingState.chunksLeft < 1 { + self.phase = .done + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + return + } + + var dataWritten = false + while drippingState.currentChunkBytesLeft > 0, context.channel.isWritable { + let toSend = min(drippingState.currentChunkBytesLeft, HTTPDrippingDownloadHandler.downloadBodyChunk.readableBytes) + let buffer = HTTPDrippingDownloadHandler.downloadBodyChunk.getSlice( + at: HTTPDrippingDownloadHandler.downloadBodyChunk.readerIndex, + length: toSend + )! + context.write(self.wrapOutboundOut(.body(buffer)), promise: nil) + drippingState.currentChunkBytesLeft -= toSend + dataWritten = true + } + if dataWritten { + context.flush() + } + + // If we weren't able to send the full chunk, it's because the channel isn't writable. We yield until it is + if drippingState.currentChunkBytesLeft > 0 { + self.pendingWrite = true + self.phase = .dripping(drippingState) + return + } + + // We sent the full chunk. If we have no more chunks to write, we're done! + drippingState.chunksLeft -= 1 + if drippingState.chunksLeft == 0 { + self.phase = .done + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + return + } + + // More chunks to write.. Kick off timer + drippingState.currentChunkBytesLeft = self.size + self.phase = .dripping(drippingState) + let this = NIOLoopBound(self, eventLoop: context.eventLoop) + let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop) + self.scheduled = context.eventLoop.scheduleTask(in: self.frequency) { + this.value.writeChunk(context: loopBoundContext.value) + } + } +} diff --git a/Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift b/Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift new file mode 100644 index 00000000..0463fe73 --- /dev/null +++ b/Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift @@ -0,0 +1,90 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOCore +import NIOHTTPTypes + +/// HTTP request handler that receives arbitrary bytes and discards them +public final class HTTPReceiveDiscardHandler: ChannelInboundHandler { + + public typealias InboundIn = HTTPRequestPart + public typealias OutboundOut = HTTPResponsePart + + private let expectation: Int? + private var expectationViolated = false + private var received = 0 + + /// Initializes `HTTPReceiveDiscardHandler` + /// - Parameter expectation: how many bytes should be expected. If more + /// bytes are received than expected, an error status code will + /// be sent to the client + public init(expectation: Int?) { + self.expectation = expectation + } + + public func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let part = self.unwrapInboundIn(data) + + switch part { + case .head: + return + case .body(let buffer): + self.received += buffer.readableBytes + + // If the expectation is violated, send 4xx + if let expectation = self.expectation, self.received > expectation { + self.onExpectationViolated(context: context, expectation: expectation) + } + case .end: + if self.expectationViolated { + // Already flushed a response, nothing else to do + return + } + + if let expectation = self.expectation, self.received != expectation { + self.onExpectationViolated(context: context, expectation: expectation) + return + } + + let responseBody = ByteBuffer(string: "Received \(self.received) bytes") + self.writeSimpleResponse(context: context, status: .ok, body: responseBody) + } + } + + private func onExpectationViolated(context: ChannelHandlerContext, expectation: Int) { + self.expectationViolated = true + + let body = ByteBuffer( + string: + "Received in excess of expectation; expected(\(expectation)) received(\(self.received))" + ) + self.writeSimpleResponse(context: context, status: .ok, body: body) + } + + private func writeSimpleResponse( + context: ChannelHandlerContext, + status: HTTPResponse.Status, + body: ByteBuffer + ) { + let bodyLen = body.readableBytes + let responseHead = HTTPResponse( + status: status, + headerFields: HTTPFields(dictionaryLiteral: (.contentLength, "\(bodyLen)")) + ) + context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil) + context.write(self.wrapOutboundOut(.body(body)), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + } +} diff --git a/Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift b/Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift new file mode 100644 index 00000000..c8f266c7 --- /dev/null +++ b/Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift @@ -0,0 +1,45 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +public struct ResponsivenessConfig: Encodable { + public var version: Int + public var urls: ResponsivenessConfigURLs + + public init(version: Int, urls: ResponsivenessConfigURLs) { + self.version = version + self.urls = urls + } +} + +public struct ResponsivenessConfigURLs: Encodable { + public var largeDownloadURL: String + public var smallDownloadURL: String + public var uploadURL: String + + enum CodingKeys: String, CodingKey { + case largeDownloadURL = "large_download_url" + case smallDownloadURL = "small_download_url" + case uploadURL = "upload_url" + } + + static var largeDownloadSize: Int { 8 * 1_000_000_000 } // 8 * 10^9 + static var smallDownloadSize: Int { 1 } + + public init(scheme: String, authority: String) { + let base = "\(scheme)://\(authority)/responsiveness" + self.largeDownloadURL = "\(base)/download/\(ResponsivenessConfigURLs.largeDownloadSize)" + self.smallDownloadURL = "\(base)/download/\(ResponsivenessConfigURLs.smallDownloadSize)" + self.uploadURL = "\(base)/upload" + } +} diff --git a/Sources/NIOHTTPResponsiveness/SimpleResponsivenessRequestMux.swift b/Sources/NIOHTTPResponsiveness/SimpleResponsivenessRequestMux.swift new file mode 100644 index 00000000..b93b411a --- /dev/null +++ b/Sources/NIOHTTPResponsiveness/SimpleResponsivenessRequestMux.swift @@ -0,0 +1,141 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOCore +import NIOHTTPTypes + +/// Basic request multiplexer that identifies which request type (config, download, upload) is requested and adds the appropriate handler. +/// Once the handler has been added, all data is passed through to the newly added handler. +public final class SimpleResponsivenessRequestMux: ChannelInboundHandler { + + public typealias InboundIn = HTTPRequestPart + public typealias OutboundOut = HTTPResponsePart + + // Predefine some common things we'll need in responses + private static let notFoundBody = ByteBuffer(string: "Not Found") + + // Whether or not we added a handler after us + private var handlerAdded = false + + // Config returned to user that lists responsiveness endpoints + private let responsivenessConfigBuffer: ByteBuffer + + public init(responsivenessConfigBuffer: ByteBuffer) { + self.responsivenessConfigBuffer = responsivenessConfigBuffer + } + + public func channelRead(context: ChannelHandlerContext, data: NIOAny) { + if case let .head(request) = self.unwrapInboundIn(data) { + // TODO: get rid of this altogether and instead create an empty iterator below + guard let path = request.path else { + self.writeSimpleResponse( + context: context, + status: .notFound, + body: SimpleResponsivenessRequestMux.notFoundBody + ) + return + } + + var pathComponents = path.utf8.split(separator: "?".utf8, maxSplits: 1).makeIterator() + let firstPathComponent = pathComponents.next()! + let queryArgsString = pathComponents.next() + + // Split the path into components + var uriComponentIterator = firstPathComponent.split( + separator: UInt8(ascii: "/"), + maxSplits: 3, + omittingEmptySubsequences: false + ).lazy.map(Substring.init).makeIterator() + + // Handle possible path components + switch ( + request.method, uriComponentIterator.next(), uriComponentIterator.next(), + uriComponentIterator.next(), uriComponentIterator.next().flatMap { Int($0) } + ) { + case (.get, .some(""), .some("responsiveness"), .none, .none): + self.writeSimpleResponse( + context: context, + status: .ok, + body: self.responsivenessConfigBuffer + ) + case (.get, .some(""), .some("responsiveness"), .some("download"), .some(let count)): + self.addHandlerOrInternalError(context: context, handler: HTTPDrippingDownloadHandler(count: 1, size: count)) + case (.post, .some(""), .some("responsiveness"), .some("upload"), .none): + // Check if we should expect a certain count + var expectation: Int? + if let lengthHeaderValue = request.headerFields[.contentLength] { + if let expectedLength = Int(lengthHeaderValue) { + expectation = expectedLength + } + } + self.addHandlerOrInternalError(context: context, handler: HTTPReceiveDiscardHandler(expectation: expectation)) + case (_, .some(""), .some("drip"), .none, .none): + if let queryArgsString = queryArgsString { + guard let handler = HTTPDrippingDownloadHandler(queryArgsString: queryArgsString) else { + self.writeSimpleResponse(context: context, status: .badRequest, body: .init()) + return + } + self.addHandlerOrInternalError(context: context, handler: handler) + } else { + self.addHandlerOrInternalError(context: context, handler: HTTPDrippingDownloadHandler()) + } + default: + self.writeSimpleResponse( + context: context, + status: .notFound, + body: SimpleResponsivenessRequestMux.notFoundBody + ) + } + } + + // Only pass through data through if we've actually added a handler. If we didn't add a handler, it's because we + // directly responded. In this case, we don't care about the rest of the request. + if self.handlerAdded { + context.fireChannelRead(data) + } + } + + /// Adding handlers is fallible. If we fail to do it, we should return 500 to the user + private func addHandlerOrInternalError(context: ChannelHandlerContext, handler: ChannelHandler) { + do { + try context.pipeline.syncOperations.addHandler( + handler, + position: ChannelPipeline.Position.after(self) + ) + self.handlerAdded = true + } catch { + self.writeSimpleResponse( + context: context, + status: .internalServerError, + body: ByteBuffer.init() + ) + } + } + + private func writeSimpleResponse( + context: ChannelHandlerContext, + status: HTTPResponse.Status, + body: ByteBuffer + ) { + let bodyLen = body.readableBytes + let responseHead = HTTPResponse( + status: status, + headerFields: HTTPFields(dictionaryLiteral: (.contentLength, "\(bodyLen)")) + ) + context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil) + context.write(self.wrapOutboundOut(.body(body)), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + } +} diff --git a/Sources/NIOHTTPResponsivenessServer/HTTPResponsivenessServer.swift b/Sources/NIOHTTPResponsivenessServer/HTTPResponsivenessServer.swift new file mode 100644 index 00000000..5f8333af --- /dev/null +++ b/Sources/NIOHTTPResponsivenessServer/HTTPResponsivenessServer.swift @@ -0,0 +1,67 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import ArgumentParser +import NIOHTTPResponsiveness +import NIOCore +import NIOHTTPTypesHTTP1 +import NIOPosix +import FoundationEssentials + +func responsivenessConfigBuffer(scheme: String, host: String, port: Int) throws -> ByteBuffer { + let cfg = ResponsivenessConfig( + version: 1, + urls: ResponsivenessConfigURLs(scheme: scheme, authority: "\(host):\(port)") + ) + let encoded = try JSONEncoder().encode(cfg) + return ByteBuffer(bytes: encoded) +} + +@main +private struct HTTPResponsivenessServer: ParsableCommand { + @Option(help: "Which host to bind to") + var host: String + + @Option(help: "Which port to bind to") + var port: Int + + func run() throws { + let config = try responsivenessConfigBuffer(scheme: "http", host: host, port: port) + + let socketBootstrap = ServerBootstrap(group: MultiThreadedEventLoopGroup.singleton) + // Specify backlog and enable SO_REUSEADDR for the server itself + .serverChannelOption(ChannelOptions.backlog, value: 256) + .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + + // Set the handlers that are applied to the accepted Channels + .childChannelInitializer({ channel in + channel.pipeline.configureHTTPServerPipeline().flatMapThrowing { + let mux = SimpleResponsivenessRequestMux(responsivenessConfigBuffer: config) + return try channel.pipeline.syncOperations.addHandlers([ + HTTP1ToHTTPServerCodec(secure: false), + mux, + ]) + } + }) + + // Enable SO_REUSEADDR for the accepted Channels + .childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + .childChannelOption(ChannelOptions.tcpOption(.tcp_nodelay), value: 1) + + let insecureChannel = try socketBootstrap.bind(host: host, port: port).wait() + print("Listening on http://\(host):\(port)") + + let _ = try insecureChannel.closeFuture.wait() + } +} diff --git a/Tests/NIOHTTPResponsivenessTests/HTTPDrippingDownloadHandlerTests.swift b/Tests/NIOHTTPResponsivenessTests/HTTPDrippingDownloadHandlerTests.swift new file mode 100644 index 00000000..bff2386d --- /dev/null +++ b/Tests/NIOHTTPResponsivenessTests/HTTPDrippingDownloadHandlerTests.swift @@ -0,0 +1,133 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOCore +import NIOEmbedded +import NIOHTTPTypes +import XCTest + +@testable import NIOHTTPResponsiveness + +final class HTTPDrippingDownloadHandlerTests: XCTestCase { + + func testDefault() throws { + let eventLoop = EmbeddedEventLoop() + let channel = EmbeddedChannel(handler: HTTPDrippingDownloadHandler(), loop: eventLoop) + + try channel.writeInbound( + HTTPRequestPart.head(HTTPRequest(method: .get, scheme: "http", authority: "whatever", path: "/drip")) + ) + + eventLoop.run() + + guard case let HTTPResponsePart.head(head) = (try channel.readOutbound())! else { + XCTFail("expected response head") + return + } + XCTAssertEqual(head.status, .ok) + + guard case HTTPResponsePart.end(nil) = (try channel.readOutbound())! else { + XCTFail("expected response end") + return + } + + let _ = try channel.finish() + } + + func testBasic() throws { + try dripTest(count: 2, size: 1024) + } + + func testZeroChunks() throws { + try dripTest(count: 0) + } + + func testNonZeroStatusCode() throws { + try dripTest(count: 1, code: .notAcceptable) + } + + func testZeroChunkSize() throws { + try dripTest(count: 1, size: 0) + } + + func dripTest( + count: Int, + size: Int = 1024, + frequency: TimeAmount = .seconds(1), + delay: TimeAmount = .seconds(5), + code: HTTPResponse.Status = .ok + ) throws { + let eventLoop = EmbeddedEventLoop() + let channel = EmbeddedChannel( + handler: HTTPDrippingDownloadHandler( + count: count, + size: size, + frequency: frequency, + delay: delay, + code: code + ), + loop: eventLoop + ) + + try channel.writeInbound( + HTTPRequestPart.head(HTTPRequest(method: .get, scheme: "http", authority: "whatever", path: nil)) + ) + + // Make sure delay is honored + eventLoop.run() + XCTAssert(try channel.readOutbound() == nil) + + eventLoop.advanceTime(by: delay + .milliseconds(100)) + + guard case let HTTPResponsePart.head(head) = (try channel.readOutbound())! else { + XCTFail("expected response head") + return + } + XCTAssertEqual(head.status, code) + + var chunksReceived = 0 + while chunksReceived < count { + + // Shouldn't need to wait for the first chunk + if chunksReceived > 0 { + eventLoop.advanceTime(by: frequency + .milliseconds(100)) + } + + var chunkBytesReceived = 0 + while chunkBytesReceived < size { + let next: HTTPResponsePart? = try channel.readOutbound() + guard case let .body(dataChunk) = next! else { + XCTFail("expected response data") + return + } + chunkBytesReceived += dataChunk.readableBytes + } + chunksReceived += 1 + + if chunksReceived < count { + let part: HTTPResponsePart? = try channel.readOutbound() + XCTAssert(part == nil) + } + } + + guard case HTTPResponsePart.end(nil) = (try channel.readOutbound())! else { + XCTFail("expected response end") + return + } + + let _ = try channel.finish() + } + +} diff --git a/Tests/NIOHTTPResponsivenessTests/HTTPResponsivenessTests.swift b/Tests/NIOHTTPResponsivenessTests/HTTPResponsivenessTests.swift new file mode 100644 index 00000000..151a23a2 --- /dev/null +++ b/Tests/NIOHTTPResponsivenessTests/HTTPResponsivenessTests.swift @@ -0,0 +1,172 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import HTTPTypes +import NIOCore +import NIOEmbedded +import NIOHTTPTypes +import XCTest + +@testable import NIOHTTPResponsiveness + +final class NIOHTTPResponsivenessTests: XCTestCase { + func download(channel: EmbeddedChannel, n: Int) throws { + // Recv head + try channel.writeInbound( + HTTPRequestPart.head( + HTTPRequest( + method: .get, + scheme: "http", + authority: "localhost:8888", + path: "/responsiveness/download/\(n)" + ) + ) + ) + + // Should get response head with content length + let out: HTTPResponsePart = (try channel.readOutbound())! + guard case let HTTPResponsePart.head(head) = out else { + XCTFail() + return + } + XCTAssertEqual(Int(head.headerFields[.contentLength]!)!, n) + + // Drain response body until completed + var received = 0 + loop: while true { + let out: HTTPResponsePart = (try channel.readOutbound())! + switch out { + case .head: + XCTFail("cannot get head twice") + case .body(let body): + received += body.readableBytes + case .end: + break loop + } + } + XCTAssertEqual(received, n) + + } + + func upload(channel: EmbeddedChannel, length: Int, includeContentLength: Bool) throws { + var head = HTTPRequest( + method: .post, + scheme: "http", + authority: "localhost:8888", + path: "/responsiveness/upload" + ) + if includeContentLength { + head.headerFields[.contentLength] = "\(length)" + } + + // Recv head + try channel.writeInbound(HTTPRequestPart.head(head)) + + // Shouldn't get any immediate response + let out: HTTPResponsePart? = try channel.readOutbound() + XCTAssertNil(out) + + // Send data + var sent = 0 + while sent < length { + let toWrite = min(length - sent, HTTPDrippingDownloadHandler.downloadBodyChunk.readableBytes) + let buf = HTTPDrippingDownloadHandler.downloadBodyChunk.getSlice( + at: HTTPDrippingDownloadHandler.downloadBodyChunk.readerIndex, + length: toWrite + )! + try channel.writeInbound(HTTPRequestPart.body(buf)) + sent += toWrite + } + + // Send fin + try channel.writeInbound(HTTPRequestPart.end(nil)) + + // Get response from server + var part: HTTPResponsePart = (try channel.readOutbound())! + guard case let HTTPResponsePart.head(head) = part else { + XCTFail("expected response head") + return + } + XCTAssertEqual(head.status, .ok) + + // Check response body to confirm server received everything we sent + part = (try channel.readOutbound())! + guard case let HTTPResponsePart.body(body) = part else { + XCTFail("expected response body") + return + } + XCTAssertEqual(String(buffer: body), "Received \(length) bytes") + + // Check server correctly closes the stream + part = (try channel.readOutbound())! + guard case HTTPResponsePart.end(nil) = part else { + XCTFail("expected end") + return + } + } + + private static let defaultValues = [0, 1, 2, 10, 1000, 20000] + + func testDownload() throws { + for val in NIOHTTPResponsivenessTests.defaultValues { + let channel = EmbeddedChannel(handler: HTTPDrippingDownloadHandler(count: 1, size: val)) + try download(channel: channel, n: val) + let _ = try channel.finish() + } + } + + func testUpload() throws { + for val in NIOHTTPResponsivenessTests.defaultValues { + var channel = EmbeddedChannel(handler: HTTPReceiveDiscardHandler(expectation: val)) + try upload(channel: channel, length: val, includeContentLength: true) + let _ = try channel.finish() + + channel = EmbeddedChannel(handler: HTTPReceiveDiscardHandler(expectation: nil)) + try upload(channel: channel, length: val, includeContentLength: false) + let _ = try channel.finish() + } + } + + func testMuxDownload() throws { + for val in NIOHTTPResponsivenessTests.defaultValues { + let channel = EmbeddedChannel( + handler: SimpleResponsivenessRequestMux( + responsivenessConfigBuffer: ByteBuffer(string: "test") + ) + ) + try download(channel: channel, n: val) + let _ = try channel.finish() + } + } + + func testMuxUpload() throws { + for val in NIOHTTPResponsivenessTests.defaultValues { + var channel = EmbeddedChannel( + handler: SimpleResponsivenessRequestMux( + responsivenessConfigBuffer: ByteBuffer(string: "test") + ) + ) + try upload(channel: channel, length: val, includeContentLength: true) + let _ = try channel.finish() + + channel = EmbeddedChannel( + handler: SimpleResponsivenessRequestMux( + responsivenessConfigBuffer: ByteBuffer(string: "test") + ) + ) + try upload(channel: channel, length: val, includeContentLength: false) + let _ = try channel.finish() + } + } +} From f75a34ff6bdcc0e966eac43e73f2b37523bb2130 Mon Sep 17 00:00:00 2001 From: Eric Rosenberg Date: Thu, 19 Dec 2024 14:52:14 +0000 Subject: [PATCH 2/4] feedback --- .../NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift | 2 +- Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift | 4 ++-- .../HTTPResponsivenessServer.swift | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift b/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift index 29a8a576..4ca0dc3c 100644 --- a/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift +++ b/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift @@ -16,7 +16,7 @@ import HTTPTypes import NIOCore import NIOHTTPTypes -/// HTTP request handler sending a configurable stream of zeroes +/// HTTP request handler sending a configurable stream of zeroes. Uses HTTPTypes request/response parts. public final class HTTPDrippingDownloadHandler: ChannelDuplexHandler { public typealias InboundIn = HTTPRequestPart public typealias OutboundOut = HTTPResponsePart diff --git a/Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift b/Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift index c8f266c7..4eb4512a 100644 --- a/Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift +++ b/Sources/NIOHTTPResponsiveness/ResponsivenessConfig.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// -public struct ResponsivenessConfig: Encodable { +public struct ResponsivenessConfig: Codable, Hashable, Sendable { public var version: Int public var urls: ResponsivenessConfigURLs @@ -22,7 +22,7 @@ public struct ResponsivenessConfig: Encodable { } } -public struct ResponsivenessConfigURLs: Encodable { +public struct ResponsivenessConfigURLs: Codable, Hashable, Sendable { public var largeDownloadURL: String public var smallDownloadURL: String public var uploadURL: String diff --git a/Sources/NIOHTTPResponsivenessServer/HTTPResponsivenessServer.swift b/Sources/NIOHTTPResponsivenessServer/HTTPResponsivenessServer.swift index 5f8333af..8c21deff 100644 --- a/Sources/NIOHTTPResponsivenessServer/HTTPResponsivenessServer.swift +++ b/Sources/NIOHTTPResponsivenessServer/HTTPResponsivenessServer.swift @@ -15,9 +15,10 @@ import ArgumentParser import NIOHTTPResponsiveness import NIOCore +import NIOHTTP1 import NIOHTTPTypesHTTP1 import NIOPosix -import FoundationEssentials +import Foundation func responsivenessConfigBuffer(scheme: String, host: String, port: Int) throws -> ByteBuffer { let cfg = ResponsivenessConfig( From a95faa1676b1b3b3466e2e9b55ba906b087c7ee4 Mon Sep 17 00:00:00 2001 From: Eric Rosenberg Date: Fri, 20 Dec 2024 16:49:28 +0000 Subject: [PATCH 3/4] feedback --- Package.swift | 2 +- .../HTTPDrippingDownloadHandler.swift | 22 +++++++++++++++---- .../HTTPReceiveDiscardHandler.swift | 2 +- .../SimpleResponsivenessRequestMux.swift | 4 ++-- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/Package.swift b/Package.swift index 1452bc42..ca3e8102 100644 --- a/Package.swift +++ b/Package.swift @@ -250,7 +250,7 @@ let package = Package( .library(name: "NIOHTTPResponsiveness", targets: ["NIOHTTPResponsiveness"]), ], dependencies: [ - .package(url: "https://github.com/apple/swift-nio.git", from: "2.67.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.77.0"), .package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.27.0"), .package(url: "https://github.com/apple/swift-http-types.git", from: "1.3.0"), .package(url: "https://github.com/apple/swift-http-structured-headers.git", from: "1.1.0"), diff --git a/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift b/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift index 4ca0dc3c..7c45ae04 100644 --- a/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift +++ b/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift @@ -50,6 +50,7 @@ public final class HTTPDrippingDownloadHandler: ChannelDuplexHandler { private var phase = Phase.waitingOnHead private var scheduled: Scheduled? + private var scheduledCallbackHandler: HTTPDrippingDownloadHandlerScheduledCallbackHandler? private var pendingRead = false private var pendingWrite = false private var activelyWritingChunk = false @@ -156,6 +157,7 @@ public final class HTTPDrippingDownloadHandler: ChannelDuplexHandler { public func channelInactive(context: ChannelHandlerContext) { self.phase = .done self.scheduled?.cancel() + context.fireChannelInactive() } public func channelWritabilityChanged(context: ChannelHandlerContext) { @@ -219,10 +221,22 @@ public final class HTTPDrippingDownloadHandler: ChannelDuplexHandler { // More chunks to write.. Kick off timer drippingState.currentChunkBytesLeft = self.size self.phase = .dripping(drippingState) - let this = NIOLoopBound(self, eventLoop: context.eventLoop) - let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop) - self.scheduled = context.eventLoop.scheduleTask(in: self.frequency) { - this.value.writeChunk(context: loopBoundContext.value) + if self.scheduledCallbackHandler == nil { + let this = NIOLoopBound(self, eventLoop: context.eventLoop) + let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop) + self.scheduledCallbackHandler = HTTPDrippingDownloadHandlerScheduledCallbackHandler(handler: this, context: loopBoundContext) + } + // SAFTEY: scheduling the callback only potentially throws when invoked off eventloop + try! context.eventLoop.scheduleCallback(in: self.frequency, handler: self.scheduledCallbackHandler!) + } + + private struct HTTPDrippingDownloadHandlerScheduledCallbackHandler: NIOScheduledCallbackHandler & Sendable { + var handler: NIOLoopBound + var context: NIOLoopBound + + func handleScheduledCallback(eventLoop: some EventLoop) { + self.handler.value.writeChunk(context: self.context.value) } } } + diff --git a/Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift b/Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift index 0463fe73..8fba40d4 100644 --- a/Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift +++ b/Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift @@ -70,7 +70,7 @@ public final class HTTPReceiveDiscardHandler: ChannelInboundHandler { string: "Received in excess of expectation; expected(\(expectation)) received(\(self.received))" ) - self.writeSimpleResponse(context: context, status: .ok, body: body) + self.writeSimpleResponse(context: context, status: .badRequest, body: body) } private func writeSimpleResponse( diff --git a/Sources/NIOHTTPResponsiveness/SimpleResponsivenessRequestMux.swift b/Sources/NIOHTTPResponsiveness/SimpleResponsivenessRequestMux.swift index b93b411a..b3ecb94e 100644 --- a/Sources/NIOHTTPResponsiveness/SimpleResponsivenessRequestMux.swift +++ b/Sources/NIOHTTPResponsiveness/SimpleResponsivenessRequestMux.swift @@ -70,8 +70,8 @@ public final class SimpleResponsivenessRequestMux: ChannelInboundHandler { status: .ok, body: self.responsivenessConfigBuffer ) - case (.get, .some(""), .some("responsiveness"), .some("download"), .some(let count)): - self.addHandlerOrInternalError(context: context, handler: HTTPDrippingDownloadHandler(count: 1, size: count)) + case (.get, .some(""), .some("responsiveness"), .some("download"), .some(let size)): + self.addHandlerOrInternalError(context: context, handler: HTTPDrippingDownloadHandler(count: 1, size: size)) case (.post, .some(""), .some("responsiveness"), .some("upload"), .none): // Check if we should expect a certain count var expectation: Int? From 8df3653a5c5797de1b29a647a0c96559e648bd44 Mon Sep 17 00:00:00 2001 From: Eric Rosenberg Date: Fri, 20 Dec 2024 16:54:39 +0000 Subject: [PATCH 4/4] avoid unnecessary flush --- .../HTTPDrippingDownloadHandler.swift | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift b/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift index 7c45ae04..8dc73c49 100644 --- a/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift +++ b/Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift @@ -199,14 +199,14 @@ public final class HTTPDrippingDownloadHandler: ChannelDuplexHandler { drippingState.currentChunkBytesLeft -= toSend dataWritten = true } - if dataWritten { - context.flush() - } // If we weren't able to send the full chunk, it's because the channel isn't writable. We yield until it is if drippingState.currentChunkBytesLeft > 0 { self.pendingWrite = true self.phase = .dripping(drippingState) + if dataWritten { + context.flush() + } return } @@ -218,6 +218,10 @@ public final class HTTPDrippingDownloadHandler: ChannelDuplexHandler { return } + if dataWritten { + context.flush() + } + // More chunks to write.. Kick off timer drippingState.currentChunkBytesLeft = self.size self.phase = .dripping(drippingState)