Skip to content

Commit

Permalink
Merge pull request cdapio#15357 from cdapio/bugfixes/cdap-20824
Browse files Browse the repository at this point in the history
[CDAP-20824] Emit zero for flow-control launching metric if there is no inflight launching pipeline when appfabric starts
  • Loading branch information
masoud-io authored Oct 16, 2023
2 parents 3302f87 + f9e0935 commit e1d660c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.twill.internal.CompositeService;
Expand Down Expand Up @@ -278,6 +279,7 @@ protected void doStartUp() throws Exception {
RetryStrategies.fromConfiguration(cConf, Constants.Service.RUNTIME_MONITOR_RETRY_PREFIX);
long startTs = System.currentTimeMillis();

AtomicBoolean launching = new AtomicBoolean(false);
Retries.runWithRetries(
() ->
store.scanActiveRuns(
Expand All @@ -288,8 +290,10 @@ protected void doStartUp() throws Exception {
}
try {
if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) {
launching.set(true);
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
} else if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) {
launching.set(true);
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
// It is unknown what is the state of program runs in STARTING state.
// A STARTING message is published again to retry STARTING logic.
Expand All @@ -314,6 +318,10 @@ protected void doStartUp() throws Exception {
}),
retryStrategy,
e -> true);
if (!launching.get()) {
// there is no launching pipeline
runRecordMonitorService.emitLaunchingMetrics(0);
}
}

@Nullable
Expand Down Expand Up @@ -867,7 +875,7 @@ private void processWorkflowOnStop(
}
}

/** write to heart beat table if the recordedRunRecord is not null */
/** write to heart beat table if the recordedRunRecord is not null. */
private void writeToHeartBeatTable(
@Nullable RunRecordDetail recordedRunRecord,
long timestampInSeconds,
Expand Down Expand Up @@ -1024,7 +1032,7 @@ private void publishRecordedStatus(

/**
* Helper method to extract the time from the given properties map, or return -1 if no value was
* found
* found.
*
* @param properties the properties map
* @param option the key to lookup in the properties map
Expand Down Expand Up @@ -1057,7 +1065,7 @@ private BasicThrowable decodeBasicThrowable(@Nullable String encoded) {

/**
* Emit the metrics context for the program, the tags are constructed with the program run id and
* the profile id
* the profile id.
*/
private void emitProfileMetrics(
ProgramRunId programRunId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ public void removeRequest(ProgramRunId programRunId, boolean emitRunningChange)
}
}

/**
 * Emits the given value for the flow-control launching-count metric.
 *
 * @param value the value to report for {@code Constants.Metrics.FlowControl.LAUNCHING_COUNT}
 */
public void emitLaunchingMetrics(long value) {
  final String launchingMetric = Constants.Metrics.FlowControl.LAUNCHING_COUNT;
  emitMetrics(launchingMetric, value);
}

/**
 * Reports {@code value} as a gauge under {@code metricName}.
 *
 * <p>The metrics context is requested from the metrics collection service with an empty map.
 *
 * @param metricName name of the gauge metric to report
 * @param value the gauge value to report
 */
private void emitMetrics(String metricName, long value) {
  metricsCollectionService
      .getContext(Collections.emptyMap())
      .gauge(metricName, value);
}
Expand Down

0 comments on commit e1d660c

Please sign in to comment.