Skip to content

Commit

Permalink
Fixes LISTEN to quote channel name (#466)
Browse files Browse the repository at this point in the history
Co-authored-by: Fabian Fett <[email protected]>
  • Loading branch information
NeedleInAJayStack and fabianfett authored Mar 26, 2024
1 parent 8f8724e commit 35587e9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
4 changes: 2 additions & 2 deletions Sources/PostgresNIO/New/PostgresChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
private func makeStartListeningQuery(channel: String, context: ChannelHandlerContext) -> PSQLTask {
let promise = context.eventLoop.makePromise(of: PSQLRowStream.self)
let query = ExtendedQueryContext(
query: PostgresQuery(unsafeSQL: "LISTEN \(channel);"),
query: PostgresQuery(unsafeSQL: #"LISTEN "\#(channel)";"#),
logger: self.logger,
promise: promise
)
Expand Down Expand Up @@ -642,7 +642,7 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
private func makeUnlistenQuery(channel: String, context: ChannelHandlerContext) -> PSQLTask {
let promise = context.eventLoop.makePromise(of: PSQLRowStream.self)
let query = ExtendedQueryContext(
query: PostgresQuery(unsafeSQL: "UNLISTEN \(channel);"),
query: PostgresQuery(unsafeSQL: #"UNLISTEN "\#(channel)";"#),
logger: self.logger,
promise: promise
)
Expand Down
29 changes: 18 additions & 11 deletions Tests/IntegrationTests/AsyncTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -225,25 +225,32 @@ final class AsyncPostgresConnectionTests: XCTestCase {
}

func testListenAndNotify() async throws {
let channelNames = [
"foo",
"default"
]

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let eventLoop = eventLoopGroup.next()

try await self.withTestConnection(on: eventLoop) { connection in
let stream = try await connection.listen("foo")
var iterator = stream.makeAsyncIterator()
for channelName in channelNames {
try await self.withTestConnection(on: eventLoop) { connection in
let stream = try await connection.listen(channelName)
var iterator = stream.makeAsyncIterator()

try await self.withTestConnection(on: eventLoop) { other in
try await other.query(#"NOTIFY foo, 'bar';"#, logger: .psqlTest)
try await self.withTestConnection(on: eventLoop) { other in
try await other.query(#"NOTIFY "\#(unescaped: channelName)", 'bar';"#, logger: .psqlTest)

try await other.query(#"NOTIFY foo, 'foo';"#, logger: .psqlTest)
}
try await other.query(#"NOTIFY "\#(unescaped: channelName)", 'foo';"#, logger: .psqlTest)
}

let first = try await iterator.next()
XCTAssertEqual(first?.payload, "bar")
let first = try await iterator.next()
XCTAssertEqual(first?.payload, "bar")

let second = try await iterator.next()
XCTAssertEqual(second?.payload, "foo")
let second = try await iterator.next()
XCTAssertEqual(second?.payload, "foo")
}
}
}

Expand Down
10 changes: 5 additions & 5 deletions Tests/PostgresNIOTests/New/PostgresConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class PostgresConnectionTests: XCTestCase {
}

let listenMessage = try await channel.waitForUnpreparedRequest()
XCTAssertEqual(listenMessage.parse.query, "LISTEN foo;")
XCTAssertEqual(listenMessage.parse.query, #"LISTEN "foo";"#)

try await channel.writeInbound(PostgresBackendMessage.parseComplete)
try await channel.writeInbound(PostgresBackendMessage.parameterDescription(.init(dataTypes: [])))
Expand All @@ -63,7 +63,7 @@ class PostgresConnectionTests: XCTestCase {
try await channel.writeInbound(PostgresBackendMessage.notification(.init(backendPID: 12, channel: "foo", payload: "wooohooo")))

let unlistenMessage = try await channel.waitForUnpreparedRequest()
XCTAssertEqual(unlistenMessage.parse.query, "UNLISTEN foo;")
XCTAssertEqual(unlistenMessage.parse.query, #"UNLISTEN "foo";"#)

try await channel.writeInbound(PostgresBackendMessage.parseComplete)
try await channel.writeInbound(PostgresBackendMessage.parameterDescription(.init(dataTypes: [])))
Expand Down Expand Up @@ -111,7 +111,7 @@ class PostgresConnectionTests: XCTestCase {
}

let listenMessage = try await channel.waitForUnpreparedRequest()
XCTAssertEqual(listenMessage.parse.query, "LISTEN foo;")
XCTAssertEqual(listenMessage.parse.query, #"LISTEN "foo";"#)

try await channel.writeInbound(PostgresBackendMessage.parseComplete)
try await channel.writeInbound(PostgresBackendMessage.parameterDescription(.init(dataTypes: [])))
Expand All @@ -124,7 +124,7 @@ class PostgresConnectionTests: XCTestCase {
try await channel.writeInbound(PostgresBackendMessage.notification(.init(backendPID: 12, channel: "foo", payload: "wooohooo2")))

let unlistenMessage = try await channel.waitForUnpreparedRequest()
XCTAssertEqual(unlistenMessage.parse.query, "UNLISTEN foo;")
XCTAssertEqual(unlistenMessage.parse.query, #"UNLISTEN "foo";"#)

try await channel.writeInbound(PostgresBackendMessage.parseComplete)
try await channel.writeInbound(PostgresBackendMessage.parameterDescription(.init(dataTypes: [])))
Expand Down Expand Up @@ -160,7 +160,7 @@ class PostgresConnectionTests: XCTestCase {
}

let listenMessage = try await channel.waitForUnpreparedRequest()
XCTAssertEqual(listenMessage.parse.query, "LISTEN foo;")
XCTAssertEqual(listenMessage.parse.query, #"LISTEN "foo";"#)

try await channel.writeInbound(PostgresBackendMessage.parseComplete)
try await channel.writeInbound(PostgresBackendMessage.parameterDescription(.init(dataTypes: [])))
Expand Down

0 comments on commit 35587e9

Please sign in to comment.