diff --git a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-deployment/src/main/java/org/jbpm/quarkus/devui/deployment/DevConsoleProcessor.java b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-deployment/src/main/java/org/jbpm/quarkus/devui/deployment/DevConsoleProcessor.java index 03bc33317f2..00e6f44c261 100644 --- a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-deployment/src/main/java/org/jbpm/quarkus/devui/deployment/DevConsoleProcessor.java +++ b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-deployment/src/main/java/org/jbpm/quarkus/devui/deployment/DevConsoleProcessor.java @@ -135,21 +135,21 @@ public CardPageBuildItem pages( .metadata("page", "Processes") .title("Process Instances") .icon("font-awesome-solid:diagram-project") - .dynamicLabelJsonRPCMethodName("queryProcessInstancesCount")); + .streamingLabelJsonRPCMethodName("queryProcessInstancesCount")); cardPageBuildItem.addPage(Page.webComponentPageBuilder() .componentLink("qwc-jbpm-quarkus-devui.js") .metadata("page", "Tasks") .title("Tasks") .icon("font-awesome-solid:bars-progress") - .dynamicLabelJsonRPCMethodName("queryTasksCount")); + .streamingLabelJsonRPCMethodName("queryTasksCount")); cardPageBuildItem.addPage(Page.webComponentPageBuilder() .componentLink("qwc-jbpm-quarkus-devui.js") .metadata("page", "Jobs") .title("Jobs") .icon("font-awesome-solid:clock") - .dynamicLabelJsonRPCMethodName("queryJobsCount")); + .streamingLabelJsonRPCMethodName("queryJobsCount")); cardPageBuildItem.addPage(Page.webComponentPageBuilder() .componentLink("qwc-jbpm-quarkus-devui.js") diff --git a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/pom.xml b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/pom.xml index d56dbb280f9..dc2fe91058e 100644 --- a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/pom.xml +++ b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/pom.xml @@ -95,6 +95,11 @@ vertx-web-client + + org.kie.kogito + kogito-api + + org.junit.jupiter junit-jupiter-engine diff --git a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/DataIndexCounter.java b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/DataIndexCounter.java new file mode 100644 index 00000000000..e378ada468c --- /dev/null +++ b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/DataIndexCounter.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jbpm.quarkus.devui.runtime.rpc; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.operators.multi.MultiCacheOp; +import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.WebClient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DataIndexCounter { + private static final Logger LOGGER = LoggerFactory.getLogger(DataIndexCounter.class); + + private final Vertx vertx; + private final MultiCacheOp multi; + private final WebClient dataIndexWebClient; + private final String path; + + private final String query; + private final String field; + + public DataIndexCounter(String query, String graphField, String path, Vertx vertx, WebClient dataIndexWebClient) { + if (dataIndexWebClient == null) { + throw new IllegalArgumentException("dataIndexWebClient is null"); + } + this.query = query; + this.field = graphField; + this.path = path; + + this.vertx = vertx; + this.dataIndexWebClient = dataIndexWebClient; + + this.multi = new MultiCacheOp<>(BroadcastProcessor.create()); + + refreshCount(); + } + + public void refresh() { + vertx.setTimer(1000, id -> { + refreshCount(); + }); + } + + public void stop() { + multi.onComplete(); + } + + private void refreshCount() { + LOGGER.debug("Refreshing data for query: {}", query); + + this.dataIndexWebClient.post(path + "/graphql") + .putHeader("content-type", "application/json") + .sendJson(new JsonObject(query)) + .map(response -> { + if (response.statusCode() == 200) { + JsonObject responseData = response.bodyAsJsonObject().getJsonObject("data"); + return String.valueOf(responseData.getJsonArray(field).size()); + } + return "0"; + }) + .onComplete(count -> this.multi.onNext(count.result())); + } + + public Multi getMulti() { + return multi; + } +} \ No newline at end of file diff --git a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevUIEventPublisher.java b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevUIEventPublisher.java new file mode 100644 index 00000000000..399274f3540 --- /dev/null +++ b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevUIEventPublisher.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.jbpm.quarkus.devui.runtime.rpc; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Default; + +import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.quarkus.arc.profile.IfBuildProfile; + +import java.util.Collection; +import java.util.Objects; +@ApplicationScoped +@IfBuildProfile("dev") +public class JBPMDevUIEventPublisher implements EventPublisher { + + private Runnable onProcessEvent; + private Runnable onTaskEvent; + private Runnable onJobEvent; + + @Override + public void publish(DataEvent event) { + switch (event.getType()) { + case "ProcessInstanceStateDataEvent": + maybeRun(onProcessEvent); + break; + case "UserTaskInstanceStateDataEvent": + maybeRun(onTaskEvent); + break; + case "JobEvent": + maybeRun(onJobEvent); + break; + } + } + + @Override + public void publish(Collection> events) { + events.forEach(this::publish); + } + + private void maybeRun(Runnable runnable) { + if (Objects.nonNull(runnable)) { + runnable.run(); + } + } + + public void setOnProcessEventListener(Runnable onProcessEvent) { + this.onProcessEvent = onProcessEvent; + } + + public void setOnTaskEventListener(Runnable onTaskEvent) { + this.onTaskEvent = onTaskEvent; + } + + public void setOnJobEventListener(Runnable onJobEvent) { + this.onJobEvent = onJobEvent; + } +} \ No newline at end of file diff --git a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevuiJsonRPCService.java b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevuiJsonRPCService.java index 2e62a776691..287e5629549 100644 --- a/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevuiJsonRPCService.java +++ b/packages/jbpm-quarkus-devui/jbpm-quarkus-devui-runtime/src/main/java/org/jbpm/quarkus/devui/runtime/rpc/JBPMDevuiJsonRPCService.java @@ -24,11 +24,14 @@ import java.util.Optional; import io.smallrye.mutiny.Uni; +import io.quarkus.arc.profile.IfBuildProfile; +import io.smallrye.mutiny.Multi; import io.vertx.core.Vertx; -import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.WebClient; import io.vertx.ext.web.client.WebClientOptions; import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; + import org.eclipse.microprofile.config.ConfigProvider; import org.jbpm.quarkus.devui.runtime.forms.FormsStorage; @@ -38,6 +41,7 @@ import org.slf4j.LoggerFactory; @ApplicationScoped +@IfBuildProfile("dev") public class JBPMDevuiJsonRPCService { private static final String DATA_INDEX_URL = "kogito.data-index.url"; @@ -54,11 +58,16 @@ public class JBPMDevuiJsonRPCService { private WebClient dataIndexWebClient; private final Vertx vertx; + private final JBPMDevUIEventPublisher eventPublisher; private final FormsStorage formsStorage; + private DataIndexCounter processesCounter; + private DataIndexCounter tasksCounter; + private DataIndexCounter jobsCounter; @Inject - public JBPMDevuiJsonRPCService(Vertx vertx, FormsStorage formsStorage) { + public JBPMDevuiJsonRPCService(Vertx vertx, JBPMDevUIEventPublisher eventPublisher, FormsStorage formsStorage) { this.vertx = vertx; + this.eventPublisher = eventPublisher; this.formsStorage = formsStorage; } @@ -70,50 +79,50 @@ public void init() { private void initDataIndexWebClient(String dataIndexURL) { try { - this.dataIndexWebClient = WebClient.create(vertx, buildWebClientOptions(dataIndexURL)); + URL url = new URL(dataIndexURL); + this.dataIndexWebClient = WebClient.create(vertx, buildWebClientOptions(url)); + + String contextPath = url.getPath(); + this.processesCounter = new DataIndexCounter(ALL_PROCESS_INSTANCES_IDS_QUERY, PROCESS_INSTANCES, + contextPath, vertx, dataIndexWebClient); + this.tasksCounter = new DataIndexCounter(ALL_TASKS_IDS_QUERY, USER_TASKS, contextPath, vertx, dataIndexWebClient); + this.jobsCounter = new DataIndexCounter(ALL_JOBS_IDS_QUERY, JOBS, contextPath, vertx, dataIndexWebClient); + + this.eventPublisher.setOnProcessEventListener(processesCounter::refresh); + this.eventPublisher.setOnTaskEventListener(tasksCounter::refresh); + this.eventPublisher.setOnJobEventListener(jobsCounter::refresh); } catch (Exception ex) { LOGGER.warn("Cannot configure dataIndexWebClient with 'kogito.data-index.url'='{}':", dataIndexURL, ex); } } - protected WebClientOptions buildWebClientOptions(String dataIndexURL) throws MalformedURLException { - URL url = new URL(dataIndexURL); + protected WebClientOptions buildWebClientOptions(URL dataIndexURL) throws MalformedURLException { return new WebClientOptions() - .setDefaultHost(url.getHost()) - .setDefaultPort((url.getPort() != -1 ? url.getPort() : url.getDefaultPort())) - .setSsl(url.getProtocol().compareToIgnoreCase("https") == 0); - } - - public Uni queryProcessInstancesCount() { - return doQuery(ALL_PROCESS_INSTANCES_IDS_QUERY, PROCESS_INSTANCES); + .setDefaultHost(dataIndexURL.getHost()) + .setDefaultPort((dataIndexURL.getPort() != -1 ? dataIndexURL.getPort() : dataIndexURL.getDefaultPort())) + .setSsl(dataIndexURL.getProtocol().compareToIgnoreCase("https") == 0); } - public Uni queryTasksCount() { - return doQuery(ALL_TASKS_IDS_QUERY, USER_TASKS); + public Multi queryProcessInstancesCount() { + return processesCounter.getMulti(); } - public Uni queryJobsCount() { - return doQuery(ALL_JOBS_IDS_QUERY, JOBS); + public Multi queryTasksCount() { + return tasksCounter.getMulti(); } - private Uni doQuery(String query, String graphModelName) { - if(dataIndexWebClient == null) { - LOGGER.warn("Cannot perform '{}' query, dataIndexWebClient couldn't be set. Is DataIndex correctly? Please verify '{}' value", graphModelName, DATA_INDEX_URL); - return Uni.createFrom().item("-"); - } - return Uni.createFrom().completionStage(this.dataIndexWebClient.post("/graphql") - .putHeader("content-type", "application/json") - .sendJson(new JsonObject(query)) - .map(response -> { - if(response.statusCode() == 200) { - JsonObject responseData = response.bodyAsJsonObject().getJsonObject("data"); - return String.valueOf(responseData.getJsonArray(graphModelName).size()); - } - return "-"; - }).toCompletionStage()); + public Multi queryJobsCount() { + return jobsCounter.getMulti(); } public Uni getFormsCount() { return Uni.createFrom().item(String.valueOf(this.formsStorage.getFormsCount())); } -} + + @PreDestroy + public void destroy() { + processesCounter.stop(); + tasksCounter.stop(); + jobsCounter.stop(); + } +} \ No newline at end of file