Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine socket #34

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,96 +2,27 @@ package com.ivanempire.lighthouse.socket

import android.net.wifi.WifiManager
import com.ivanempire.lighthouse.LighthouseLogger
import com.ivanempire.lighthouse.models.Constants.DEFAULT_MULTICAST_ADDRESS
import com.ivanempire.lighthouse.models.Constants.LIGHTHOUSE_CLIENT
import com.ivanempire.lighthouse.models.search.SearchRequest
import java.net.DatagramPacket
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.MulticastSocket
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.isActive

/** Specific implementation of [SocketListener] */
internal class AndroidSocketListener(
private val wifiManager: WifiManager,
private val retryCount: Int,
private val logger: LighthouseLogger? = null,
) : SocketListener {
retryCount: Int,
logger: LighthouseLogger? = null,
) : BaseSocketListener(retryCount, logger) {

private val multicastLock: WifiManager.MulticastLock by lazy {
wifiManager.createMulticastLock(LIGHTHOUSE_CLIENT)
}

private val multicastGroup: InetAddress by lazy {
InetAddress.getByName(DEFAULT_MULTICAST_ADDRESS)
}

private fun setupSocket(): MulticastSocket {
override fun acquireMulticastLock() {
multicastLock.setReferenceCounted(true)
multicastLock.acquire()

val multicastSocket = MulticastSocket(null)
multicastSocket.reuseAddress = true
multicastSocket.broadcast = true
multicastSocket.loopbackMode = true // disable LoopbackMode

try {
multicastSocket.joinGroup(multicastGroup)
multicastSocket.bind(InetSocketAddress(MULTICAST_PORT))
logger?.logStatusMessage(TAG, "MulticastSocket has been setup")
} catch (ex: Exception) {
logger?.logErrorMessage(
TAG,
"Could finish setting up the multicast socket and group",
ex
)
}

return multicastSocket
}

override fun listenForPackets(searchRequest: SearchRequest): Flow<DatagramPacket> {
logger?.logStatusMessage(TAG, "Setting up datagram packet flow")
val multicastSocket = setupSocket()

return flow {
multicastSocket.use {
val datagramPacketRequest = searchRequest.toDatagramPacket(multicastGroup)

repeat(retryCount + 1) { multicastSocket.send(datagramPacketRequest) }

while (currentCoroutineContext().isActive) {
val discoveryBuffer = ByteArray(MULTICAST_DATAGRAM_SIZE)
val discoveryDatagram =
DatagramPacket(discoveryBuffer, discoveryBuffer.size)
it.receive(discoveryDatagram)
emit(discoveryDatagram)
}
}
}
.onCompletion { teardownSocket(multicastSocket) }
}

private fun teardownSocket(multicastSocket: MulticastSocket) {
logger?.logStatusMessage(TAG, "Releasing resources")

override fun releaseMulticastLock() {
if (multicastLock.isHeld) {
multicastLock.release()
}

if (!multicastSocket.isClosed) {
multicastSocket.leaveGroup(multicastGroup)
multicastSocket.close()
}
}

private companion object {
const val TAG = "AndroidSocketListener"
const val MULTICAST_DATAGRAM_SIZE = 2048
const val MULTICAST_PORT = 1900
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.ivanempire.lighthouse.socket

import com.ivanempire.lighthouse.LighthouseLogger
import com.ivanempire.lighthouse.models.Constants.DEFAULT_MULTICAST_ADDRESS
import com.ivanempire.lighthouse.models.search.SearchRequest
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.isActive
import java.net.DatagramPacket
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.MulticastSocket

/** Specific implementation of [SocketListener] */
internal abstract class BaseSocketListener(
private val retryCount: Int,
private val logger: LighthouseLogger? = null,
) : SocketListener {

private val multicastGroup: InetAddress by lazy {
InetAddress.getByName(DEFAULT_MULTICAST_ADDRESS)
}

protected abstract fun acquireMulticastLock()

private fun setupSocket(): MulticastSocket {
acquireMulticastLock()

val multicastSocket = MulticastSocket(null)
multicastSocket.reuseAddress = true
multicastSocket.broadcast = true
multicastSocket.loopbackMode = true // disable LoopbackMode

try {
multicastSocket.joinGroup(multicastGroup)
multicastSocket.bind(InetSocketAddress(MULTICAST_PORT))
logger?.logStatusMessage(TAG, "MulticastSocket has been setup")
} catch (ex: Exception) {
logger?.logErrorMessage(
TAG,
"Could finish setting up the multicast socket and group",
ex
)
}

return multicastSocket
}

override fun listenForPackets(searchRequest: SearchRequest): Flow<DatagramPacket> {
logger?.logStatusMessage(TAG, "Setting up datagram packet flow")
val multicastSocket = setupSocket()

return flow {
val datagramPacketRequest = searchRequest.toDatagramPacket(multicastGroup)

repeat(retryCount + 1) {
if (!multicastSocket.isClosed) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to fix a crash I experienced. Not sure why the socket wasn't open, but there are apparently times when it's not.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am guessing you've seen this on desktop? Or did you observe this on Android?

multicastSocket.send(datagramPacketRequest)
}
}

while (currentCoroutineContext().isActive && !multicastSocket.isClosed) {
val discoveryBuffer = ByteArray(MULTICAST_DATAGRAM_SIZE)
val discoveryDatagram =
DatagramPacket(discoveryBuffer, discoveryBuffer.size)
multicastSocket.receive(discoveryDatagram)
emit(discoveryDatagram)
}
}
.catch { cause ->
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have broken this into multiple commits, sorry.

This is a bit tricky to notify with flows. If the connection gets closed, we should handle it gracefully. Also, it would be nice to notify the user. Perhaps the top level data passed to the user should include a connection state.

logger?.logErrorMessage(TAG, "", cause)
}
.onCompletion {
teardownSocket(multicastSocket)
}
.flowOn(Dispatchers.IO)
}

protected abstract fun releaseMulticastLock()

private fun teardownSocket(multicastSocket: MulticastSocket) {
logger?.logStatusMessage(TAG, "Releasing resources")

releaseMulticastLock()

if (!multicastSocket.isClosed) {
multicastSocket.leaveGroup(multicastGroup)
multicastSocket.close()
}
}

private companion object {
const val TAG = "BaseSocketListener"
const val MULTICAST_DATAGRAM_SIZE = 2048
const val MULTICAST_PORT = 1900
}
}
Original file line number Diff line number Diff line change
@@ -1,83 +1,18 @@
package com.ivanempire.lighthouse.socket

import com.ivanempire.lighthouse.LighthouseLogger
import com.ivanempire.lighthouse.models.Constants.DEFAULT_MULTICAST_ADDRESS
import com.ivanempire.lighthouse.models.search.SearchRequest
import java.net.DatagramPacket
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.MulticastSocket
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.isActive

/** Specific implementation of [SocketListener] */
internal class JvmSocketListener(
private val retryCount: Int,
private val logger: LighthouseLogger? = null,
) : SocketListener {
retryCount: Int,
logger: LighthouseLogger? = null,
) : BaseSocketListener(retryCount, logger) {

private val multicastGroup: InetAddress by lazy {
InetAddress.getByName(DEFAULT_MULTICAST_ADDRESS)
override fun acquireMulticastLock() {
// Nothing to do on JVM
}

private fun setupSocket(): MulticastSocket {
val multicastSocket = MulticastSocket(null)
multicastSocket.reuseAddress = true
multicastSocket.broadcast = true
multicastSocket.loopbackMode = true // disable LoopbackMode

try {
multicastSocket.joinGroup(multicastGroup)
multicastSocket.bind(InetSocketAddress(MULTICAST_PORT))
logger?.logStatusMessage(TAG, "MulticastSocket has been setup")
} catch (ex: Exception) {
logger?.logErrorMessage(
TAG,
"Could finish setting up the multicast socket and group",
ex
)
}

return multicastSocket
}

override fun listenForPackets(searchRequest: SearchRequest): Flow<DatagramPacket> {
logger?.logStatusMessage(TAG, "Setting up datagram packet flow")
val multicastSocket = setupSocket()

return flow {
multicastSocket.use {
val datagramPacketRequest = searchRequest.toDatagramPacket(multicastGroup)

repeat(retryCount + 1) { multicastSocket.send(datagramPacketRequest) }

while (currentCoroutineContext().isActive) {
val discoveryBuffer = ByteArray(MULTICAST_DATAGRAM_SIZE)
val discoveryDatagram =
DatagramPacket(discoveryBuffer, discoveryBuffer.size)
it.receive(discoveryDatagram)
emit(discoveryDatagram)
}
}
}
.onCompletion { teardownSocket(multicastSocket) }
}

private fun teardownSocket(multicastSocket: MulticastSocket) {
logger?.logStatusMessage(TAG, "Releasing resources")

if (!multicastSocket.isClosed) {
multicastSocket.leaveGroup(multicastGroup)
multicastSocket.close()
}
}

private companion object {
const val TAG = "RealSocketListener"
const val MULTICAST_DATAGRAM_SIZE = 2048
const val MULTICAST_PORT = 1900
override fun releaseMulticastLock() {
// Nothing to do on JVM
}
}
Loading