Skip to content

Commit

Permalink
chore: remove warngings
Browse files Browse the repository at this point in the history
  • Loading branch information
tkrs committed Aug 11, 2024
1 parent acd8380 commit 56b8161
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 18 deletions.
10 changes: 5 additions & 5 deletions modules/core/src/main/scala/fluflu/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ object Client {
PA: Unpacker[Option[Ack]]
): Client =
new Client with LazyLogging {
@volatile private[this] var closed = false
@volatile private var closed = false

private def scheduler(name: String) =
Executors.newScheduledThreadPool(1, Utils.namedThreadFactory(name))

private[this] val running = new AtomicBoolean()
private[this] val queue = new ConcurrentLinkedQueue[(String, MessageBufferPacker => Unit)]
private[this] val consumer = new ForwardConsumer(maximumPulls, connection, queue, packerConfig)
private[this] val worker = scheduler("fluflu-scheduler")
private val running = new AtomicBoolean()
private val queue = new ConcurrentLinkedQueue[(String, MessageBufferPacker => Unit)]
private val consumer = new ForwardConsumer(maximumPulls, connection, queue, packerConfig)
private val worker = scheduler("fluflu-scheduler")

private object Worker extends Runnable {
def start(): Either[Exception, Unit] =
Expand Down
6 changes: 3 additions & 3 deletions modules/core/src/main/scala/fluflu/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ object Connection {
with LazyLogging {
import StandardSocketOptions._

@volatile private[this] var closed: Boolean = false
@volatile private var closed: Boolean = false

@volatile private[this] var channel: SocketChannel =
@volatile private var channel: SocketChannel =
doConnect(channelOpen, 0, Sleeper(settings.connectionBackof, settings.connectionTimeout, clock)).get

protected def channelOpen: SocketChannel = {
Expand Down Expand Up @@ -159,7 +159,7 @@ object Connection {
}
}

private[this] val ackBuffer = ByteBuffer.allocate(256)
private val ackBuffer = ByteBuffer.allocate(256)

def writeAndRead(message: ByteBuffer): Try[ByteBuffer] =
for {
Expand Down
12 changes: 6 additions & 6 deletions modules/core/src/main/scala/fluflu/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ final class ForwardConsumer private[fluflu] (
UA: Unpacker[Option[Ack]]
) extends Consumer
with LazyLogging {
private[this] val errorQueue: util.Queue[(String, ByteBuffer)] = new ConcurrentLinkedQueue[(String, ByteBuffer)]()
private val errorQueue: util.Queue[(String, ByteBuffer)] = new ConcurrentLinkedQueue[(String, ByteBuffer)]()

private[this] val mPacker = new ThreadLocal[MessageBufferPacker] {
private val mPacker = new ThreadLocal[MessageBufferPacker] {
override def initialValue(): MessageBufferPacker = packerConfig.newBufferPacker()
}

private[this] val b64e = Base64.getEncoder
private val b64e = Base64.getEncoder

type E = (String, MessageBufferPacker => Unit)

Expand Down Expand Up @@ -80,7 +80,7 @@ final class ForwardConsumer private[fluflu] (
}

def makeMessages(m: Map[String, ListBuffer[MessageBufferPacker => Unit]]): Iterator[(String, ByteBuffer)] =
m.iterator.map((makeMessage _).tupled).collect { case Some(v) => v }
m.iterator.map { case (a, b) => makeMessage(a, b) }.collect { case Some(v) => v }

private def send(chunk: String, msg: ByteBuffer): Unit =
connection.writeAndRead(msg) match {
Expand Down Expand Up @@ -110,7 +110,7 @@ final class ForwardConsumer private[fluflu] (
def consume(): Unit =
if (msgQueue.isEmpty && errorQueue.isEmpty) ()
else {
retrieveErrors().foreach((send _).tupled)
makeMessages(retrieveElements()).foreach((send _).tupled)
retrieveErrors().foreach { case (a, b) => send(a, b) }
makeMessages(retrieveElements()).foreach { case (a, b) => send(a, b) }
}
}
2 changes: 1 addition & 1 deletion modules/core/src/main/scala/fluflu/Sleeper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ trait Sleeper {
object Sleeper {
def apply(backoff: Backoff, timeout: FiniteDuration, clock: Clock): Sleeper =
new Sleeper {
private[this] val start = Instant.now(clock)
private val start = Instant.now(clock)

def giveUp: Boolean =
Instant.now(clock).minusNanos(timeout.toNanos).compareTo(start) > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ private[instances] trait MessPackerInstances {
encodeA(a).pack(packer)
}

private[this] val encodeMOption: Encoder[MOption] = derivedEncoder[MOption]
private val encodeMOption: Encoder[MOption] = derivedEncoder[MOption]

implicit val packMOptionByMess: Packer[MOption] = new Packer[MOption] {
def apply(a: MOption, packer: MessagePacker): Unit =
encodeMOption(a).pack(packer)
}

private[this] val unpackerConfig = new MessagePack.UnpackerConfig().withBufferSize(124)
private val unpackerConfig = new MessagePack.UnpackerConfig().withBufferSize(124)

implicit private[this] val decodeOptionAck: Decoder[Option[Ack]] = derivedDecoder[Ack].map(Option.apply)
implicit private val decodeOptionAck: Decoder[Option[Ack]] = derivedDecoder[Ack].map(Option.apply)

implicit val unpackAckByMess: Unpacker[Option[Ack]] = new Unpacker[Option[Ack]] {
def apply(bytes: ByteBuffer): Either[Throwable, Option[Ack]] =
Expand Down

0 comments on commit 56b8161

Please sign in to comment.