Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Responsiveness under Working Conditions #242

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .spi.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: 1
builder:
configs:
- documentation_targets: [NIOExtras, NIOHTTPCompression, NIOSOCKS, NIOHTTPTypes, NIOHTTPTypesHTTP1, NIOHTTPTypesHTTP2, NIOResumableUpload]
- documentation_targets: [NIOExtras, NIOHTTPCompression, NIOSOCKS, NIOHTTPTypes, NIOHTTPTypesHTTP1, NIOHTTPTypesHTTP2, NIOResumableUpload, NIOHTTPResponsiveness]
42 changes: 42 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,44 @@ var targets: [PackageDescription.Target] = [
.product(name: "NIOEmbedded", package: "swift-nio"),
]
),
.target(
name: "NIOHTTPResponsiveness",
dependencies: [
"NIOHTTPTypes",
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "HTTPTypes", package: "swift-http-types"),
],
swiftSettings: [
.enableExperimentalFeature("StrictConcurrency")
]
),
.testTarget(
name: "NIOHTTPResponsivenessTests",
dependencies: [
"NIOHTTPResponsiveness",
"NIOHTTPTypes",
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "HTTPTypes", package: "swift-http-types"),
],
swiftSettings: [
.enableExperimentalFeature("StrictConcurrency")
]
),
.executableTarget(
name: "NIOHTTPResponsivenessServer",
dependencies: [
"NIOHTTPResponsiveness",
"NIOHTTPTypesHTTP1",
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOPosix", package: "swift-nio"),
.product(name: "NIOHTTP1", package: "swift-nio"),
.product(name: "ArgumentParser", package: "swift-argument-parser"),
],
swiftSettings: [
.enableExperimentalFeature("StrictConcurrency")
]
),
]

