diff --git a/modules/core/src/main/scala/fluflu/Client.scala b/modules/core/src/main/scala/fluflu/Client.scala index 8fac62d8..b1b76bb8 100644 --- a/modules/core/src/main/scala/fluflu/Client.scala +++ b/modules/core/src/main/scala/fluflu/Client.scala @@ -42,15 +42,15 @@ object Client { PA: Unpacker[Option[Ack]] ): Client = new Client with LazyLogging { - @volatile private[this] var closed = false + @volatile private var closed = false private def scheduler(name: String) = Executors.newScheduledThreadPool(1, Utils.namedThreadFactory(name)) - private[this] val running = new AtomicBoolean() - private[this] val queue = new ConcurrentLinkedQueue[(String, MessageBufferPacker => Unit)] - private[this] val consumer = new ForwardConsumer(maximumPulls, connection, queue, packerConfig) - private[this] val worker = scheduler("fluflu-scheduler") + private val running = new AtomicBoolean() + private val queue = new ConcurrentLinkedQueue[(String, MessageBufferPacker => Unit)] + private val consumer = new ForwardConsumer(maximumPulls, connection, queue, packerConfig) + private val worker = scheduler("fluflu-scheduler") private object Worker extends Runnable { def start(): Either[Exception, Unit] = diff --git a/modules/core/src/main/scala/fluflu/Connection.scala b/modules/core/src/main/scala/fluflu/Connection.scala index f02954e0..d2150fa0 100644 --- a/modules/core/src/main/scala/fluflu/Connection.scala +++ b/modules/core/src/main/scala/fluflu/Connection.scala @@ -62,9 +62,9 @@ object Connection { with LazyLogging { import StandardSocketOptions._ - @volatile private[this] var closed: Boolean = false + @volatile private var closed: Boolean = false - @volatile private[this] var channel: SocketChannel = + @volatile private var channel: SocketChannel = doConnect(channelOpen, 0, Sleeper(settings.connectionBackof, settings.connectionTimeout, clock)).get protected def channelOpen: SocketChannel = { @@ -159,7 +159,7 @@ object Connection { } } - private[this] val ackBuffer = ByteBuffer.allocate(256) + private val ackBuffer = ByteBuffer.allocate(256) def writeAndRead(message: ByteBuffer): Try[ByteBuffer] = for { diff --git a/modules/core/src/main/scala/fluflu/Consumer.scala b/modules/core/src/main/scala/fluflu/Consumer.scala index 98fd5a91..449b3eaf 100644 --- a/modules/core/src/main/scala/fluflu/Consumer.scala +++ b/modules/core/src/main/scala/fluflu/Consumer.scala @@ -41,13 +41,13 @@ final class ForwardConsumer private[fluflu] ( UA: Unpacker[Option[Ack]] ) extends Consumer with LazyLogging { - private[this] val errorQueue: util.Queue[(String, ByteBuffer)] = new ConcurrentLinkedQueue[(String, ByteBuffer)]() + private val errorQueue: util.Queue[(String, ByteBuffer)] = new ConcurrentLinkedQueue[(String, ByteBuffer)]() - private[this] val mPacker = new ThreadLocal[MessageBufferPacker] { + private val mPacker = new ThreadLocal[MessageBufferPacker] { override def initialValue(): MessageBufferPacker = packerConfig.newBufferPacker() } - private[this] val b64e = Base64.getEncoder + private val b64e = Base64.getEncoder type E = (String, MessageBufferPacker => Unit) @@ -80,7 +80,7 @@ final class ForwardConsumer private[fluflu] ( } def makeMessages(m: Map[String, ListBuffer[MessageBufferPacker => Unit]]): Iterator[(String, ByteBuffer)] = - m.iterator.map((makeMessage _).tupled).collect { case Some(v) => v } + m.iterator.map { case (a, b) => makeMessage(a, b) }.collect { case Some(v) => v } private def send(chunk: String, msg: ByteBuffer): Unit = connection.writeAndRead(msg) match { @@ -110,7 +110,7 @@ final class ForwardConsumer private[fluflu] ( def consume(): Unit = if (msgQueue.isEmpty && errorQueue.isEmpty) () else { - retrieveErrors().foreach((send _).tupled) - makeMessages(retrieveElements()).foreach((send _).tupled) + retrieveErrors().foreach { case (a, b) => send(a, b) } + makeMessages(retrieveElements()).foreach { case (a, b) => send(a, b) } } } diff --git a/modules/core/src/main/scala/fluflu/Sleeper.scala b/modules/core/src/main/scala/fluflu/Sleeper.scala index 372af335..a460a03b 100644 --- a/modules/core/src/main/scala/fluflu/Sleeper.scala +++ b/modules/core/src/main/scala/fluflu/Sleeper.scala @@ -14,7 +14,7 @@ trait Sleeper { object Sleeper { def apply(backoff: Backoff, timeout: FiniteDuration, clock: Clock): Sleeper = new Sleeper { - private[this] val start = Instant.now(clock) + private val start = Instant.now(clock) def giveUp: Boolean = Instant.now(clock).minusNanos(timeout.toNanos).compareTo(start) > 0 diff --git a/modules/msgpack-mess/src/main/scala/fluflu/msgpack/instances/MessPackerInstances.scala b/modules/msgpack-mess/src/main/scala/fluflu/msgpack/instances/MessPackerInstances.scala index 2d448dab..fd74402f 100644 --- a/modules/msgpack-mess/src/main/scala/fluflu/msgpack/instances/MessPackerInstances.scala +++ b/modules/msgpack-mess/src/main/scala/fluflu/msgpack/instances/MessPackerInstances.scala @@ -20,16 +20,16 @@ private[instances] trait MessPackerInstances { encodeA(a).pack(packer) } - private[this] val encodeMOption: Encoder[MOption] = derivedEncoder[MOption] + private val encodeMOption: Encoder[MOption] = derivedEncoder[MOption] implicit val packMOptionByMess: Packer[MOption] = new Packer[MOption] { def apply(a: MOption, packer: MessagePacker): Unit = encodeMOption(a).pack(packer) } - private[this] val unpackerConfig = new MessagePack.UnpackerConfig().withBufferSize(124) + private val unpackerConfig = new MessagePack.UnpackerConfig().withBufferSize(124) - implicit private[this] val decodeOptionAck: Decoder[Option[Ack]] = derivedDecoder[Ack].map(Option.apply) + implicit private val decodeOptionAck: Decoder[Option[Ack]] = derivedDecoder[Ack].map(Option.apply) implicit val unpackAckByMess: Unpacker[Option[Ack]] = new Unpacker[Option[Ack]] { def apply(bytes: ByteBuffer): Either[Throwable, Option[Ack]] =