Skip to content

Commit

Permalink
Add span creation and error handling in AsyncQueueJobMessageAspect an…
Browse files Browse the repository at this point in the history
…… (#574)

* Add span creation and error handling in AsyncQueueJobMessageAspect and KafkaProducerAspect

* Fix span nullability issue in AsyncQueueJobMessageAspect

---------

Co-authored-by: Deeka Wong <[email protected]>
  • Loading branch information
huangdijia and huangdijia authored Feb 26, 2024
1 parent cb2221c commit 8bf5d16
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
17 changes: 11 additions & 6 deletions src/Tracing/Aspect/AsyncQueueJobMessageAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,16 @@ public function process(ProceedingJoinPoint $proceedingJoinPoint)

public function handlePush(ProceedingJoinPoint $proceedingJoinPoint)
{
$span = $this->startSpan(
'async_queue.job.publish',
$proceedingJoinPoint->arguments['keys']['job']::class
);

if (! $span) {
return $proceedingJoinPoint->process();
}

try {
$span = $this->startSpan(
'async_queue.job.publish',
$proceedingJoinPoint->arguments['keys']['job']::class
);
$data = [];

/** @var \Hyperf\AsyncQueue\Driver\Driver $driver */
Expand All @@ -77,7 +82,7 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint)
};

if (count($data)) {
$span?->setData($data);
$span->setData($data);
}

$carrier = $this->packer->pack($span);
Expand All @@ -86,7 +91,7 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint)
return $proceedingJoinPoint->process();
} catch (Throwable) {
} finally {
$span?->finish();
$span->finish();
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/Tracing/Aspect/KafkaProducerAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ protected function sendAsync(ProceedingJoinPoint $proceedingJoinPoint)
'kafka.send',
sprintf('%s::%s', $proceedingJoinPoint->className, $proceedingJoinPoint->methodName)
);

if (! $span) {
return $proceedingJoinPoint->process();
}

$carrier = $this->packer->pack($span);
$headers = $proceedingJoinPoint->arguments['keys']['headers'] ?? [];
$headers[] = (new RecordHeader())
Expand All @@ -70,6 +75,11 @@ protected function sendBatchAsync(ProceedingJoinPoint $proceedingJoinPoint)
'kafka.send_batch',
sprintf('%s::%s', $proceedingJoinPoint->className, $proceedingJoinPoint->methodName)
);

if (! $span) {
return $proceedingJoinPoint->process();
}

$carrier = $this->packer->pack($span);

foreach ($messages as $message) {
Expand Down

0 comments on commit 8bf5d16

Please sign in to comment.