let package = Package(
Expand All @@ -209,13 +247,17 @@ let package = Package(
.library(name: "NIOHTTPTypesHTTP1", targets: ["NIOHTTPTypesHTTP1"]),
.library(name: "NIOHTTPTypesHTTP2", targets: ["NIOHTTPTypesHTTP2"]),
.library(name: "NIOResumableUpload", targets: ["NIOResumableUpload"]),
.library(name: "NIOHTTPResponsiveness", targets: ["NIOHTTPResponsiveness"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.67.0"),
.package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.27.0"),
.package(url: "https://github.com/apple/swift-http-types.git", from: "1.3.0"),
.package(url: "https://github.com/apple/swift-http-structured-headers.git", from: "1.1.0"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.27.0"),
.package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.4.0"),

],
targets: targets
)
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,5 @@ On the [`nio-extras-0.1`](https://github.com/apple/swift-nio-extras/tree/nio-ext
- [`HTTP2FramePayloadToHTTPClientCodec`](Sources/NIOHTTPTypesHTTP2/HTTP2ToHTTPCodec.swift) A `ChannelHandler` that translates HTTP/2 concepts into shared HTTP types for the client side.
- [`HTTP2FramePayloadToHTTPServerCodec`](Sources/NIOHTTPTypesHTTP2/HTTP2ToHTTPCodec.swift) A `ChannelHandler` that translates HTTP/2 concepts into shared HTTP types for the server side.
- [`HTTPResumableUploadHandler`](Sources/NIOResumableUpload/HTTPResumableUploadHandler.swift) A `ChannelHandler` that translates HTTP resumable uploads to regular uploads.
- [`HTTPDrippingDownloadHandler`](Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift) A `ChannelHandler` that sends a configurable stream of zeroes to a client.
- [`HTTPReceiveDiscardHandler`](Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift) A `ChannelHandler` that receives arbitrary bytes from a client and discards them.
228 changes: 228 additions & 0 deletions Sources/NIOHTTPResponsiveness/HTTPDrippingDownloadHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import HTTPTypes
import NIOCore
import NIOHTTPTypes

/// HTTP request handler sending a configurable stream of zeroes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we document that this requires the use of the HTTPTypes types, not the regular NIO ones.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I suggest that, for now, we don't make the two "helper" handlers public? It's generally wise for us not to commit to too much API without a clear use-case. Making something public that was previously internal is generally easy, changing something that we accidentally made public is much harder.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two "helper" handlers you're referring to are the HTTPDrippingDownloadHandler and the HTTPReceiveDiscardHandler, right? The idea being only the ResponsivenessConfig and SimpleResponsivenessRequestMux end up being public?

I think doing that would end up making this more of an example than a building block. I find that request multiplexers are typically not very reusable (hence the prefix "Simple"). My expectation is that someone wanting to put together a robust implementation would have their own request mux and add things like limits on download size, delay durations, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair enough.

public final class HTTPDrippingDownloadHandler: ChannelDuplexHandler {
public typealias InboundIn = HTTPRequestPart
public typealias OutboundOut = HTTPResponsePart
public typealias OutboundIn = Never

// Predefine buffer to reuse over and over again when sending chunks to requester. NIO allows
// us to give it reference counted buffers. Reusing like this allows us to avoid allocations.
static let downloadBodyChunk = ByteBuffer(repeating: 0, count: 65536)

private var frequency: TimeAmount
private var size: Int
private var count: Int
private var delay: TimeAmount
private var code: HTTPResponse.Status

private enum Phase {
/// We haven't gotten the request head - nothing to respond to
case waitingOnHead
/// We got the request head and are delaying the response
case delayingResponse
/// We're dripping response chunks to the peer, tracking how many chunks we have left
case dripping(DrippingState)
/// We either sent everything to the client or the request ended short
case done
}

private struct DrippingState {
var chunksLeft: Int
var currentChunkBytesLeft: Int
}

private var phase = Phase.waitingOnHead
private var scheduled: Scheduled<Void>?
private var pendingRead = false
private var pendingWrite = false
private var activelyWritingChunk = false

/// Initializes an `HTTPDrippingDownloadHandler`.
/// - Parameters:
/// - count: How many chunks should be sent. Note that the underlying HTTP
/// stack may split or combine these chunks into data frames as
/// they see fit.
/// - size: How large each chunk should be
/// - frequency: How much time to wait between sending each chunk
/// - delay: How much time to wait before sending the first chunk
/// - code: What HTTP status code to send
public init(
count: Int = 0,
size: Int = 0,
frequency: TimeAmount = .zero,
delay: TimeAmount = .zero,
code: HTTPResponse.Status = .ok
) {
self.frequency = frequency
self.size = size
self.count = count
self.delay = delay
self.code = code
}

public convenience init?(queryArgsString: Substring.UTF8View) {
self.init()

let pairs = queryArgsString.split(separator: UInt8(ascii: "&"))
for pair in pairs {
var pairParts = pair.split(separator: UInt8(ascii: "="), maxSplits: 1).makeIterator()
guard let first = pairParts.next(), let second = pairParts.next() else {
continue
}

guard let secondNum = Int(Substring(second)) else {
return nil
}

switch Substring(first) {
case "frequency":
self.frequency = .seconds(Int64(secondNum))
case "size":
self.size = secondNum
case "count":
self.count = secondNum
case "delay":
self.delay = .seconds(Int64(secondNum))
case "code":
self.code = HTTPResponse.Status(code: secondNum)
default:
continue
}
}
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let part = self.unwrapInboundIn(data)

switch part {
case .head:
self.phase = .delayingResponse

if self.delay == TimeAmount.zero {
// If no delay, we might as well start responding immediately
self.onResponseDelayCompleted(context: context)
} else {
let this = NIOLoopBound(self, eventLoop: context.eventLoop)
let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop)
self.scheduled = context.eventLoop.scheduleTask(in: self.delay) {
this.value.onResponseDelayCompleted(context: loopBoundContext.value)
}
}
case .body, .end:
return
}
}

private func onResponseDelayCompleted(context: ChannelHandlerContext) {
guard case .delayingResponse = self.phase else {
return
}

var head = HTTPResponse(status: self.code)

// If the length isn't too big, let's include a content length header
if case (let contentLength, false) = self.size.multipliedReportingOverflow(by: self.count) {
head.headerFields = HTTPFields(dictionaryLiteral: (.contentLength, "\(contentLength)"))
}

context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
self.phase = .dripping(
DrippingState(
chunksLeft: self.count,
currentChunkBytesLeft: self.size
)
)

self.writeChunk(context: context)
}

public func channelInactive(context: ChannelHandlerContext) {
self.phase = .done
self.scheduled?.cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should forward the channelInactive event as well

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

}

public func channelWritabilityChanged(context: ChannelHandlerContext) {
if case .dripping = self.phase, self.pendingWrite && context.channel.isWritable {
self.writeChunk(context: context)
}
}

private func writeChunk(context: ChannelHandlerContext) {
// Make sure we don't accidentally reenter
if self.activelyWritingChunk {
return
}
self.activelyWritingChunk = true
defer {
self.activelyWritingChunk = false
}

// If we're not dripping the response body, there's nothing to do
guard case .dripping(var drippingState) = self.phase else {
return
}

// If we've sent all chunks, send end and be done
if drippingState.chunksLeft < 1 {
self.phase = .done
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
return
}

var dataWritten = false
while drippingState.currentChunkBytesLeft > 0, context.channel.isWritable {
let toSend = min(drippingState.currentChunkBytesLeft, HTTPDrippingDownloadHandler.downloadBodyChunk.readableBytes)
let buffer = HTTPDrippingDownloadHandler.downloadBodyChunk.getSlice(
at: HTTPDrippingDownloadHandler.downloadBodyChunk.readerIndex,
length: toSend
)!
context.write(self.wrapOutboundOut(.body(buffer)), promise: nil)
drippingState.currentChunkBytesLeft -= toSend
dataWritten = true
}
if dataWritten {
context.flush()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we delay this slightly to avoid flushing twice in case we're done? (i.e. here and after writing end)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to be careful here and put an extra if dataWritten inside one of the early returns, but the new code does avoid the unnecessary flush


// If we weren't able to send the full chunk, it's because the channel isn't writable. We yield until it is
if drippingState.currentChunkBytesLeft > 0 {
self.pendingWrite = true
self.phase = .dripping(drippingState)
return
}

// We sent the full chunk. If we have no more chunks to write, we're done!
drippingState.chunksLeft -= 1
if drippingState.chunksLeft == 0 {
self.phase = .done
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
return
}

// More chunks to write.. Kick off timer
drippingState.currentChunkBytesLeft = self.size
self.phase = .dripping(drippingState)
let this = NIOLoopBound(self, eventLoop: context.eventLoop)
let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop)
self.scheduled = context.eventLoop.scheduleTask(in: self.frequency) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A recent-ish NIO got a scheduleCallback API which is slightly cheaper than scheduleTask: https://github.com/apple/swift-nio/blob/56f9b7c6fc9525ba36236dbb151344f8c35288df/Sources/NIOCore/EventLoop.swift#L378-L381

Might be worth using that.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. I think I used it mostly correctly, but I'm not 100% sure. I haven't built much intuition around when swift does or doesn't allocate. I did find the fact that scheduleCallback throws a bit surprising and looked for an equivalent syncOperations-like method to avoid the try, but I didn't find one. I think try! may be fine in practice given current implementations, but not entirely correct. Interested in your feedback here.

this.value.writeChunk(context: loopBoundContext.value)
}
}
}
90 changes: 90 additions & 0 deletions Sources/NIOHTTPResponsiveness/HTTPReceiveDiscardHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import HTTPTypes
import NIOCore
import NIOHTTPTypes

/// HTTP request handler that receives arbitrary bytes and discards them
public final class HTTPReceiveDiscardHandler: ChannelInboundHandler {

public typealias InboundIn = HTTPRequestPart
public typealias OutboundOut = HTTPResponsePart

private let expectation: Int?
private var expectationViolated = false
private var received = 0

/// Initializes `HTTPReceiveDiscardHandler`
/// - Parameter expectation: how many bytes should be expected. If more
/// bytes are received than expected, an error status code will
/// be sent to the client
public init(expectation: Int?) {
self.expectation = expectation
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let part = self.unwrapInboundIn(data)

switch part {
case .head:
return
case .body(let buffer):
self.received += buffer.readableBytes

// If the expectation is violated, send 4xx
if let expectation = self.expectation, self.received > expectation {
self.onExpectationViolated(context: context, expectation: expectation)
}
case .end:
if self.expectationViolated {
// Already flushed a response, nothing else to do
return
}

if let expectation = self.expectation, self.received != expectation {
self.onExpectationViolated(context: context, expectation: expectation)
return
}

let responseBody = ByteBuffer(string: "Received \(self.received) bytes")
self.writeSimpleResponse(context: context, status: .ok, body: responseBody)
}
}

private func onExpectationViolated(context: ChannelHandlerContext, expectation: Int) {
self.expectationViolated = true

let body = ByteBuffer(
string:
"Received in excess of expectation; expected(\(expectation)) received(\(self.received))"
)
self.writeSimpleResponse(context: context, status: .ok, body: body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L46 says the status should be a 4xx status code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yikes! Fixed, thanks

}

private func writeSimpleResponse(
context: ChannelHandlerContext,
status: HTTPResponse.Status,
body: ByteBuffer
) {
let bodyLen = body.readableBytes
let responseHead = HTTPResponse(
status: status,
headerFields: HTTPFields(dictionaryLiteral: (.contentLength, "\(bodyLen)"))
)
context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
context.write(self.wrapOutboundOut(.body(body)), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
}
}
Loading
Loading