From a4fa69f8223a0e22e055464e5bb9142766a4306d Mon Sep 17 00:00:00 2001 From: "Michal J. Sladek" Date: Tue, 10 Dec 2024 09:28:45 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20basis=20opentelemetry=20st=C3=B8tte=20i?= =?UTF-8?q?=20for=20prosesstask=20(#746)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: basis opentelemetry støtte i for prosesstask. * Review * Rydder versjonering --- pom.xml | 2 +- .../impl/ProsessTaskHandlerRef.java | 2 +- .../felles/prosesstask/impl/RunTask.java | 36 ++++++++------- .../felles/prosesstask/impl/TaskManager.java | 45 ++++++++++--------- .../impl/TaskManagerRunnableTask.java | 25 ++++++++++- .../prosesstask/impl/util/OtelUtil.java | 39 ++++++++++++++++ 6 files changed, 110 insertions(+), 39 deletions(-) create mode 100644 task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/util/OtelUtil.java diff --git a/pom.xml b/pom.xml index cb4c643..3e32f03 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ fp-prosesstask navikt_fp-prosesstask - 7.4.0 + 7.4.3 10.0.0 6.6.3.Final diff --git a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/ProsessTaskHandlerRef.java b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/ProsessTaskHandlerRef.java index 4f80b62..74787fe 100644 --- a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/ProsessTaskHandlerRef.java +++ b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/ProsessTaskHandlerRef.java @@ -65,7 +65,7 @@ protected ProsessTaskHandler getBean() { private ProsessTask getProsessTaskAnnotation() { Class clazz = getTargetClassExpectingAnnotation(ProsessTask.class); if (!clazz.isAnnotationPresent(ProsessTask.class)) { - throw new IllegalStateException(clazz.getSimpleName() + " mangler annotering @ProsesTask"); + throw new IllegalStateException(clazz.getSimpleName() + " mangler annotering @ProsesTask"); } return clazz.getAnnotation(ProsessTask.class); } diff --git a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTask.java b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTask.java index b89c659..11a3c03 100644 --- a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTask.java +++ b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTask.java @@ -20,6 +20,8 @@ import jakarta.persistence.PersistenceException; import jakarta.transaction.Transactional; +import no.nav.vedtak.felles.prosesstask.impl.util.OtelUtil; + import org.hibernate.Session; import org.hibernate.exception.JDBCConnectionException; import org.hibernate.jdbc.Work; @@ -91,10 +93,12 @@ protected void runTaskAndUpdateStatus(Connection conn, ProsessTaskEntitet pte, P return; } - pickAndRun.dispatchWork(pte); + OtelUtil.wrapper().span("TASK " + taskType.value(), OtelUtil.taskAttributter(pte.tilProsessTask()), () -> { + pickAndRun.dispatchWork(pte); + // flush for å fange andre constraint feil etc før vi markerer ferdig + getEntityManager().flush(); + }); - // flush for å fange andre constraint feil etc før vi markerer ferdig - getEntityManager().flush(); if (ProsessTaskStatus.KLAR == pte.getStatus()) { var sluttStatus = pickAndRun.markerTaskFerdig(pte); @@ -203,7 +207,7 @@ void handleTaskFeil(ProsessTaskEntitet pte, Exception e) { feilOgStatushåndterer.handleTaskFeil(retryPolicy, pte, e); } - void handleFatalTaskFeil(ProsessTaskEntitet pte,Feil feil, Exception e) { + void handleFatalTaskFeil(ProsessTaskEntitet pte, Feil feil, Exception e) { feilOgStatushåndterer.handleFatalTaskFeil(pte, feil, e); } @@ -219,7 +223,7 @@ ProsessTaskStatus markerTaskFerdig(ProsessTaskEntitet pte) { // frigir veto etter at event handlere er fyrt vetoHåndterer.frigiVeto(pte); - ProsessTaskStatus nyStatus = ProsessTaskStatus.KJOERT; + var nyStatus = ProsessTaskStatus.KJOERT; taskManagerRepository.oppdaterStatus(pte.getId(), nyStatus); pte = refreshProsessTask(pte.getId()); @@ -230,7 +234,7 @@ ProsessTaskStatus markerTaskFerdig(ProsessTaskEntitet pte) { // markerer task som påbegynt (merk committer ikke før til slutt). void markerTaskUnderArbeid(ProsessTaskEntitet pte) { // mark row being processed with timestamp and server process id - LocalDateTime now = LocalDateTime.now(); + var now = LocalDateTime.now(); pte.setSisteKjøring(now); pte.setSisteKjøringServer(getJvmUniqueProcessName()); getEntityManager().persist(pte); @@ -240,15 +244,15 @@ void markerTaskUnderArbeid(ProsessTaskEntitet pte) { // regner ut neste kjøretid for tasks som kan repeteres (har CronExpression) void planleggNesteKjøring(ProsessTaskEntitet pte) throws SQLException { if (cronExpression != null) { - String gruppe = ProsessTaskRepository.getUniktProsessTaskGruppeNavn(taskManagerRepository.getEntityManager()); - LocalDateTime now = LocalDateTime.now(); - LocalDateTime nesteKjøring = cronExpression.nextLocalDateTimeAfter(now); + var gruppe = ProsessTaskRepository.getUniktProsessTaskGruppeNavn(taskManagerRepository.getEntityManager()); + var now = LocalDateTime.now(); + var nesteKjøring = cronExpression.nextLocalDateTimeAfter(now); var data = ProsessTaskDataBuilder.forTaskType(pte.getTaskType()) .medNesteKjøringEtter(nesteKjøring) .medProperties(pte.getProperties()) .medGruppe(gruppe) .medSekvens(pte.getSekvens()); - ProsessTaskEntitet nyPte = new ProsessTaskEntitet().kopierFraNy(data.build()); + var nyPte = new ProsessTaskEntitet().kopierFraNy(data.build()); getEntityManager().persist(nyPte); getEntityManager().flush(); @@ -266,7 +270,7 @@ void markerTaskUnderArbeid(ProsessTaskEntitet pte) { } void dispatchWork(ProsessTaskEntitet pte) throws Exception { - ProsessTaskData taskData = pte.tilProsessTask(); + var taskData = pte.tilProsessTask(); taskInfo.getTaskDispatcher().dispatch(taskData); } @@ -277,7 +281,7 @@ private EntityManager getEntityManager() { @SuppressWarnings("rawtypes") void runTask() { - final PickAndRunTask pickAndRun = this; + final var pickAndRun = this; /* * Bruker SQL+JDBC for å kunne benytte savepoints og inkrementell oppdatering i * transaksjonen. @@ -286,7 +290,7 @@ class PullSingleTask implements Work { @Override public void execute(Connection conn) throws SQLException { try { - Optional pte = taskManagerRepository.finnOgLås(taskInfo); + var pte = taskManagerRepository.finnOgLås(taskInfo); if (pte.isPresent()) { runTaskAndUpdateStatus(conn, pte.get(), pickAndRun); } @@ -304,15 +308,15 @@ public void execute(Connection conn) throws SQLException { } - PullSingleTask pullSingleTask = new PullSingleTask(); - EntityManager em = getEntityManager(); + var pullSingleTask = new PullSingleTask(); + var em = getEntityManager(); // workaround for hibernate issue HHH-11020 if (em instanceof TargetInstanceProxy tip) { em = (EntityManager) tip.weld_getTargetInstance(); } @SuppressWarnings("resource") // skal ikke lukke session her - Session session = em.unwrap(Session.class); + var session = em.unwrap(Session.class); session.doWork(pullSingleTask); diff --git a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManager.java b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManager.java index 2d17dcd..c9d7c62 100644 --- a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManager.java +++ b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManager.java @@ -23,23 +23,24 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.hibernate.exception.JDBCConnectionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.opentelemetry.api.trace.SpanKind; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Any; import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.spi.CDI; import jakarta.inject.Inject; import jakarta.persistence.EntityManager; - -import org.hibernate.exception.JDBCConnectionException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import no.nav.vedtak.exception.TekniskException; import no.nav.vedtak.felles.jpa.TransactionHandler; import no.nav.vedtak.felles.prosesstask.api.ProsessTaskData; import no.nav.vedtak.felles.prosesstask.api.ProsessTaskDispatcher; import no.nav.vedtak.felles.prosesstask.api.TaskMonitor; import no.nav.vedtak.felles.prosesstask.api.TaskType; +import no.nav.vedtak.felles.prosesstask.impl.util.OtelUtil; import no.nav.vedtak.log.metrics.Controllable; /** @@ -363,7 +364,8 @@ protected List doWork(EntityManager entityManager) { */ @Override public Integer call() { - return RequestContextHandler.doWithRequestContext(this::doPollingWithEntityManager); + return OtelUtil.wrapper().span("POLL_TASKS", spanBuilder -> spanBuilder.setSpanKind(SpanKind.INTERNAL).setNoParent(), + () -> RequestContextHandler.doWithRequestContext(this::doPollingWithEntityManager)); } public Integer doPollingWithEntityManager() { @@ -389,7 +391,7 @@ public Integer doPollingWithEntityManager() { } catch (Exception e) { backoffRound.set(backoffInterval.length - 1); // force max delay (skal kun havne her for Exception/RuntimeException) LOG.warn("PT-996896 Kunne ikke polle database, venter til neste runde(runde={})", backoffRound.get(), e); - } catch (Throwable t) { + } catch (Throwable t) { // NOSONAR backoffRound.set(backoffInterval.length - 1); // force max delay (skal kun havne her for Error) LOG.error("PT-996897 Kunne ikke polle grunnet kritisk feil, venter ({}s)", getBackoffIntervalSeconds(), t); } @@ -510,13 +512,14 @@ public Integer doWithContext() { @Override public void run() { - RequestContextHandler.doWithRequestContext(this::doWithContext); - // neste kjører mellom 1-10 min fra nå. - var min = 60L * 1000; - var delay = System.currentTimeMillis() % (9 * min); - pollingService.schedule(this, min + delay, TimeUnit.MILLISECONDS); + OtelUtil.wrapper().span("MoveToDonePartition", spanBuilder -> spanBuilder.setSpanKind(SpanKind.INTERNAL).setNoParent(), () -> { + RequestContextHandler.doWithRequestContext(this::doWithContext); + // neste kjører mellom 1-10 min fra nå. + var min = 60L * 1000; + var delay = System.currentTimeMillis() % (9 * min); + pollingService.schedule(this, min + delay, TimeUnit.MILLISECONDS); + }); } - } /** @@ -561,7 +564,8 @@ public Integer doWithContext() { @Override public void run() { - RequestContextHandler.doWithRequestContext(this::doWithContext); + OtelUtil.wrapper().span("FreeBlockedTasks", spanBuilder -> spanBuilder.setSpanKind(SpanKind.INTERNAL).setNoParent(), + () -> RequestContextHandler.doWithRequestContext(this::doWithContext)); } } @@ -609,13 +613,14 @@ public Integer doWithContext() { @Override public void run() { - RequestContextHandler.doWithRequestContext(this::doWithContext); - // neste kjører mellom 3-9 min fra nå. - var min = 3L * 60 * 1000; - var delay = System.currentTimeMillis() % (2 * min); - pollingService.schedule(this, min + delay, TimeUnit.MILLISECONDS); + OtelUtil.wrapper().span("UpdateTaskMonitor", spanBuilder -> spanBuilder.setSpanKind(SpanKind.INTERNAL), () -> { + RequestContextHandler.doWithRequestContext(this::doWithContext); + // neste kjører mellom 3-9 min fra nå. + var min = 3L * 60 * 1000; + var delay = System.currentTimeMillis() % (2 * min); + pollingService.schedule(this, min + delay, TimeUnit.MILLISECONDS); + }); } - } /** diff --git a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManagerRunnableTask.java b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManagerRunnableTask.java index e8f1df6..4a16c89 100644 --- a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManagerRunnableTask.java +++ b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManagerRunnableTask.java @@ -2,6 +2,14 @@ import java.time.LocalDateTime; import java.util.function.Consumer; +import java.util.function.UnaryOperator; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import no.nav.vedtak.felles.prosesstask.impl.util.OtelUtil; import org.slf4j.MDC; @@ -9,6 +17,8 @@ import no.nav.vedtak.felles.prosesstask.api.CallId; import no.nav.vedtak.felles.prosesstask.api.TaskType; +import static no.nav.vedtak.felles.prosesstask.impl.util.OtelUtil.taskAttributter; + class TaskManagerRunnableTask implements Runnable { private final TaskType taskType; private final RunTaskInfo taskInfo; @@ -24,6 +34,11 @@ class TaskManagerRunnableTask implements Runnable { @Override public void run() { + OtelUtil.wrapper().span("RUN TASK " + taskInfo.getTaskType().value(), taskAttributter(taskInfo), + this::runInSpan); + } + + private void runInSpan() { MDC.clear(); var runSingleTask = newRunTaskInstance(); @@ -39,7 +54,7 @@ public void run() { errorCallback = lagErrorCallback(taskInfo, callId, fatal); } catch (Exception e) { errorCallback = lagErrorCallback(taskInfo, callId, e); - } catch (Throwable t) { + } catch (Throwable t) { // NOSONAR errorCallback = lagErrorCallback(taskInfo, callId, t); } finally { clearLogContext(); @@ -102,4 +117,12 @@ RunTask newRunTaskInstance() { return TaskManagerGenerateRunnableTasks.CURRENT.select(RunTask.class).get(); } + public static UnaryOperator taskAttributter(RunTaskInfo taskInfo) { + return spanBuilder -> spanBuilder + .setAttribute("prosesstaskId", taskInfo.getId()) + .setAttribute("prosesstaskType", taskInfo.getTaskType().value()) + .setSpanKind(SpanKind.INTERNAL) + .setNoParent(); + } + } diff --git a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/util/OtelUtil.java b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/util/OtelUtil.java new file mode 100644 index 0000000..3e31a69 --- /dev/null +++ b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/util/OtelUtil.java @@ -0,0 +1,39 @@ +package no.nav.vedtak.felles.prosesstask.impl.util; + +import java.util.function.UnaryOperator; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import no.nav.vedtak.felles.prosesstask.api.ProsessTaskData; +import no.nav.vedtak.log.tracing.OtelSpanWrapper; + +public class OtelUtil { + + private static OtelSpanWrapper WRAPPER = new OtelSpanWrapper(GlobalOpenTelemetry.getTracer("fp-prosesstask")); + + public static OtelSpanWrapper wrapper() { + return WRAPPER; + } + + private OtelUtil() { + // Sonar + } + + public static UnaryOperator taskAttributter(ProsessTaskData data) { + return spanBuilder -> { + var builder = spanBuilder + .setAttribute("prosesstaskId", data.getId()) + .setAttribute("prosesstaskType", data.taskType().value()); + if (data.getSaksnummer() != null) { + builder = builder.setAttribute("saksnummer", data.getSaksnummer()); + } + if (data.getBehandlingUuid() != null) { + builder = builder.setAttribute("behandlingUuid", data.getBehandlingUuid().toString()); + } else if (data.getBehandlingId() != null) { + builder = builder.setAttribute("behandlingId", data.getBehandlingId()); + } + return builder.setSpanKind(SpanKind.INTERNAL); + }; + } +}