Skip to content

Commit

Permalink
Get pubsub numsub working
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterAdams-A authored and Mordil committed May 4, 2021
1 parent e08b426 commit 3ca471b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 5 deletions.
12 changes: 7 additions & 5 deletions Sources/RediStack/Commands/PubSubCommands.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ extension RedisCommand {
/// [PUBSUB NUMSUB](https://redis.io/commands/pubsub#codepubsub-numsub-channel-1--channel-ncode)
/// - Parameter channels: A list of channel names to collect the subscriber counts for.
public static func pubsubNumsub(forChannels channels: [RedisChannelName]) -> RedisCommand<[RedisChannelName: Int]> {
let args = channels.map { $0.convertedToRESPValue() }
return .init(keyword: "PUBSUB NUMSUB", arguments: args) {
var args: [RESPValue] = [.init(bulk: "NUMSUB")]
args.append(convertingContentsOf: channels)
return .init(keyword: "PUBSUB", arguments: args) {
let response = try $0.map(to: [RESPValue].self)
assert(response.count == channels.count * 2, "Unexpected response size!")

Expand All @@ -62,11 +63,12 @@ extension RedisCommand {
return try channels
.enumerated()
.reduce(into: [:]) { (result, next) in
assert(next.element.rawValue == response[next.offset].string, "Unexpected value in current index!")
let responseOffset = next.offset * 2
assert(next.element.rawValue == response[responseOffset].string, "Unexpected value in current index!")

guard let count = response[next.offset + 1].int else {
guard let count = response[responseOffset + 1].int else {
throw RedisClientError.assertionFailure(
message: "Unexpected value at position \(next.offset + 1) in \(response)"
message: "Unexpected value at position \(responseOffset + 1) in \(response)"
)
}
result[next.element] = count
Expand Down
36 changes: 36 additions & 0 deletions Tests/RediStackIntegrationTests/Commands/PubSubCommandsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,42 @@ final class RedisPubSubCommandsTests: RediStackIntegrationTestCase {
let allChannels = try queryConnection.send(.pubsubChannels()).wait()
XCTAssertGreaterThanOrEqual(allChannels.count, channelNames.count)
}

func test_pubSubNumsub() throws {
let fn = #function
let subscriber = try self.makeNewConnection()
defer { try? subscriber.close().wait() }

let channelNames = (1...5).map {
RedisChannelName("\(fn)\($0)")
}

for channelName in channelNames {
try subscriber.subscribe(
to: channelName,
messageReceiver: { _, _ in },
onSubscribe: nil,
onUnsubscribe: nil
).wait()
}
XCTAssertTrue(subscriber.isSubscribed)
defer {
// Unsubscribe (clean up)
try? subscriber.unsubscribe(from: channelNames).wait()
XCTAssertFalse(subscriber.isSubscribed)
}

// Make another connection to query on.
let queryConnection = try self.makeNewConnection()
defer { try? queryConnection.close().wait() }

let notSubscribedChannel = RedisChannelName("\(fn)_notsubbed")
let numSubs = try queryConnection.send(.pubsubNumsub(forChannels: [channelNames[0], notSubscribedChannel])).wait()
XCTAssertEqual(numSubs.count, 2)

XCTAssertGreaterThanOrEqual(numSubs[channelNames[0]] ?? 0, 1)
XCTAssertEqual(numSubs[notSubscribedChannel], 0)
}
}

final class RedisPubSubCommandsPoolTests: RediStackConnectionPoolIntegrationTestCase {
Expand Down

0 comments on commit 3ca471b

Please sign in to comment.