Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Feb 8, 2024
1 parent 9aac38f commit dfe8865
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 82 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ lazy val core = project
"org.typelevel" %% "cats-effect" % Versions.catsEffect,
"co.fs2" %% "fs2-core" % Versions.fs2,
"co.fs2" %% "fs2-io" % Versions.fs2,
"io.circe" %% "circe-core" % Versions.circe,
"io.circe" %% "circe-generic" % Versions.circe,
"io.circe" %% "circe-parser" % Versions.circe,
"org.scalameta" %% "munit" % Versions.munit % Test
)
)
Expand Down
45 changes: 11 additions & 34 deletions core/src/main/scala/Serialize.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,19 @@ package choreo
trait Serialize[A]:
def encode(a: A): Array[Byte]
def decode(encoded: Array[Byte]): Option[A]
end Serialize

object Serialize:
given Serialize[Boolean] with
def encode(a: Boolean): Array[Byte] =
a.toString.getBytes
import io.circe.{Encoder, Decoder}

def decode(encoded: Array[Byte]): Option[Boolean] =
String(encoded).toBooleanOption
given [A](using encoder: Encoder[A], decoder: Decoder[A]): Serialize[A] with
def encode(a: A): Array[Byte] =
encoder(a).noSpaces.getBytes

given Serialize[Int] with
def encode(a: Int): Array[Byte] =
a.toString.getBytes
def decode(encoded: Array[Byte]): Option[A] =
io.circe.parser
.parse(new String(encoded))
.toOption
.flatMap(decoder.decodeJson(_).toOption)

def decode(encoded: Array[Byte]): Option[Int] =
String(encoded).toIntOption

given Serialize[Double] with
def encode(a: Double): Array[Byte] =
a.toString.getBytes

def decode(encoded: Array[Byte]): Option[Double] =
String(encoded).toDoubleOption

given Serialize[String] with
def encode(a: String): Array[Byte] =
a.getBytes

def decode(encoded: Array[Byte]): Option[String] =
Some(String(encoded))

given [A](using s: Serialize[A]): Serialize[Option[A]] with
def encode(a: Option[A]): Array[Byte] =
a match
case Some(a) => s.encode(a)
case None => Array.emptyByteArray

def decode(encoded: Array[Byte]): Option[Option[A]] =
if encoded.isEmpty then Some(None)
else s.decode(encoded).map(Some(_))
end Serialize
55 changes: 37 additions & 18 deletions core/src/main/scala/backend/TCP.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,38 @@ import java.nio.ByteBuffer
type Peer = SocketAddress[IpAddress]
case class Client[M[_]](loc: Loc, peer: Peer, queue: Queue[M, Frame])

case class Frame(payload: Chunk[Byte])
case class Frame(from: Loc, payload: Chunk[Byte])

object Frame:
def read[M[_]](s: Socket[M])(using M: Monad[M]): M[Frame] =
def readLoc[M[_]](s: Socket[M])(using M: Monad[M]): M[Loc] =
for loc <- readPayload(s)
yield new String(loc.toArray)

def readPayload[M[_]](s: Socket[M])(using M: Monad[M]): M[Chunk[Byte]] =
for
sizeBytes <- s.readN(4)
buffer = ByteBuffer.wrap(sizeBytes.toArray).order(LITTLE_ENDIAN)
size = buffer.getInt
payload <- s.readN(size)
yield Frame(payload)
yield payload

def write[M[_]: Monad](s: Socket[M], frame: Frame): M[Unit] =
val size = frame.payload.size
def read[M[_]](s: Socket[M])(using M: Monad[M]): M[Frame] =
for
from <- readLoc(s)
payload <- readPayload(s)
yield Frame(from, payload)

def writePayload[M[_]: Monad](s: Socket[M], payload: Chunk[Byte]): M[Unit] =
val size = payload.size
val buffer = ByteBuffer.allocate(4).order(LITTLE_ENDIAN)
val sizeBytes = buffer.putInt(size).array
s.write(Chunk.array(sizeBytes)) >> s.write(frame.payload)
s.write(Chunk.array(sizeBytes)) >> s.write(payload)

def writeLoc[M[_]: Monad](s: Socket[M], from: Loc): M[Unit] =
writePayload(s, Chunk.array(from.getBytes))

def write[M[_]: Monad](s: Socket[M], frame: Frame): M[Unit] =
writeLoc(s, frame.from) >> writePayload(s, frame.payload)

