Skip to content

Commit

Permalink
Introduction of isConnected Publisher (#47)
Browse files Browse the repository at this point in the history
* Add isConnected publisher
  • Loading branch information
sashkopotapov authored Dec 4, 2023
1 parent dc57d71 commit 0e7be7b
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 45 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:

steps:
- uses: actions/checkout@v3
- name: Select Xcode 14
run: sudo xcode-select -s /Applications/Xcode_14.3.1.app
- name: Select Xcode 15
run: sudo xcode-select -s /Applications/Xcode_15.0.app
- name: Test
run: swift test
2 changes: 1 addition & 1 deletion .swift-version
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
5.8.0
5.9.0

8 changes: 4 additions & 4 deletions Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/shareup/async-extensions.git",
"state" : {
"revision" : "f8dec7a227bbbe15fd4df90c787d4c73e91451ba",
"version" : "4.2.1"
"revision" : "7e727e3b9009a5de429393691f9f499aedb7a109",
"version" : "4.3.0"
}
},
{
Expand Down Expand Up @@ -50,8 +50,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/shareup/websocket-apple.git",
"state" : {
"revision" : "f409dca92eb61be6dd1183941a8590f9c4459f07",
"version" : "4.0.2"
"revision" : "32de0d7733ded57f1b052ab5dddf36708b0ab1dc",
"version" : "4.0.3"
}
}
],
Expand Down
6 changes: 3 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.8
// swift-tools-version:5.9
import PackageDescription

