Skip to content

Commit

Permalink
feat: basis opentelemetry støtte i for prosesstask (#746)
Browse files Browse the repository at this point in the history
* feat: basis opentelemetry støtte i for prosesstask.

* Review

* Rydder versjonering
  • Loading branch information
mrsladek authored Dec 10, 2024
1 parent 443fe4a commit a4fa69f
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<sonar.projectName>fp-prosesstask</sonar.projectName>
<sonar.projectKey>navikt_fp-prosesstask</sonar.projectKey>

<felles.version>7.4.0</felles.version>
<felles.version>7.4.3</felles.version>

<jakarta.jakartaee-bom.version>10.0.0</jakarta.jakartaee-bom.version>
<hibernate-core.version>6.6.3.Final</hibernate-core.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected ProsessTaskHandler getBean() {
private ProsessTask getProsessTaskAnnotation() {
Class<?> clazz = getTargetClassExpectingAnnotation(ProsessTask.class);
if (!clazz.isAnnotationPresent(ProsessTask.class)) {
throw new IllegalStateException(clazz.getSimpleName() + " mangler annotering @ProsesTask");
throw new IllegalStateException(clazz.getSimpleName() + " mangler annotering @ProsesTask");
}
return clazz.getAnnotation(ProsessTask.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import jakarta.persistence.PersistenceException;
import jakarta.transaction.Transactional;

import no.nav.vedtak.felles.prosesstask.impl.util.OtelUtil;

import org.hibernate.Session;
import org.hibernate.exception.JDBCConnectionException;
import org.hibernate.jdbc.Work;
Expand Down Expand Up @@ -91,10 +93,12 @@ protected void runTaskAndUpdateStatus(Connection conn, ProsessTaskEntitet pte, P
return;
}

pickAndRun.dispatchWork(pte);
OtelUtil.wrapper().span("TASK " + taskType.value(), OtelUtil.taskAttributter(pte.tilProsessTask()), () -> {
pickAndRun.dispatchWork(pte);
// flush for å fange andre constraint feil etc før vi markerer ferdig
getEntityManager().flush();
});

// flush for å fange andre constraint feil etc før vi markerer ferdig
getEntityManager().flush();

if (ProsessTaskStatus.KLAR == pte.getStatus()) {
var sluttStatus = pickAndRun.markerTaskFerdig(pte);
Expand Down Expand Up @@ -203,7 +207,7 @@ void handleTaskFeil(ProsessTaskEntitet pte, Exception e) {
feilOgStatushåndterer.handleTaskFeil(retryPolicy, pte, e);
}

void handleFatalTaskFeil(ProsessTaskEntitet pte,Feil feil, Exception e) {
void handleFatalTaskFeil(ProsessTaskEntitet pte, Feil feil, Exception e) {
feilOgStatushåndterer.handleFatalTaskFeil(pte, feil, e);
}

Expand All @@ -219,7 +223,7 @@ ProsessTaskStatus markerTaskFerdig(ProsessTaskEntitet pte) {
// frigir veto etter at event handlere er fyrt
vetoHåndterer.frigiVeto(pte);

ProsessTaskStatus nyStatus = ProsessTaskStatus.KJOERT;
var nyStatus = ProsessTaskStatus.KJOERT;
taskManagerRepository.oppdaterStatus(pte.getId(), nyStatus);

pte = refreshProsessTask(pte.getId());
Expand All @@ -230,7 +234,7 @@ ProsessTaskStatus markerTaskFerdig(ProsessTaskEntitet pte) {
// markerer task som påbegynt (merk committer ikke før til slutt).
void markerTaskUnderArbeid(ProsessTaskEntitet pte) {
// mark row being processed with timestamp and server process id
LocalDateTime now = LocalDateTime.now();
var now = LocalDateTime.now();
pte.setSisteKjøring(now);
pte.setSisteKjøringServer(getJvmUniqueProcessName());
getEntityManager().persist(pte);
Expand All @@ -240,15 +244,15 @@ void markerTaskUnderArbeid(ProsessTaskEntitet pte) {
// regner ut neste kjøretid for tasks som kan repeteres (har CronExpression)
void planleggNesteKjøring(ProsessTaskEntitet pte) throws SQLException {
if (cronExpression != null) {
String gruppe = ProsessTaskRepository.getUniktProsessTaskGruppeNavn(taskManagerRepository.getEntityManager());
LocalDateTime now = LocalDateTime.now();
LocalDateTime nesteKjøring = cronExpression.nextLocalDateTimeAfter(now);
var gruppe = ProsessTaskRepository.getUniktProsessTaskGruppeNavn(taskManagerRepository.getEntityManager());
var now = LocalDateTime.now();
var nesteKjøring = cronExpression.nextLocalDateTimeAfter(now);
var data = ProsessTaskDataBuilder.forTaskType(pte.getTaskType())
.medNesteKjøringEtter(nesteKjøring)
.medProperties(pte.getProperties())
.medGruppe(gruppe)
.medSekvens(pte.getSekvens());
ProsessTaskEntitet nyPte = new ProsessTaskEntitet().kopierFraNy(data.build());
var nyPte = new ProsessTaskEntitet().kopierFraNy(data.build());

getEntityManager().persist(nyPte);
getEntityManager().flush();
Expand All @@ -266,7 +270,7 @@ void markerTaskUnderArbeid(ProsessTaskEntitet pte) {
}

void dispatchWork(ProsessTaskEntitet pte) throws Exception {
ProsessTaskData taskData = pte.tilProsessTask();
var taskData = pte.tilProsessTask();
taskInfo.getTaskDispatcher().dispatch(taskData);
}

Expand All @@ -277,7 +281,7 @@ private EntityManager getEntityManager() {
@SuppressWarnings("rawtypes")
void runTask() {

final PickAndRunTask pickAndRun = this;
final var pickAndRun = this;
/*
* Bruker SQL+JDBC for å kunne benytte savepoints og inkrementell oppdatering i
* transaksjonen.
Expand All @@ -286,7 +290,7 @@ class PullSingleTask implements Work {
@Override
public void execute(Connection conn) throws SQLException {
try {
Optional<ProsessTaskEntitet> pte = taskManagerRepository.finnOgLås(taskInfo);
var pte = taskManagerRepository.finnOgLås(taskInfo);
if (pte.isPresent()) {
runTaskAndUpdateStatus(conn, pte.get(), pickAndRun);
}
Expand All @@ -304,15 +308,15 @@ public void execute(Connection conn) throws SQLException {

}

PullSingleTask pullSingleTask = new PullSingleTask();
EntityManager em = getEntityManager();
var pullSingleTask = new PullSingleTask();
var em = getEntityManager();
// workaround for hibernate issue HHH-11020
if (em instanceof TargetInstanceProxy tip) {
em = (EntityManager) tip.weld_getTargetInstance();
}

@SuppressWarnings("resource") // skal ikke lukke session her
Session session = em.unwrap(Session.class);
var session = em.unwrap(Session.class);

session.doWork(pullSingleTask);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,24 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.hibernate.exception.JDBCConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.opentelemetry.api.trace.SpanKind;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.CDI;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;

import org.hibernate.exception.JDBCConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import no.nav.vedtak.exception.TekniskException;
import no.nav.vedtak.felles.jpa.TransactionHandler;
import no.nav.vedtak.felles.prosesstask.api.ProsessTaskData;
import no.nav.vedtak.felles.prosesstask.api.ProsessTaskDispatcher;
import no.nav.vedtak.felles.prosesstask.api.TaskMonitor;
import no.nav.vedtak.felles.prosesstask.api.TaskType;
import no.nav.vedtak.felles.prosesstask.impl.util.OtelUtil;
import no.nav.vedtak.log.metrics.Controllable;

/**
Expand Down Expand Up @@ -363,7 +364,8 @@ protected List<IdentRunnable> doWork(EntityManager entityManager) {
*/
@Override
public Integer call() {
return RequestContextHandler.doWithRequestContext(this::doPollingWithEntityManager);
return OtelUtil.wrapper().span("POLL_TASKS", spanBuilder -> spanBuilder.setSpanKind(SpanKind.INTERNAL).setNoParent(),
() -> RequestContextHandler.doWithRequestContext(this::doPollingWithEntityManager));
}

public Integer doPollingWithEntityManager() {
Expand All @@ -389,7 +391,7 @@ public Integer doPollingWithEntityManager() {
} catch (Exception e) {
backoffRound.set(backoffInterval.length - 1); // force max delay (skal kun havne her for Exception/RuntimeException)
LOG.warn("PT-996896 Kunne ikke polle database, venter til neste runde(runde={})", backoffRound.get(), e);
} catch (Throwable t) {
} catch (Throwable t) { // NOSONAR
backoffRound.set(backoffInterval.length - 1); // force max delay (skal kun havne her for Error)
LOG.error("PT-996897 Kunne ikke polle grunnet kritisk feil, venter ({}s)", getBackoffIntervalSeconds(), t);
}
Expand Down Expand Up @@ -510,13 +512,14 @@ public Integer doWithContext() {

@Override
public void run() {
RequestContextHandler.doWithRequestContext(this::doWithContext);
// neste kjører mellom 1-10 min fra nå.
var min = 60L * 1000;
var delay = System.currentTimeMillis() % (9 * min);
pollingService.schedule(this, min + delay, TimeUnit.MILLISECONDS);
OtelUtil.wrapper().span("MoveToDonePartition", spanBuilder -> spanBuilder.setSpanKind(SpanKind.INTERNAL).setNoParent(), () -> {
RequestContextHandler.doWithRequestContext(this::doWithContext);
// neste kjører mellom 1-10 min fra nå.
var min = 60L * 1000;
var delay = System.currentTimeMillis() % (9 * min);
pollingService.schedule(this, min + delay, TimeUnit.MILLISECONDS);
});
}

}

/**
Expand Down Expand Up @@ -561,7 +564,8 @@ public Integer doWithContext() {

@Override
public void run() {
RequestContextHandler.doWithRequestContext(this::doWithContext);
OtelUtil.wrapper().span("FreeBlockedTasks", spanBuilder -> spanBuilder.setSpanKind(SpanKind.INTERNAL).setNoParent(),
() -> RequestContextHandler.doWithRequestContext(this::doWithContext));
}

}
Expand Down Expand Up @@ -609,13 +613,14 @@ public Integer doWithContext() {

@Override
public void run() {
RequestContextHandler.doWithRequestContext(this::doWithContext);
// neste kjører mellom 3-9 min fra nå.
var min = 3L * 60 * 1000;
var delay = System.currentTimeMillis() % (2 * min);
pollingService.schedule(this, min + delay, TimeUnit.MILLISECONDS);
OtelUtil.wrapper().span("UpdateTaskMonitor", spanBuilder -> spanBuilder.setSpanKind(SpanKind.INTERNAL), () -> {
RequestContextHandler.doWithRequestContext(this::doWithContext);
// neste kjører mellom 3-9 min fra nå.
var min = 3L * 60 * 1000;
var delay = System.currentTimeMillis() % (2 * min);
pollingService.schedule(this, min + delay, TimeUnit.MILLISECONDS);
});
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,23 @@

import java.time.LocalDateTime;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import no.nav.vedtak.felles.prosesstask.impl.util.OtelUtil;

import org.slf4j.MDC;

import jakarta.persistence.PersistenceException;
import no.nav.vedtak.felles.prosesstask.api.CallId;
import no.nav.vedtak.felles.prosesstask.api.TaskType;

import static no.nav.vedtak.felles.prosesstask.impl.util.OtelUtil.taskAttributter;

class TaskManagerRunnableTask implements Runnable {
private final TaskType taskType;
private final RunTaskInfo taskInfo;
Expand All @@ -24,6 +34,11 @@ class TaskManagerRunnableTask implements Runnable {

@Override
public void run() {
OtelUtil.wrapper().span("RUN TASK " + taskInfo.getTaskType().value(), taskAttributter(taskInfo),
this::runInSpan);
}

private void runInSpan() {
MDC.clear();

var runSingleTask = newRunTaskInstance();
Expand All @@ -39,7 +54,7 @@ public void run() {
errorCallback = lagErrorCallback(taskInfo, callId, fatal);
} catch (Exception e) {
errorCallback = lagErrorCallback(taskInfo, callId, e);
} catch (Throwable t) {
} catch (Throwable t) { // NOSONAR
errorCallback = lagErrorCallback(taskInfo, callId, t);
} finally {
clearLogContext();
Expand Down Expand Up @@ -102,4 +117,12 @@ RunTask newRunTaskInstance() {
return TaskManagerGenerateRunnableTasks.CURRENT.select(RunTask.class).get();
}

public static UnaryOperator<SpanBuilder> taskAttributter(RunTaskInfo taskInfo) {
return spanBuilder -> spanBuilder
.setAttribute("prosesstaskId", taskInfo.getId())
.setAttribute("prosesstaskType", taskInfo.getTaskType().value())
.setSpanKind(SpanKind.INTERNAL)
.setNoParent();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package no.nav.vedtak.felles.prosesstask.impl.util;

import java.util.function.UnaryOperator;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import no.nav.vedtak.felles.prosesstask.api.ProsessTaskData;
import no.nav.vedtak.log.tracing.OtelSpanWrapper;

public class OtelUtil {

private static OtelSpanWrapper WRAPPER = new OtelSpanWrapper(GlobalOpenTelemetry.getTracer("fp-prosesstask"));

public static OtelSpanWrapper wrapper() {
return WRAPPER;
}

private OtelUtil() {
// Sonar
}

public static UnaryOperator<SpanBuilder> taskAttributter(ProsessTaskData data) {
return spanBuilder -> {
var builder = spanBuilder
.setAttribute("prosesstaskId", data.getId())
.setAttribute("prosesstaskType", data.taskType().value());
if (data.getSaksnummer() != null) {
builder = builder.setAttribute("saksnummer", data.getSaksnummer());
}
if (data.getBehandlingUuid() != null) {
builder = builder.setAttribute("behandlingUuid", data.getBehandlingUuid().toString());
} else if (data.getBehandlingId() != null) {
builder = builder.setAttribute("behandlingId", data.getBehandlingId());
}
return builder.setSpanKind(SpanKind.INTERNAL);
};
}
}

0 comments on commit a4fa69f

Please sign in to comment.