Skip to content

Commit

Permalink
finagle/finagle-netty4: Add CPU tracking and active socket count for …
Browse files Browse the repository at this point in the history
…worker threads in EventLoopGroupTracker

Problem

We'd like to be able to see how much work different netty threads are doing,
in order to size the pool correctly and potentially improve work distribution.

Solution

As an initial step, add stats for the cpu time for each thread and the number of
active sockets it's assigned to EventLoopGroupTracker. These stats will only be
collected if the `TrackWorkerPool.enableTracking` param is set to `true` on the
server (false by default).

Differential Revision: https://phabricator.twitter.biz/D1176906
  • Loading branch information
jcrossley authored and jenkins committed Oct 17, 2024
1 parent e1de2b3 commit 6818439
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 53 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,25 @@ Runtime Behavior Changes

* finagle-mysql: (Testing behaviour change only) Updated mysql version expected by integration tests to 8.0.21.
Added README in integration tests noting that this must exist for integration tests to run. ``PHAB_ID=D1152235``
* finagle-netty4: `EventLoopGroupTracker` (previously named `EventLoopGroupExecutionDelayTracker`) now collects
stats cpu_time_ms and active_sockets per netty worker thread.


New Features
~~~~~~~~~~

* finagle-mysql: Added support for LONG_BLOB data type. ``PHAB_ID=D1152247``


Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

* finagle-netty4: `c.t.f.netty4.threading.EventLoopGroupExecutionDelayTracker` has been renamed to
`EventLoopGroupTracker`, `c.t.f.netty4.threading.TrackWorkerPoolExecutionDelay` has been renamed to
`TrackWorkerPoolExcutionDelay`, `c.t.f.netty4.param.TrackWorkerPoolExecutionDelay` has been renamed
to `TrackWorkerPool`. These changes reflect the tracker's new functionality of collecting metrics
and data other than the execution delay (see Runtime Behaviour Changes). ``PHAB_ID=D1176906``