let package = Package(
Expand All @@ -12,7 +12,7 @@ let package = Package(
dependencies: [
.package(
url: "https://github.com/shareup/async-extensions.git",
from: "4.1.0"
from: "4.3.0"
),
.package(
url: "https://github.com/shareup/dispatch-timer.git",
Expand All @@ -32,7 +32,7 @@ let package = Package(
),
.package(
url: "https://github.com/shareup/websocket-apple.git",
from: "4.0.2"
from: "4.0.3"
),
],
targets: [
Expand Down
14 changes: 7 additions & 7 deletions Sources/Phoenix/Event.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ public enum Event: Hashable, ExpressibleByStringLiteral, Sendable {
public var stringValue: String {
switch self {
case .join:
return "phx_join"
"phx_join"
case .leave:
return "phx_leave"
"phx_leave"
case .close:
return "phx_close"
"phx_close"
case .reply:
return "phx_reply"
"phx_reply"
case .error:
return "phx_error"
"phx_error"
case .heartbeat:
return "heartbeat"
"heartbeat"
case let .custom(string):
return string
string
}
}

Expand Down
30 changes: 16 additions & 14 deletions Sources/Phoenix/PhoenixChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ final class PhoenixChannel: @unchecked Sendable {
let topic: Topic
let joinPayload: JSON

var messages: AsyncStream<Message> { messageSubject.allValues }
var messages: AsyncStream<Message> {
messageSubject.allAsyncValues
}

var isErrored: Bool { state.access { $0.isErrored } }
var isJoining: Bool { state.access { $0.isJoining } }
Expand Down Expand Up @@ -167,7 +169,7 @@ final class PhoenixChannel: @unchecked Sendable {
private extension PhoenixChannel {
func watchSocketConnection() {
tasks.storedTask(key: "socketConnection") { [socket, state, weak self] in
let connectionStates = socket.onConnectionStateChange.allValues
let connectionStates = socket.onConnectionStateChange.allAsyncValues
for await connectionState in connectionStates {
try Task.checkCancellation()

Expand Down Expand Up @@ -254,13 +256,13 @@ private struct State: @unchecked Sendable {
var joinRef: Ref? {
switch connection {
case .unjoined, .errored, .joining, .left:
return nil
nil

case let .joined(ref, _):
return ref
ref

case let .leaving(ref, _):
return ref
ref
}
}

Expand Down Expand Up @@ -294,12 +296,12 @@ private struct State: @unchecked Sendable {

var description: String {
switch self {
case .errored: return "errored"
case .joined: return "joined"
case .joining: return "joining"
case .leaving: return "leaving"
case .left: return "left"
case .unjoined: return "unjoined"
case .errored: "errored"
case .joined: "joined"
case .joining: "joining"
case .leaving: "leaving"
case .left: "left"
case .unjoined: "unjoined"
}
}
}
Expand Down Expand Up @@ -495,9 +497,9 @@ private struct State: @unchecked Sendable {
switch connection {
case let .joined(joinRef, _):
if message.joinRef == joinRef || message.joinRef == nil {
return { sendMessage(message) }
{ sendMessage(message) }
} else {
return {
{
os_log(
"outdated message: channel=%{public}s joinRef=%d message=%d",
log: .phoenix,
Expand All @@ -510,7 +512,7 @@ private struct State: @unchecked Sendable {
}

case .unjoined, .errored, .joining, .leaving, .left:
return { sendMessage(message) }
{ sendMessage(message) }
}
}

Expand Down
28 changes: 18 additions & 10 deletions Sources/Phoenix/PhoenixSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ final actor PhoenixSocket {
var webSocket: WebSocket? {
switch _connectionState.value {
case let .connecting(ws), let .open(ws), let .closing(ws):
return ws
ws

case .waitingToReconnect, .preparingToReconnect, .closed:
return nil
nil
}
}

Expand All @@ -37,7 +37,10 @@ final actor PhoenixSocket {
nonisolated let pushEncoder: PushEncoder
nonisolated let messageDecoder: MessageDecoder

nonisolated var messages: AsyncStream<Message> { messageSubject.allValues }
nonisolated var messages: AsyncStream<Message> {
messageSubject.allAsyncValues
}

private nonisolated let messageSubject = PassthroughSubject<Message, Never>()

private nonisolated let currentWebSocketID = Locked(0)
Expand All @@ -51,6 +54,12 @@ final actor PhoenixSocket {
_connectionState.eraseToAnyPublisher()
}

nonisolated var isConnected: AnyPublisher<Bool, Never> {
_connectionState
.map(\.isOpen)
.eraseToAnyPublisher()
}

private nonisolated let _connectionState =
CurrentValueSubject<
PhoenixSocket.ConnectionState,
Expand Down Expand Up @@ -230,7 +239,6 @@ extension PhoenixSocket {
guard let self else { return }

do {

let message = try decoder(msg)

_ = pushes.didReceive(message)
Expand Down Expand Up @@ -368,12 +376,12 @@ extension PhoenixSocket {

var description: String {
switch self {
case .closed: return "closed"
case .waitingToReconnect: return "waitingToReconnect"
case .preparingToReconnect: return "preparingToReconnect"
case .connecting: return "connecting"
case .open: return "open"
case .closing: return "closing"
case .closed: "closed"
case .waitingToReconnect: "waitingToReconnect"
case .preparingToReconnect: "preparingToReconnect"
case .connecting: "connecting"
case .open: "open"
case .closing: "closing"
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Phoenix/PushBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Sendable {
if cancellation.isCancelled {
throw CancellationError()
} else {
return try state.next(cont)
try state.next(cont)
}
}

Expand Down
5 changes: 5 additions & 0 deletions Sources/Phoenix/Socket.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import Combine
import Foundation
import JSON
import Synchronized
Expand All @@ -7,6 +8,7 @@ public struct Socket: Identifiable, Sendable {
public let id: Int
public var connect: @Sendable () async -> Void
public var disconnect: @Sendable () async -> Void
public var isConnected: @Sendable () async -> AnyPublisher<Bool, Never>
public var channel: @Sendable (Topic, JSON) async -> Channel
public var remove: @Sendable (Topic) async -> Void
public var removeAll: @Sendable () async -> Void
Expand All @@ -18,6 +20,7 @@ public struct Socket: Identifiable, Sendable {
id: Int,
connect: @escaping @Sendable () async -> Void,
disconnect: @escaping @Sendable () async -> Void,
isConnected: @escaping @Sendable () async -> AnyPublisher<Bool, Never>,
channel: @escaping @Sendable (Topic, JSON) async -> Channel,
remove: @escaping @Sendable (Topic) async -> Void,
removeAll: @escaping @Sendable () async -> Void,
Expand All @@ -28,6 +31,7 @@ public struct Socket: Identifiable, Sendable {
self.id = id
self.connect = connect
self.disconnect = disconnect
self.isConnected = isConnected
self.channel = channel
self.remove = remove
self.removeAll = removeAll
Expand Down Expand Up @@ -62,6 +66,7 @@ public extension Socket {
id: nextSocketID(),
connect: { await phoenix.connect() },
disconnect: { await phoenix.disconnect() },
isConnected: { phoenix.isConnected },
channel: { topic, joinPayload in
Channel.with(
await phoenix.channel(
Expand Down
2 changes: 1 addition & 1 deletion Tests/PhoenixTests/PhoenixChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ private extension PhoenixChannelTests {
}

var outgoingMessages: AsyncStream<WebSocketMessage> {
sendSubject.allValues
sendSubject.allAsyncValues
}

func nextOutoingMessage() async throws -> Message {
Expand Down
33 changes: 31 additions & 2 deletions Tests/PhoenixTests/PhoenixSocketTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,35 @@ final class PhoenixSocketTests: XCTestCase {
}
}

func testIsConnectedPublisher() async throws {
let didConnect = future(timeout: 2)
let didDisconnect = future(timeout: 2)

let socket = PhoenixSocket(
url: url,
makeWebSocket: makeFakeWebSocket
)

let sub = socket.isConnected
.sink { isConnected in
if isConnected {
didConnect.resolve()
} else {
didDisconnect.resolve()
}
}

await socket.connect()

try await didConnect.value

await socket.disconnect()

try await didDisconnect.value

sub.cancel()
}

// MARK: "channel"

func testCreateChannelWithTopicAndPayload() async throws {
Expand Down Expand Up @@ -768,7 +797,7 @@ private extension PhoenixSocketTests {
var makeFakeWebSocket: MakeWebSocket {
{ [weak self] _, _, _, onOpen, onClose async throws in
guard let self else { throw CancellationError() }
return self.fake(onOpen: onOpen, onClose: onClose)
return fake(onOpen: onOpen, onClose: onClose)
}
}

Expand Down Expand Up @@ -820,7 +849,7 @@ private extension PhoenixSocketTests {
}

var outgoingMessages: AsyncStream<WebSocketMessage> {
sendSubject.allValues
sendSubject.allAsyncValues
}

func sendReply(
Expand Down

0 comments on commit 0e7be7b

Please sign in to comment.