class TCPBackend[M[_]](peers: Map[Loc, Peer]):
val locs = peers.keys.toSeq
Expand All @@ -48,27 +64,24 @@ class TCPBackend[M[_]](peers: Map[Loc, Peer]):
C: Concurrent[M],
N: FS2Network[M]
): M[A] =
val me = peers(at)
for
me <- C.pure(peers.get(at).get)
clients <- makeClients
server = FS2Network[M].server(port = Some(me.port))

fiber <- server
.parEvalMapUnordered(clients.size) { socket =>
fiber <- FS2Network[M]
.server(port = Some(me.port))
.parEvalMapUnordered(clients.size + 1) { socket =>
for
frame <- Frame.read(socket)
localAddr <- socket.remoteAddress
remoteAddr <- socket.remoteAddress
_ = println(clients)
_ = println(s"Received $frame from $remoteAddr @ $localAddr")
client = clients.find(_.peer == remoteAddr).get
// _ = println(s"[$at] Received frame from $remoteAddr: $frame")
client = clients.find(_.loc == frame.from).get
_ <- client.queue.offer(frame)
// _ = println(s"[$at] Offered frame to ${client.loc}: $frame")
yield ()
}
.compile
.drain
.start

result <- network.foldMap(run(at, clients).toFunctionK)
yield result

Expand All @@ -83,20 +96,26 @@ class TCPBackend[M[_]](peers: Map[Loc, Peer]):

case NetworkSig.Send(a, to, ser) =>
val encoded = ser.encode(a)
val chunk: fs2.Chunk[Byte] = Chunk.array(encoded)
val chunk = Chunk.array[Byte](encoded)
val client = clients.find(_.loc == to).get
val socket = FS2Network[M].client(client.peer)
val frame = Frame(at, chunk)
// println(s"[$at] Sending frame to $to: $frame")
socket.use: socket =>
Frame.write(socket, Frame(chunk))
Frame.write(socket, frame)

case NetworkSig.Recv(from, ser) =>
val client = clients.find(_.loc == from).get
// println(s"[$at] Waiting for message from $from")
for
frame <- client.queue.take
// _ = println(s"[$at] Received frame from $from: $frame")
value = ser.decode(frame.payload.toArray).get
// _ = println(s"[$at] Decoded $value")
yield value

case NetworkSig.Broadcast(a, ser) =>
// println(s"[$at] Broadcasting $a")
locs
.filter(_ != at)
.traverse_ { to =>
Expand Down
21 changes: 10 additions & 11 deletions examples/src/main/scala/Bookseller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,21 @@ import com.comcast.ip4s.SocketAddress
import com.comcast.ip4s.IpAddress
import com.comcast.ip4s.Port

import io.circe.*
import io.circe.generic.semiauto.*

case class Book(title: String, price: Double)

object Book:
given Encoder[Book] = deriveEncoder
given Decoder[Book] = deriveDecoder

case class Date(year: Int, month: Int, day: Int):
override def toString(): String = s"$year-$month-$day"

given Serialize[Date] with
def encode(a: Date): Array[Byte] =
a.toString.getBytes

def decode(encoded: Array[Byte]): Option[Date] =
String(encoded).split("-") match
case Array(year, month, day) =>
Some(Date(year.toInt, month.toInt, day.toInt))

case _ =>
None
object Date:
given Encoder[Date] = deriveEncoder
given Decoder[Date] = deriveDecoder

val books = List(
Book("Functional Programming in Scala", 121.0),
Expand Down
25 changes: 6 additions & 19 deletions examples/src/main/scala/KV.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,18 @@ import com.comcast.ip4s.IpAddress
import com.comcast.ip4s.SocketAddress
import com.comcast.ip4s.Port

import io.circe.*
import io.circe.generic.semiauto.*

type State = Map[String, String]

enum Request:
case Get(key: String)
case Put(key: String, value: String)

given Serialize[Request] with
def encode(a: Request): Array[Byte] =
a match
case Request.Get(key) =>
s"GET $key".getBytes

case Request.Put(key, value) =>
s"PUT $key $value".getBytes

def decode(encoded: Array[Byte]): Option[Request] =
String(encoded).split(" ") match
case Array("GET", key) =>
Some(Request.Get(key))

case Array("PUT", key, value) =>
Some(Request.Put(key, value))

case _ =>
None
object Request:
given Encoder[Request] = deriveEncoder
given Decoder[Request] = deriveDecoder

type Response = Option[String]

Expand Down
1 change: 1 addition & 0 deletions project/Versions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ object Versions {
val cats = "2.9.0"
val catsEffect = "3.5.3"
val fs2 = "3.9.4"
val circe = "0.15.0-M1"
val munit = "0.7.29"
}

0 comments on commit dfe8865

Please sign in to comment.