24.5.0
------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.twitter.finagle.ListeningServer
import com.twitter.finagle.Stack
import com.twitter.finagle.netty4.channel.Netty4FramedServerChannelInitializer
import com.twitter.finagle.netty4.channel.Netty4RawServerChannelInitializer
import com.twitter.finagle.netty4.threading.EventLoopGroupExecutionDelayTracker
import com.twitter.finagle.netty4.threading.EventLoopGroupTracker
import com.twitter.finagle.param.Stats
import com.twitter.finagle.param.Timer
import com.twitter.finagle.server.Listener
Expand Down Expand Up @@ -221,13 +221,13 @@ private[finagle] class ListeningServerBuilder(

def boundAddress: SocketAddress = ch.localAddress()

private[this] val workerPoolExecutionDelayTrackingSettings =
params[param.TrackWorkerPoolExecutionDelay]
if (workerPoolExecutionDelayTrackingSettings.enableTracking) {
EventLoopGroupExecutionDelayTracker.track(
private[this] val workerPoolTrackingSettings =
params[param.TrackWorkerPool]
if (workerPoolTrackingSettings.enableTracking) {
EventLoopGroupTracker.track(
params[param.WorkerPool].eventLoopGroup,
workerPoolExecutionDelayTrackingSettings.trackingTaskPeriod,
workerPoolExecutionDelayTrackingSettings.threadDumpThreshold,
workerPoolTrackingSettings.trackingTaskPeriod,
workerPoolTrackingSettings.threadDumpThreshold,
params[Stats].statsReceiver,
s"finagle/netty-4/delayTracking/${boundAddress}",
Logger.get("com.twitter.finagle.netty4.Netty4Listener.threadDelay")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,31 @@ import com.twitter.finagle.Stack
import com.twitter.util.Duration

/**
* Control for tracking execution delay in the worker threads for a listener. This is intended
* to be enabled for perf tracking, and may impact performance as it adds tracking runnables to
* the event executors. Stats will be written to the stats receiver for the listener under
* workerpool/deviation_ms. When thread dumping is enabled, all logging is done at the warning
* level.
* Control for tracking execution delay, cpu time, and active sockets in the worker threads for a
* listener. This is intended to be enabled for perf tracking, and may impact performance as it adds
* tracking runnables to the event executors. Stats will be written to the stats receiver for the
* listener under workerpool/deviation_ms. When thread dumping is enabled, all logging is done at
* the warning level.
*
* @param enableTracking If true enable thread pause tracking.
* @param enableTracking If true enable thread tracking.
* @param trackingTaskPeriod The fixed time scheduling window for the execution delay runnable.
* @param threadDumpThreshold If > 0ms, enable stack dumping of threads when they have been delayed for
* more than the threshold. Thresholds of < 10ms will not work as
* expected as the underlying executors do not use high resolution timers.
*/
case class TrackWorkerPoolExecutionDelay(
case class TrackWorkerPool(
enableTracking: Boolean,
trackingTaskPeriod: Duration,
threadDumpThreshold: Duration) {
def mk(): (TrackWorkerPoolExecutionDelay, Stack.Param[TrackWorkerPoolExecutionDelay]) =
(this, TrackWorkerPoolExecutionDelay.trackWorkerPoolExecutionDelayParam)
def mk(): (TrackWorkerPool, Stack.Param[TrackWorkerPool]) =
(this, TrackWorkerPool.trackWorkerPoolParam)

}

object TrackWorkerPoolExecutionDelay {
implicit val trackWorkerPoolExecutionDelayParam: Stack.Param[TrackWorkerPoolExecutionDelay] =
Stack.Param[TrackWorkerPoolExecutionDelay](
TrackWorkerPoolExecutionDelay(
object TrackWorkerPool {
implicit val trackWorkerPoolParam: Stack.Param[TrackWorkerPool] =
Stack.Param[TrackWorkerPool](
TrackWorkerPool(
false,
Duration.fromMilliseconds(0),
Duration.fromMilliseconds(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import io.netty.channel.EventLoopGroup
import java.util.concurrent.ScheduledThreadPoolExecutor
import scala.reflect.internal.util.WeakHashSet

object EventLoopGroupExecutionDelayTracker {
object EventLoopGroupTracker {

private[threading] val trackedEventLoopGroups = new WeakHashSet[EventLoopGroup]()

Expand All @@ -19,8 +19,8 @@ object EventLoopGroupExecutionDelayTracker {
* instrumentation.
*
* @param nettyEventLoopGroup The netty EventLoopGroup for which thread delays should be captured
* @param injectionPeriod The fixed delay for the runnables added to the EventLoopGroup threads to
* capture thread execution delays.
* @param trackingTaskPeriod The fixed delay for the runnables added to the EventLoopGroup threads to
* capture thread tracking information.
* @param dumpThreshold If > 0ms log seen delay for threads and the stack trace for threads at
* the when the threads exceed the dumpThreshold delay.
* @param statsReceiver The stats receiver under which execution delay stats should be reported.
Expand All @@ -29,7 +29,7 @@ object EventLoopGroupExecutionDelayTracker {
*/
def track(
nettyEventLoopGroup: EventLoopGroup,
injectionPeriod: Duration,
trackingTaskPeriod: Duration,
dumpThreshold: Duration,
statsReceiver: StatsReceiver,
dumpThreadPoolName: String,
Expand All @@ -50,10 +50,11 @@ object EventLoopGroupExecutionDelayTracker {
val stat = statsReceiver.stat("workerpool", "deviation_ms")
while (workerIter.hasNext) {
val loop = workerIter.next()
new EventLoopGroupExecutionDelayTrackingRunnable(
new EventLoopGroupTrackingRunnable(
loop,
injectionPeriod,
trackingTaskPeriod,
stat,
statsReceiver,
dumpThreshold,
dumpThresholdExceededThreadPool,
logger
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
package com.twitter.finagle.netty4.threading

import com.twitter.finagle.stats.Stat
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.logging.Logger
import com.twitter.util.{Duration, Time}
import com.twitter.util.Duration
import com.twitter.util.Time
import io.netty.channel.SingleThreadEventLoop
import io.netty.util.concurrent.EventExecutor
import java.util.concurrent.{Callable, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}
import java.lang.management.ManagementFactory
import java.util.concurrent.Callable
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit

private[threading] class EventLoopGroupExecutionDelayTrackingRunnable(
eventExecutor: EventExecutor,
injectionPeriod: Duration,
private[threading] class EventLoopGroupTrackingRunnable(
executor: EventExecutor,
taskTrackingPeriod: Duration,
delayStat: Stat,
statsReceiver: StatsReceiver,
threadDumpThreshold: Duration,
dumpWatchThreadPool: Option[ScheduledThreadPoolExecutor],
dumpLogger: Logger)
Expand All @@ -22,10 +30,10 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable(
// the one thread in the executor. This is currently how netty is implemented
// but this class will stop working if netty changes their implementation
private[this] val executorThread: Thread = {
if (eventExecutor.inEventLoop()) {
if (executor.inEventLoop()) {
Thread.currentThread()
} else {
eventExecutor
executor
.submit(new Callable[Thread] {
override def call(): Thread = {
Thread.currentThread()
Expand All @@ -34,15 +42,26 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable(
}
}

private[this] val threadId = executorThread.getId
private[this] val threadName: String = executorThread.getName

private[this] var scheduledExecutionTime: Time = Time.now
private[this] var watchTask: Option[ScheduledFuture[_]] = None

private[this] val threadMXBean = ManagementFactory.getThreadMXBean

private[this] val scopedStatsReceiver = statsReceiver.scope(threadName)
private[this] val activeSocketsStat = scopedStatsReceiver.stat("active_sockets")
private[this] val cpuTimeCounter = scopedStatsReceiver.counter("cpu_time_ms")

// Accessed only from within the same netty thread
private[this] var prevCPUTimeMs = 0L

setWatchTask()
eventExecutor.scheduleWithFixedDelay(
executor.scheduleWithFixedDelay(
this,
0,
injectionPeriod.inMillis,
taskTrackingPeriod.inMillis,
java.util.concurrent.TimeUnit.MILLISECONDS
)

Expand All @@ -57,12 +76,26 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable(
dumpLogger.warning(
s"THREAD: $threadName EXECUTION DELAY is greater than ${threadDumpThreshold.inMillis}ms, was ${executionDelay.inMillis}ms"
)

}

delayStat.add(executionDelay.inMillis)
scheduledExecutionTime = Time.now.plus(injectionPeriod)
scheduledExecutionTime = Time.now.plus(taskTrackingPeriod)
setWatchTask()

var numActiveSockets = 0
// This will be nio event loop or epoll event loop.
executor.asInstanceOf[SingleThreadEventLoop].registeredChannelsIterator().forEachRemaining {
channel =>
if (channel.isActive) {
numActiveSockets += 1
}
}
activeSocketsStat.add(numActiveSockets)

// `getThreadCPUTime` returns the time in nanoseconds.
val currentCPUTimeMs = threadMXBean.getThreadCpuTime(threadId) / 1000000
cpuTimeCounter.incr(currentCPUTimeMs - prevCPUTimeMs)
prevCPUTimeMs = currentCPUTimeMs
}

private[this] def setWatchTask(): Unit = {
Expand All @@ -71,7 +104,7 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable(
dumpWatchThreadPool.get.schedule(
new Runnable {
override def run(): Unit = {
var builder = new StringBuilder()
val builder = new StringBuilder()
builder
.append(
s"THREAD: $threadName EXECUTION DELAY exceeded configured dump threshold. Thread stack trace:\n"
Expand All @@ -80,7 +113,7 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable(
dumpLogger.warning(builder.toString())
}
},
(injectionPeriod + threadDumpThreshold).inMillis,
(taskTrackingPeriod + threadDumpThreshold).inMillis,
TimeUnit.MILLISECONDS
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import org.scalatestplus.mockito.MockitoSugar
import scala.collection.JavaConverters._
import org.scalatest.funsuite.AnyFunSuite

class EventLoopGroupExecutionDelayTrackerTest
class EventLoopGroupTrackerTest
extends AnyFunSuite
with Eventually
with IntegrationPatience
with MockitoSugar {

test(
"EventLoopGroupExecutionDelayTracker with thread dump disabled records stats but no threads created and no logging"
"EventLoopGroupTracker with thread dump disabled records stats but no threads created and no logging"
) {
val statsReceiver = new InMemoryStatsReceiver

Expand All @@ -37,7 +37,7 @@ class EventLoopGroupExecutionDelayTrackerTest

val eventLoopGroup = new NioEventLoopGroup(1, executor)

EventLoopGroupExecutionDelayTracker.track(
EventLoopGroupTracker.track(
eventLoopGroup,
Duration.fromMilliseconds(50),
Duration.Zero,
Expand All @@ -55,8 +55,14 @@ class EventLoopGroupExecutionDelayTrackerTest
// Force ourselves to wait
Thread.sleep(300)

// we should have deviation stats
// we should have deviation, cpu time, and active sockets stats
assert(statsReceiver.stats.get(Seq("workerpool", "deviation_ms")).isDefined)
assert(
statsReceiver.counters
.get(Seq("finagle_thread_delay_tracking_test-1", "cpu_time_ms")).isDefined)
assert(
statsReceiver.stats
.get(Seq("finagle_thread_delay_tracking_test-1", "active_sockets")).isDefined)

// we should have no threads with the name no_threads_expected
Thread.getAllStackTraces.keySet().asScala.foreach { thread: Thread =>
Expand All @@ -68,7 +74,7 @@ class EventLoopGroupExecutionDelayTrackerTest
}

test(
"EventLoopGroupExecutionDelayTracker with thread dump enabled records stats creates watch threads and logs dumps"
"EventLoopGroupTracker with thread dump enabled records stats creates watch threads and logs dumps"
) {
val statsReceiver = new InMemoryStatsReceiver

Expand All @@ -80,7 +86,7 @@ class EventLoopGroupExecutionDelayTrackerTest

val eventLoopGroup = new NioEventLoopGroup(1, executor)

EventLoopGroupExecutionDelayTracker.track(
EventLoopGroupTracker.track(
eventLoopGroup,
Duration.fromMilliseconds(50),
Duration.fromMilliseconds(10),
Expand Down Expand Up @@ -115,10 +121,10 @@ class EventLoopGroupExecutionDelayTrackerTest
}

test(
"validate EventLoopGroupExecutionDelayTracker track guards against multiple submissions of the same EventLoopGroup"
"validate EventLoopGroupTracker track guards against multiple submissions of the same EventLoopGroup"
) {
// clear our tracking set first as other tests added to the set
EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.clear()
EventLoopGroupTracker.trackedEventLoopGroups.clear()

val statsReceiver = new InMemoryStatsReceiver

Expand All @@ -131,35 +137,35 @@ class EventLoopGroupExecutionDelayTrackerTest
val eventLoopGroup = new NioEventLoopGroup(1, executor)
val eventLoopGroup2 = new NioEventLoopGroup(1, executor)

EventLoopGroupExecutionDelayTracker.track(
EventLoopGroupTracker.track(
eventLoopGroup,
Duration.fromMilliseconds(50),
Duration.Zero,
statsReceiver,
"execution_delay_test_pool",
mockLogger
)
assert(EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.size == 1)
assert(EventLoopGroupTracker.trackedEventLoopGroups.size == 1)

EventLoopGroupExecutionDelayTracker.track(
EventLoopGroupTracker.track(
eventLoopGroup2,
Duration.fromMilliseconds(50),
Duration.Zero,
statsReceiver,
"execution_delay_test_pool",
mockLogger
)
assert(EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.size == 2)
assert(EventLoopGroupTracker.trackedEventLoopGroups.size == 2)

EventLoopGroupExecutionDelayTracker.track(
EventLoopGroupTracker.track(
eventLoopGroup,
Duration.fromMilliseconds(50),
Duration.Zero,
statsReceiver,
"execution_delay_test_pool",
mockLogger
)
assert(EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.size == 2)
assert(EventLoopGroupTracker.trackedEventLoopGroups.size == 2)

}
}

0 comments on commit 6818439

Please sign in to comment.