Skip to content

Commit

Permalink
Merge pull request #134 from tkrs/test/connection
Browse files Browse the repository at this point in the history
Add Connection's spec
  • Loading branch information
tkrs authored Nov 19, 2017
2 parents 552d103 + 63cc1f2 commit 5276a6b
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 30 deletions.
17 changes: 11 additions & 6 deletions modules/core/src/main/scala/fluflu/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object Connection {
implicit clock: Clock = Clock.systemUTC()): Connection =
new ConnectionImpl(remote, timeout, backoff)

final class ConnectionImpl(remote: InetSocketAddress, timeout: Duration, backoff: Backoff)(
class ConnectionImpl(remote: InetSocketAddress, timeout: Duration, backoff: Backoff)(
implicit clock: Clock)
extends Connection
with LazyLogging {
Expand All @@ -35,7 +35,7 @@ object Connection {
@volatile private[this] var channel: SocketChannel =
doConnect(channelOpen, 0, Sleeper(backoff, timeout, clock)).get

private def channelOpen = {
protected def channelOpen: SocketChannel = {
val ch = SocketChannel.open()
ch.setOption[JBool](TCP_NODELAY, true)
ch.setOption[JBool](SO_KEEPALIVE, true)
Expand All @@ -46,9 +46,10 @@ object Connection {
retries: Int,
sleeper: Sleeper): Try[SocketChannel] = {
logger.debug(s"Start connecting to $remote. retries: $retries")
try if (ch.connect(remote)) Success(ch)
else Failure(new IOException(s"Failed to connect: $remote"))
catch {
try {
if (ch.connect(remote)) Success(ch)
else Failure(new IOException(s"Failed to connect: $remote"))
} catch {
case e: IOException =>
if (sleeper.giveUp) {
closed = true
Expand All @@ -67,7 +68,11 @@ object Connection {
def connect(): Try[SocketChannel] =
if (closed) Failure(new Exception("Already closed"))
else if (channel.isConnected) Success(channel)
else doConnect(channelOpen, 0, Sleeper(backoff, timeout, clock))
else
doConnect(channelOpen, 0, Sleeper(backoff, timeout, clock)) match {
case t @ Success(c) => channel = c; t
case f => f
}

def isClosed: Boolean =
closed || channel.isConnected
Expand Down
2 changes: 1 addition & 1 deletion modules/examples/src/main/scala/examples/examples.scala
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ object Counter extends Base {
val observable =
Observable.repeatEval(Event("example", "counter", Num(counter.getAndIncrement())))

val consumer = Consumer.foreachParallelAsync[Event[Num]](4)(event =>
val consumer = Consumer.foreachParallelTask[Event[Num]](4)(event =>
Task(client.emit(event) match {
case Left(e) => logger.error(s"Exception occurred: ${e.getMessage}")
case _ => ()
Expand Down
84 changes: 84 additions & 0 deletions modules/tests/src/test/scala/fluflu/ConnectionSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package fluflu

import java.io.IOException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel
import java.time.{Clock, Duration}
import java.util.concurrent.LinkedBlockingDeque

import org.scalatest.{FunSpec, Matchers}
import org.scalatest.mockito.MockitoSugar
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

class ConnectionSpec extends FunSpec with MockitoSugar with Matchers {
import Connection.ConnectionImpl

implicit val clock: Clock = Clock.systemUTC()

val address: InetSocketAddress = mock[InetSocketAddress]
val duration: Duration = Duration.ofSeconds(1)
val backoff: Backoff = Backoff.fix(duration)

describe("constructor") {
it("should create instance successfully with retry some time") {
val isConnectedAnswers = new LinkedBlockingDeque[Boolean]()
isConnectedAnswers.add(false)
isConnectedAnswers.add(false)
isConnectedAnswers.add(true)
val arg = ByteBuffer.wrap(Array(1, 2, 3).map(_.toByte))
val channelMock = mock[SocketChannel]
when(channelMock.isConnected).thenAnswer(new Answer[Boolean] {
override def answer(invocation: InvocationOnMock): Boolean =
isConnectedAnswers.take()
})
when(channelMock.connect(address)).thenReturn(true)
when(channelMock.write(arg)).thenAnswer(new Answer[Int] {
override def answer(invocation: InvocationOnMock): Int = {
val arr = Array.ofDim[Byte](1)
arg.get(arr)
1
}
})
final class TestConnection extends ConnectionImpl(address, duration, backoff) {
override protected def channelOpen: SocketChannel = channelMock
}
new TestConnection
}
}
describe("write") {
it("should write successfully") {
val arg = ByteBuffer.wrap(Array(1, 2, 3).map(_.toByte))
val channelMock = mock[SocketChannel]
when(channelMock.isConnected).thenReturn(true)
when(channelMock.connect(address)).thenReturn(true)
when(channelMock.write(arg)).thenAnswer(new Answer[Int] {
override def answer(invocation: InvocationOnMock): Int = {
val arr = Array.ofDim[Byte](1)
arg.get(arr)
1
}
})
final class TestConnection extends ConnectionImpl(address, duration, backoff) {
override protected def channelOpen: SocketChannel = channelMock
}
val conn = new TestConnection
assert(conn.write(arg).isSuccess)
}
it("should write failed when it occurs IOException") {
val arg = ByteBuffer.wrap(Array(1, 2, 3).map(_.toByte))
val channelMock = mock[SocketChannel]
when(channelMock.isConnected).thenReturn(true)
when(channelMock.connect(address)).thenReturn(true)
when(channelMock.write(arg)).thenThrow(new IOException)

final class TestConnection extends ConnectionImpl(address, duration, backoff) {
override protected def channelOpen: SocketChannel = channelMock
}
val conn = new TestConnection
assert(conn.write(arg).isFailure)
}
}
}
48 changes: 25 additions & 23 deletions project/Deps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,34 @@ import sbt._

object Deps {
val Ver = new {
val `scala2.12` = "2.12.4"
val `scala2.11` = "2.11.11"
val scalafmt = "1.2.0"
val cats = "1.0.0-RC1"
val circe = "0.9.0-M2"
val scalacheck = "1.13.5"
val scalatest = "3.0.4"
val monix = "3.0.0-M2"
val scalaLogging = "3.7.2"
val logback = "1.2.3"
val msgpackJava = "0.8.13"
val `scala2.12` = "2.12.4"
val `scala2.11` = "2.11.11"
val scalafmt = "1.2.0"
val cats = "1.0.0-RC1"
val circe = "0.9.0-M2"
val scalacheck = "1.13.5"
val scalatest = "3.0.4"
val monix = "3.0.0-M2"
val scalaLogging = "3.7.2"
val logback = "1.2.3"
val msgpackJava = "0.8.13"
val mockito = "2.12.0"
}

val Pkg = new {
lazy val monixEval = "io.monix" %% "monix-eval" % Ver.monix
lazy val monixReactive = "io.monix" %% "monix-reactive" % Ver.monix
lazy val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % Ver.scalaLogging
lazy val circeCore = "io.circe" %% "circe-core" % Ver.circe
lazy val circeGeneric = "io.circe" %% "circe-generic" % Ver.circe
lazy val circeParser = "io.circe" %% "circe-parser" % Ver.circe
lazy val catsCore = "org.typelevel" %% "cats-core" % Ver.cats
lazy val scalatest = "org.scalatest" %% "scalatest" % Ver.scalatest
lazy val scalacheck = "org.scalacheck" %% "scalacheck" % Ver.scalacheck
lazy val msgpackJava = "org.msgpack" % "msgpack-core" % Ver.msgpackJava
lazy val logbackClassic = "ch.qos.logback" % "logback-classic" % Ver.logback
lazy val monixEval = "io.monix" %% "monix-eval" % Ver.monix
lazy val monixReactive = "io.monix" %% "monix-reactive" % Ver.monix
lazy val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % Ver.scalaLogging
lazy val circeCore = "io.circe" %% "circe-core" % Ver.circe
lazy val circeGeneric = "io.circe" %% "circe-generic" % Ver.circe
lazy val circeParser = "io.circe" %% "circe-parser" % Ver.circe
lazy val catsCore = "org.typelevel" %% "cats-core" % Ver.cats
lazy val scalatest = "org.scalatest" %% "scalatest" % Ver.scalatest
lazy val scalacheck = "org.scalacheck" %% "scalacheck" % Ver.scalacheck
lazy val msgpackJava = "org.msgpack" % "msgpack-core" % Ver.msgpackJava
lazy val logbackClassic = "ch.qos.logback" % "logback-classic" % Ver.logback
lazy val mockito = "org.mockito" % "mockito-core" % Ver.mockito

lazy val forTest = Seq(catsCore, scalatest, scalacheck).map(_ % "test")
lazy val forTest = Seq(catsCore, scalatest, scalacheck, mockito).map(_ % "test")
}
}

0 comments on commit 5276a6b

Please sign in to comment.