-
-
this.setParameterProperty("nbthreads", event.target.value)}/>
+
+
+
+
+
+
+
Worker
+
+
+
+ this.setParameterProperty("maxjobsactive", event.target.value)}/>
+
+
+
+
+ this.setParameterProperty("nbthreads", event.target.value)}/>
+
+
+
+
+
)
diff --git a/src/main/frontend/src/store/Store.jsx b/src/main/frontend/src/store/Store.jsx
index 41cfa3d..4d4b4d3 100644
--- a/src/main/frontend/src/store/Store.jsx
+++ b/src/main/frontend/src/store/Store.jsx
@@ -58,7 +58,6 @@ class Store extends React.Component {
Connector |
Release |
-
Status |
Operation |
@@ -80,8 +79,7 @@ class Store extends React.Component {
{(connectorStore.status === "NEW" || connectorStore.status === "OLD") &&
diff --git a/src/main/java/io/camunda/cherry/admin/AdminRestController.java b/src/main/java/io/camunda/cherry/admin/AdminRestController.java
index 4a27588..82c6e3e 100644
--- a/src/main/java/io/camunda/cherry/admin/AdminRestController.java
+++ b/src/main/java/io/camunda/cherry/admin/AdminRestController.java
@@ -8,6 +8,7 @@
/* ******************************************************************** */
package io.camunda.cherry.admin;
+import io.camunda.cherry.db.entity.JarStorageEntity;
import io.camunda.cherry.definition.AbstractRunner;
import io.camunda.cherry.runner.JobRunnerFactory;
import io.camunda.cherry.runtime.HistoryFactory;
@@ -21,6 +22,11 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,21 +37,31 @@ public class AdminRestController {
Logger logger = LoggerFactory.getLogger(AdminRestController.class.getName());
- @Autowired
- JobRunnerFactory cherryJobRunnerFactory;
+ private final JobRunnerFactory jobRunnerFactory;
- @Autowired
- HistoryFactory historyFactory;
+ private final HistoryFactory historyFactory;
- @Autowired
- ZeebeConfiguration zeebeConfiguration;
+ private final ZeebeConfiguration zeebeConfiguration;
/**
* Spring populate the list of all workers
*/
- @Autowired
- private List listRunner;
-
+ private final List listRunner;
+
+ private final DataSource dataSource;
+
+ AdminRestController( JobRunnerFactory jobRunnerFactory,
+ HistoryFactory historyFactory,
+ ZeebeConfiguration zeebeConfiguration,
+List listRunner,
+ DataSource dataSource
+) {
+this.jobRunnerFactory =jobRunnerFactory;
+this.historyFactory=historyFactory;
+ this.zeebeConfiguration=zeebeConfiguration;
+ this.listRunner = listRunner;
+ this.dataSource = dataSource;
+ }
@GetMapping(value = "/api/ping", produces = "application/json")
public Map ping() {
Map parameters = new HashMap<>();
@@ -58,7 +74,8 @@ public Map getParameters() {
Map parameters = new HashMap<>();
parameters.put("zeebekindconnection", zeebeConfiguration.isCloudConfiguration() ? "SAAS" : "GATEWAY");
parameters.put("gatewayaddress", zeebeConfiguration.getGatewayAddress());
- parameters.put("plaintext", zeebeConfiguration.isPlaintext()==null? null : zeebeConfiguration.isPlaintext().toString());
+ parameters.put("plaintext",
+ zeebeConfiguration.isPlaintext() == null ? null : zeebeConfiguration.isPlaintext().toString());
parameters.put("cloudregion", zeebeConfiguration.getRegion());
parameters.put("cloudclusterid", zeebeConfiguration.getClusterId());
@@ -66,19 +83,26 @@ public Map getParameters() {
parameters.put("cloudclientsecret", ""); // never send the client Secret
// we don't want the configuration here, but the running information
- parameters.put("maxjobsactive", cherryJobRunnerFactory.getMaxJobActive());
- parameters.put("nbthreads", cherryJobRunnerFactory.getNumberOfThreads());
+ parameters.put("maxjobsactive", jobRunnerFactory.getMaxJobActive());
+ parameters.put("nbthreads", jobRunnerFactory.getNumberOfThreads());
+
+ try(Connection con = dataSource.getConnection()) {
+ parameters.put("datasourceproductname",con.getMetaData().getDatabaseProductName());
+ parameters.put("datasourceurl",con.getMetaData().getURL());
+ parameters.put("datasourceusername",con.getMetaData().getUserName());
+
+ } catch(Exception e){}
return parameters;
}
@GetMapping(value = "/api/runtime/threads", produces = "application/json")
public Integer getNumberOfThreads() {
- return cherryJobRunnerFactory.getNumberOfThreads();
+ return jobRunnerFactory.getNumberOfThreads();
}
@PutMapping(value = "/api/runtime/setthreads", produces = "application/json")
public void setNumberOfThread(@RequestParam(name = "threads") Integer numberOfThreads) {
- cherryJobRunnerFactory.setNumberOfThreads(numberOfThreads);
+ jobRunnerFactory.setNumberOfThreads(numberOfThreads);
}
}
diff --git a/src/main/java/io/camunda/cherry/db/entity/OperationEntity.java b/src/main/java/io/camunda/cherry/db/entity/OperationEntity.java
index d854522..4db481e 100644
--- a/src/main/java/io/camunda/cherry/db/entity/OperationEntity.java
+++ b/src/main/java/io/camunda/cherry/db/entity/OperationEntity.java
@@ -42,6 +42,6 @@ public class OperationEntity {
private Long id;
public enum Operation {
- HOSTNAME, STARTRUNNER, STOPRUNNER, SETTHRESOLD, STOPRUNTIME, STARTRUNTIME, SERVERINFO, ERROR
+ HOSTNAME, STARTRUNNER, STOPRUNNER, SETTHRESOLD, STOPRUNTIME, STARTRUNTIME, SERVERINFO, ERROR, REMOVE
}
}
diff --git a/src/main/java/io/camunda/cherry/db/entity/RunnerDefinitionEntity.java b/src/main/java/io/camunda/cherry/db/entity/RunnerDefinitionEntity.java
index 703df24..3ae3c51 100644
--- a/src/main/java/io/camunda/cherry/db/entity/RunnerDefinitionEntity.java
+++ b/src/main/java/io/camunda/cherry/db/entity/RunnerDefinitionEntity.java
@@ -20,13 +20,13 @@ public class RunnerDefinitionEntity {
@Column(name = "name", length = 300)
public String name;
- @Column(name = "classname", length = 1000, unique = true)
+ @Column(name = "classname", length = 1000)
public String classname;
@Column(name = "collection", length = 300)
public String collectionName;
- @Column(name = "type", length = 1000)
+ @Column(name = "type", length = 1000, unique = true)
public String type;
@Column(name = "origin", length = 1000)
diff --git a/src/main/java/io/camunda/cherry/db/entity/RunnerExecutionEntity.java b/src/main/java/io/camunda/cherry/db/entity/RunnerExecutionEntity.java
index 9935526..37bd086 100644
--- a/src/main/java/io/camunda/cherry/db/entity/RunnerExecutionEntity.java
+++ b/src/main/java/io/camunda/cherry/db/entity/RunnerExecutionEntity.java
@@ -21,7 +21,7 @@ public class RunnerExecutionEntity {
@Enumerated(EnumType.STRING)
public TypeExecutor typeExecutor;
- @Column(name = "runner_type", length = 100)
+ @Column(name = "runner_type", length = 1000)
public String runnerType;
/**
diff --git a/src/main/java/io/camunda/cherry/db/repository/RunnerDefinitionRepository.java b/src/main/java/io/camunda/cherry/db/repository/RunnerDefinitionRepository.java
index 3aacd15..19735d7 100644
--- a/src/main/java/io/camunda/cherry/db/repository/RunnerDefinitionRepository.java
+++ b/src/main/java/io/camunda/cherry/db/repository/RunnerDefinitionRepository.java
@@ -13,6 +13,14 @@ public interface RunnerDefinitionRepository extends JpaRepository selectNotInType(@Param("listTypes") List listTypes);
+
@Query("select runnerDefinition from RunnerDefinitionEntity runnerDefinition"
+ " where runnerDefinition.jar is not null")
List selectAllByJarNotNull();
diff --git a/src/main/java/io/camunda/cherry/db/repository/RunnerExecutionRepository.java b/src/main/java/io/camunda/cherry/db/repository/RunnerExecutionRepository.java
index 2360df0..b40b987 100644
--- a/src/main/java/io/camunda/cherry/db/repository/RunnerExecutionRepository.java
+++ b/src/main/java/io/camunda/cherry/db/repository/RunnerExecutionRepository.java
@@ -4,14 +4,17 @@
import io.camunda.cherry.definition.AbstractRunner;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
+import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
+@Transactional
public interface RunnerExecutionRepository extends JpaRepository {
@Query("select runnerexecution from RunnerExecutionEntity runnerexecution"
@@ -39,4 +42,10 @@ List selectRunnerRecordsByStates(@Param("runnerType") Str
@Param("dateToSearch") LocalDateTime dateToSearch,
@Param("listStates") List listStates,
Pageable pageable);
+
+ @Modifying
+ @Query(value = "delete from RunnerExecutionEntity runnerexecution"
+ + " where runnerexecution.runnerType = :runnerType")
+ void deleteFromEntityType(@Param("runnerType") String runnerType);
+
}
diff --git a/src/main/java/io/camunda/cherry/definition/AbstractWorker.java b/src/main/java/io/camunda/cherry/definition/AbstractWorker.java
index b01757a..e27635e 100644
--- a/src/main/java/io/camunda/cherry/definition/AbstractWorker.java
+++ b/src/main/java/io/camunda/cherry/definition/AbstractWorker.java
@@ -101,8 +101,11 @@ public void handle(final JobClient jobClient, final ActivatedJob activatedJob) {
errorCode = "Exception";
errorMessage = e.getMessage();
}
- // save the output in the process instance
- jobClient.newCompleteCommand(activatedJob.getKey()).variables(contextExecution.outVariablesValue).send().join();
+ if(ExecutionStatusEnum.FAIL.equals(status) || ExecutionStatusEnum.BPMNERROR.equals(status))
+ jobClient.newThrowErrorCommand(activatedJob.getKey()).errorCode(errorCode).errorMessage(errorMessage).send().join();
+ else
+ // save the output in the process instance
+ jobClient.newCompleteCommand(activatedJob.getKey()).variables(contextExecution.outVariablesValue).send().join();
contextExecution.endExecution = System.currentTimeMillis();
if (isLog())
diff --git a/src/main/java/io/camunda/cherry/embeddedrunner/ping/connector/PingConnector.java b/src/main/java/io/camunda/cherry/embeddedrunner/ping/connector/PingConnector.java
index 0503ec8..dbf6c78 100644
--- a/src/main/java/io/camunda/cherry/embeddedrunner/ping/connector/PingConnector.java
+++ b/src/main/java/io/camunda/cherry/embeddedrunner/ping/connector/PingConnector.java
@@ -24,11 +24,9 @@
/* ------------------------------------------------------------------- */
@Component
-@OutboundConnector(name = PingConnector.TYPE_PINGCONNECTOR,
- inputVariables = { PingConnectorInput.INPUT_MESSAGE,
+@OutboundConnector(name = PingConnector.TYPE_PINGCONNECTOR, inputVariables = { PingConnectorInput.INPUT_MESSAGE,
PingConnectorInput.INPUT_DELAY,
- PingConnectorInput.INPUT_THROWERRORPLEASE },
- type = PingConnector.TYPE_PINGCONNECTOR)
+ PingConnectorInput.INPUT_THROWERRORPLEASE }, type = PingConnector.TYPE_PINGCONNECTOR)
public class PingConnector extends AbstractConnector implements IntFrameworkRunner, OutboundConnectorFunction {
public static final String ERROR_BAD_WEATHER = "BAD_WEATHER";
diff --git a/src/main/java/io/camunda/cherry/runner/JobRunnerFactory.java b/src/main/java/io/camunda/cherry/runner/JobRunnerFactory.java
index 1c4b84f..e430160 100644
--- a/src/main/java/io/camunda/cherry/runner/JobRunnerFactory.java
+++ b/src/main/java/io/camunda/cherry/runner/JobRunnerFactory.java
@@ -10,7 +10,6 @@
import io.camunda.cherry.definition.AbstractConnector;
import io.camunda.cherry.definition.AbstractRunner;
import io.camunda.cherry.definition.AbstractWorker;
-
import io.camunda.cherry.definition.CherryConnectorJobHandler;
import io.camunda.cherry.definition.SdkRunnerConnector;
import io.camunda.cherry.exception.OperationAlreadyStartedException;
@@ -79,7 +78,7 @@ public void startAll() {
// now start the Zeebe Client
try {
zeebeContainer.startZeebeeClient();
- } catch(TechnicalException e) {
+ } catch (TechnicalException e) {
logger.error("ZeebeClient is not started, can't start runner");
return;
}
@@ -189,11 +188,13 @@ public boolean isRunnerActive(String runnerType) throws OperationException {
/**
* We ask the container what is the number of job active configured
+ *
* @return number of job active
*/
public int getMaxJobActive() {
return zeebeContainer.getMaxJobsActive();
}
+
public int getNumberOfThreads() {
return zeebeContainer.getNumberOfThreads();
}
@@ -203,7 +204,6 @@ public void setNumberOfThreads(int numberOfThreadsRequired) throws TechnicalExce
zeebeContainer.stopZeebeeClient();
zeebeContainer.startZeebeeClient();
-
// stop all running and restart them
for (Running running : mapRunning.values()) {
closeJobWorker(running.containerJobWorker.getJobWorker());
diff --git a/src/main/java/io/camunda/cherry/runner/LogOperation.java b/src/main/java/io/camunda/cherry/runner/LogOperation.java
index e10dac9..a0ac77b 100644
--- a/src/main/java/io/camunda/cherry/runner/LogOperation.java
+++ b/src/main/java/io/camunda/cherry/runner/LogOperation.java
@@ -34,7 +34,7 @@ public class LogOperation {
* @param message message
*/
public void log(OperationEntity.Operation operation, String message) {
- logger.info("Operation {} [{}]",operation.toString(),message);
+ logger.info("Operation {} [{}]", operation.toString(), message);
OperationEntity operationEntity = new OperationEntity();
operationEntity.operation = operation;
operationEntity.executionTime = LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC);
@@ -108,7 +108,7 @@ public void logException(String runnerType, String message, Exception ex) {
* @param e exception
*/
public void logError(String message, Exception e) {
- logger.error("Exception {} {}", message, e.getMessage());
+ logger.error("Exception {} {}", message, e);
OperationEntity operationEntity = new OperationEntity();
operationEntity.executionTime = LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC);
operationEntity.operation = OperationEntity.Operation.ERROR;
@@ -116,13 +116,14 @@ public void logError(String message, Exception e) {
operationEntity.message = message + ": " + e.getMessage();
saveOperationEntity(operationEntity);
}
+
/**
* OperationLog an error
*
* @param message contextual message (what operation was performed)
*/
public void logError(String message) {
- logger.error("Error {} {}", message);
+ logger.error("Error {}", message);
OperationEntity operationEntity = new OperationEntity();
operationEntity.executionTime = LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC);
operationEntity.operation = OperationEntity.Operation.ERROR;
@@ -131,16 +132,15 @@ public void logError(String message) {
saveOperationEntity(operationEntity);
}
-
private String getServerIdentification() {
return getHostName();
}
private String getHostName() {
try {
- InetAddress IP = InetAddress.getLocalHost();
+ InetAddress ipAddress = InetAddress.getLocalHost();
- return IP.getHostName();
+ return ipAddress.getHostName();
} catch (Exception e) {
return "CherryHostName";
}
@@ -150,7 +150,7 @@ private void saveOperationEntity(OperationEntity operationEntity) {
try {
operationRepository.save(operationEntity);
} catch (Exception e) {
- logger.error("Can't save OperationEntity " + operationEntity);
+ logger.error("Can't save OperationEntity [{}]", operationEntity);
}
}
}
diff --git a/src/main/java/io/camunda/cherry/runner/RunnerEmbeddedFactory.java b/src/main/java/io/camunda/cherry/runner/RunnerEmbeddedFactory.java
index 974ee36..cde9f0d 100644
--- a/src/main/java/io/camunda/cherry/runner/RunnerEmbeddedFactory.java
+++ b/src/main/java/io/camunda/cherry/runner/RunnerEmbeddedFactory.java
@@ -8,12 +8,13 @@
package io.camunda.cherry.runner;
+import io.camunda.cherry.db.entity.OperationEntity;
+import io.camunda.cherry.db.entity.RunnerDefinitionEntity;
import io.camunda.cherry.definition.AbstractConnector;
import io.camunda.cherry.definition.AbstractRunner;
import io.camunda.cherry.definition.AbstractWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@@ -24,14 +25,22 @@ public class RunnerEmbeddedFactory {
Logger logger = LoggerFactory.getLogger(RunnerEmbeddedFactory.class.getName());
- @Autowired
List listAbstractConnector;
- @Autowired
List listAbstractWorker;
- @Autowired
StorageRunner storageRunner;
+ LogOperation logOperation;
+
+ RunnerEmbeddedFactory(List listAbstractConnector,
+ List listAbstractWorker,
+ StorageRunner storageRunner,
+ LogOperation logOperation) {
+ this.listAbstractConnector = listAbstractConnector;
+ this.listAbstractWorker = listAbstractWorker;
+ this.storageRunner = storageRunner;
+ this.logOperation = logOperation;
+ }
public void registerInternalRunner() {
List listRunners = Stream.concat(listAbstractConnector.stream(), listAbstractWorker.stream())
@@ -47,14 +56,24 @@ public void registerInternalRunner() {
try {
storageRunner.saveEmbeddedRunner(runner);
} catch (Exception e) {
- logger.error("RunnerEmbeddedFactory: CAN'T SAVE [" + runner.getType() + (runner.getName() != null ?
- " (" + runner.getName() + ")" :
- "") + "] error " + e.getMessage());
- continue;
+ logOperation.log(OperationEntity.Operation.ERROR, "RunnerEmbeddedFactory: CAN'T SAVE [" + runner.getType() + (
+ runner.getName() != null ?
+ " (" + runner.getName() + ")" :
+ "") + "] error " + e.getMessage());
}
}
}
+ public List getAllRunners() {
+ return Stream.concat(listAbstractConnector.stream(), listAbstractWorker.stream()).map(t -> {
+ RunnerLightDefinition light = new RunnerLightDefinition();
+ light.name = t.getName();
+ light.type = t.getType();
+ light.origin = RunnerDefinitionEntity.Origin.EMBEDDED;
+ return light;
+ }).toList();
+ }
+
/**
* Return the runner by its name, if it exists
*
@@ -67,4 +86,17 @@ public AbstractRunner getByName(String name) {
.toList();
return listRunners.isEmpty() ? null : listRunners.get(0);
}
+
+ /**
+ * Return the runner by its name, if it exists
+ *
+ * @param type type of the runner
+ * @return null if not exist, else the runner
+ */
+ public AbstractRunner getByType(String type) {
+ List listRunners = Stream.concat(listAbstractConnector.stream(), listAbstractWorker.stream())
+ .filter(t -> t.getType().equals(type))
+ .toList();
+ return listRunners.isEmpty() ? null : listRunners.get(0);
+ }
}
diff --git a/src/main/java/io/camunda/cherry/runner/RunnerFactory.java b/src/main/java/io/camunda/cherry/runner/RunnerFactory.java
index 95ed314..aa38bd1 100644
--- a/src/main/java/io/camunda/cherry/runner/RunnerFactory.java
+++ b/src/main/java/io/camunda/cherry/runner/RunnerFactory.java
@@ -11,14 +11,18 @@
/* ******************************************************************** */
package io.camunda.cherry.runner;
+import io.camunda.cherry.db.entity.OperationEntity;
import io.camunda.cherry.db.entity.RunnerDefinitionEntity;
+import io.camunda.cherry.db.repository.RunnerExecutionRepository;
import io.camunda.cherry.definition.AbstractRunner;
import io.camunda.cherry.definition.SdkRunnerConnector;
import io.camunda.connector.api.annotation.OutboundConnector;
import io.camunda.connector.api.outbound.OutboundConnectorFunction;
+import org.hibernate.Session;
+import org.hibernate.SessionFactory;
+import org.hibernate.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.File;
@@ -26,22 +30,37 @@
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
@Service
public class RunnerFactory {
- @Autowired
- RunnerEmbeddedFactory runnerEmbeddedFactory;
- @Autowired
- RunnerUploadFactory runnerUploadFactory;
- @Autowired
- StorageRunner storageRunner;
-
- @Autowired
- LogOperation logOperation;
+ private final RunnerEmbeddedFactory runnerEmbeddedFactory;
+ private final RunnerUploadFactory runnerUploadFactory;
+ private final StorageRunner storageRunner;
+ private final RunnerExecutionRepository runnerExecutionRepository;
+ private final LogOperation logOperation;
+ private final SessionFactory sessionFactory;
Logger logger = LoggerFactory.getLogger(RunnerFactory.class.getName());
+ RunnerFactory(RunnerEmbeddedFactory runnerEmbeddedFactory,
+ RunnerUploadFactory runnerUploadFactory,
+ StorageRunner storageRunner,
+ RunnerExecutionRepository runnerExecutionRepository,
+ LogOperation logOperation,
+ SessionFactory sessionFactory) {
+ this.runnerEmbeddedFactory = runnerEmbeddedFactory;
+ this.runnerUploadFactory = runnerUploadFactory;
+ this.storageRunner = storageRunner;
+ this.runnerExecutionRepository = runnerExecutionRepository;
+ this.logOperation = logOperation;
+ this.sessionFactory = sessionFactory;
+ }
+
public void init() {
logger.info("----- RunnerFactory.1 Load all embedded runner");
@@ -56,6 +75,40 @@ public void init() {
runnerUploadFactory.loadJavaFromStorage();
}
+ /**
+ * Must be call after the initialisation
+ * all runners are loaded amd identified. The storageRunner are checked, and all runner in the database
+ * which are not loaded are purged.
+ */
+ public void synchronize() {
+ Map mapExistingRunners = Stream.concat(
+ runnerEmbeddedFactory.getAllRunners().stream(), runnerUploadFactory.getAllRunners().stream())
+ .collect(Collectors.toMap(RunnerLightDefinition::getType, Function.identity()));
+
+ // get the list of entities
+ List listRunnersEntity = storageRunner.getRunners(new StorageRunner.Filter());
+ // identify entity which does not exist
+ List listEntityToRemove = listRunnersEntity.stream()
+ .filter(t -> !mapExistingRunners.containsKey(t.type))
+ .toList();
+
+ for (RunnerDefinitionEntity entityToRemove : listEntityToRemove) {
+ logOperation.log(OperationEntity.Operation.REMOVE,
+ "Entity type[" + entityToRemove.type + "] name[" + entityToRemove.name + "]");
+
+ try (Session session = sessionFactory.openSession()) {
+ Transaction txn = session.beginTransaction();
+ runnerExecutionRepository.deleteFromEntityType(entityToRemove.type);
+
+ storageRunner.removeEntity(entityToRemove);
+ txn.commit();
+ } catch (Exception e) {
+ logOperation.logError("Can't delete [" + entityToRemove.type + "]", e);
+ }
+ }
+
+ }
+
/**
* Get All runners
*
@@ -96,7 +149,7 @@ private AbstractRunner getRunnerFromEntity(RunnerDefinitionEntity runnerDefiniti
ClassLoader loader;
try {
// if this class is embedded?
- AbstractRunner embeddedRunner = runnerEmbeddedFactory.getByName(runnerDefinitionEntity.name);
+ AbstractRunner embeddedRunner = runnerEmbeddedFactory.getByType(runnerDefinitionEntity.type);
if (embeddedRunner != null) {
return embeddedRunner;
}
@@ -111,7 +164,7 @@ private AbstractRunner getRunnerFromEntity(RunnerDefinitionEntity runnerDefiniti
Object objectRunner = clazz.getDeclaredConstructor().newInstance();
if (AbstractRunner.class.isAssignableFrom(objectRunner.getClass())) {
- // if (objectRunner instanceof AbstractRunner runner) {
+ // if (objectRunner instanceof AbstractRunner runner) {
return (AbstractRunner) objectRunner;
} else if (objectRunner instanceof OutboundConnectorFunction outboundConnector) {
SdkRunnerConnector runner = new SdkRunnerConnector(outboundConnector);
@@ -121,9 +174,9 @@ private AbstractRunner getRunnerFromEntity(RunnerDefinitionEntity runnerDefiniti
return runner;
}
logger.error("No method to get a runner from [" + runnerDefinitionEntity.name + "]");
- logOperation.logError("Class ["+runnerDefinitionEntity.classname+"] in jar["+jarFileName+"] not a Runner or OutboundConnectorFunction");
- }
- else {
+ logOperation.logError("Class [" + runnerDefinitionEntity.classname + "] in jar[" + jarFileName
+ + "] not a Runner or OutboundConnectorFunction");
+ } else {
logOperation.logError("No Jar file, not an embedded runner for [" + runnerDefinitionEntity.name + "]");
}
return null;
diff --git a/src/main/java/io/camunda/cherry/runner/RunnerLightDefinition.java b/src/main/java/io/camunda/cherry/runner/RunnerLightDefinition.java
new file mode 100644
index 0000000..b31a53a
--- /dev/null
+++ b/src/main/java/io/camunda/cherry/runner/RunnerLightDefinition.java
@@ -0,0 +1,30 @@
+/* ******************************************************************** */
+/* */
+/* RunnerLightDefinition */
+/* */
+/* To carry information on different Runner */
+/* */
+/* ******************************************************************** */
+package io.camunda.cherry.runner;
+
+import io.camunda.cherry.db.entity.RunnerDefinitionEntity;
+
+public class RunnerLightDefinition {
+
+ public String type;
+
+ public String name;
+ public RunnerDefinitionEntity.Origin origin;
+
+ public String getType() {
+ return type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public RunnerDefinitionEntity.Origin getOrigin() {
+ return origin;
+ }
+}
diff --git a/src/main/java/io/camunda/cherry/runner/RunnerUploadFactory.java b/src/main/java/io/camunda/cherry/runner/RunnerUploadFactory.java
index 4a3ff53..e350199 100644
--- a/src/main/java/io/camunda/cherry/runner/RunnerUploadFactory.java
+++ b/src/main/java/io/camunda/cherry/runner/RunnerUploadFactory.java
@@ -2,12 +2,12 @@
import io.camunda.cherry.db.entity.JarStorageEntity;
import io.camunda.cherry.db.entity.OperationEntity;
+import io.camunda.cherry.db.entity.RunnerDefinitionEntity;
import io.camunda.cherry.definition.AbstractRunner;
import io.camunda.connector.api.annotation.OutboundConnector;
import org.hibernate.SessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;
@@ -16,7 +16,9 @@
import java.io.FileOutputStream;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
@@ -24,24 +26,31 @@
@Configuration
public class RunnerUploadFactory {
+ private final StorageRunner storageRunner;
+ private final LogOperation logOperation;
+ private final SessionFactory sessionFactory;
Logger logger = LoggerFactory.getLogger(RunnerUploadFactory.class.getName());
- @Autowired
- StorageRunner storageRunner;
-
- @Autowired
- LogOperation logOperation;
-
+ private final List listLightRunners = new ArrayList<>();
@Value("${cherry.connectorslib.uploadpath:@null}")
private String uploadPath;
-
@Value("${cherry.connectorslib.classloaderpath:@null}")
private String classLoaderPath;
-
@Value("${cherry.connectorslib.forcerefresh:false}")
private Boolean forceRefresh;
- @Autowired
- private SessionFactory sessionFactory;
+ public RunnerUploadFactory(StorageRunner storageRunner, LogOperation logOperation, SessionFactory sessionFactory) {
+ this.storageRunner = storageRunner;
+ this.logOperation = logOperation;
+ this.sessionFactory = sessionFactory;
+ }
+
+ private static RunnerLightDefinition getLightFromRunnerDefinitionEntity(RunnerDefinitionEntity entityRunner) {
+ RunnerLightDefinition runnerLightDefinition = new RunnerLightDefinition();
+ runnerLightDefinition.name = entityRunner.name;
+ runnerLightDefinition.type = entityRunner.type;
+ runnerLightDefinition.origin = RunnerDefinitionEntity.Origin.JARFILE;
+ return runnerLightDefinition;
+ }
public void loadConnectorsFromClassLoaderPath() {
// No special operation to do
@@ -102,8 +111,17 @@ public void loadStorageFromUploadPath() {
jarStorageEntity = storageRunner.getJarStorageByName(jarFile.getName());
if (jarStorageEntity != null && !Boolean.TRUE.equals(forceRefresh)) {
+ // we don't reload the JAR file, so we believe what we have in the database
+ if (jarStorageEntity != null) {
+ List runners = storageRunner.getRunners(
+ new StorageRunner.Filter().jarFileName(jarStorageEntity.name));
+ listLightRunners.addAll(
+ runners.stream().map(RunnerUploadFactory::getLightFromRunnerDefinitionEntity).toList());
+ }
+
continue;
}
+
if (jarStorageEntity == null) {
// save it
jarStorageEntity = storageRunner.saveJarRunner(jarFile);
@@ -139,6 +157,7 @@ public void loadStorageFromUploadPath() {
// this is a AbstractConnector
AbstractRunner runner = (AbstractRunner) instanceClass;
storageRunner.saveUploadRunner(runner, jarStorageEntity);
+ listLightRunners.add(getLightFromRunner(runner));
logLoadJar.append("RunnerDectection[");
logLoadJar.append(runner.getName());
@@ -146,14 +165,16 @@ public void loadStorageFromUploadPath() {
logLoadJar.append(runner.getType());
logLoadJar.append("]; ");
logOperation.log(OperationEntity.Operation.SERVERINFO,
- "Load Jar[" + jarFile.getName() + "] Runner[" + runner.getName() + "] type["
- + runner.getType() + "]");
+ "Load Jar[" + jarFile.getName() + "] Runner[" + runner.getName() + "] type[" + runner.getType()
+ + "]");
nbRunners++;
} else if (connectorAnnotation != null) {
// this is a Outbound connector
storageRunner.saveUploadRunner(connectorAnnotation.name(), connectorAnnotation.type(), clazz,
jarStorageEntity);
+ listLightRunners.add(getLightFromConnectorAnnotation(connectorAnnotation));
+
logLoadJar.append("ConnectorDetection[");
logLoadJar.append(connectorAnnotation.name());
logLoadJar.append("], type[");
@@ -193,8 +214,8 @@ public void loadStorageFromUploadPath() {
jarStorageEntity.loadLog = logLoadJar.toString();
storageRunner.updateJarStorage(jarStorageEntity);
logOperation.log(OperationEntity.Operation.SERVERINFO,
- "Load [" + jarFile.getPath() + "] connectors: " + nbConnectors + " runners: " + nbRunners + " in "
- + (endOperation - beginOperation) + " ms ");
+ "Load [" + jarFile.getPath() + "] connectors: " + nbConnectors + " runners: " + nbRunners + " in " + (
+ endOperation - beginOperation) + " ms ");
} catch (Exception e) {
logOperation.log(OperationEntity.Operation.ERROR,
@@ -203,4 +224,24 @@ public void loadStorageFromUploadPath() {
}
}
+
+ public List getAllRunners() {
+ return listLightRunners;
+ }
+
+ private RunnerLightDefinition getLightFromRunner(AbstractRunner runner) {
+ RunnerLightDefinition runnerLightDefinition = new RunnerLightDefinition();
+ runnerLightDefinition.name = runner.getName();
+ runnerLightDefinition.type = runner.getType();
+ runnerLightDefinition.origin = RunnerDefinitionEntity.Origin.JARFILE;
+ return runnerLightDefinition;
+ }
+
+ private RunnerLightDefinition getLightFromConnectorAnnotation(OutboundConnector connectorAnnotation) {
+ RunnerLightDefinition runnerLightDefinition = new RunnerLightDefinition();
+ runnerLightDefinition.name = connectorAnnotation.name();
+ runnerLightDefinition.type = connectorAnnotation.type();
+ runnerLightDefinition.origin = RunnerDefinitionEntity.Origin.JARFILE;
+ return runnerLightDefinition;
+ }
}
diff --git a/src/main/java/io/camunda/cherry/runner/StorageRunner.java b/src/main/java/io/camunda/cherry/runner/StorageRunner.java
index 38ca43f..c9b69d3 100644
--- a/src/main/java/io/camunda/cherry/runner/StorageRunner.java
+++ b/src/main/java/io/camunda/cherry/runner/StorageRunner.java
@@ -70,6 +70,7 @@ public class StorageRunner {
*/
public JarStorageEntity saveJarRunner(File jarFile) throws TechnicalException {
String connectorName = jarFile.getName();
+ logger.info("StorageRunner.saveJarRunner: file[{}] connectorName[{}]", jarFile.getPath(), connectorName);
JarStorageEntity jarStorageEntity = jarDefinitionRepository.findByName(connectorName);
if (jarStorageEntity != null)
@@ -177,12 +178,10 @@ public RunnerDefinitionEntity saveUploadRunner(String name,
* @return a RunnerDefinitionEntity, saved.
*/
public RunnerDefinitionEntity saveUploadRunner(AbstractRunner runner, JarStorageEntity jarDefinition) {
- RunnerDefinitionEntity runnerDefinition = runnerDefinitionRepository.selectByName(runner.getName());
- if (runnerDefinition != null)
- return runnerDefinition;
-
- runnerDefinition = new RunnerDefinitionEntity();
-
+ RunnerDefinitionEntity runnerDefinition = runnerDefinitionRepository.selectByType(runner.getType());
+ if (runnerDefinition == null) {
+ runnerDefinition = new RunnerDefinitionEntity();
+ }
runnerDefinition.name = runner.getName();
runnerDefinition.classname = runner.getClass().getCanonicalName();
runnerDefinition.jar = jarDefinition;
@@ -241,20 +240,18 @@ public void writeJarBlob(Session session, JarStorageEntity jarStorageEntity, Inp
* @throws IOException in case of error during the operation
*/
public RunnerDefinitionEntity saveEmbeddedRunner(AbstractRunner runner) throws IOException {
- RunnerDefinitionEntity runnerDefinition = runnerDefinitionRepository.selectByName(runner.getName());
- if (runnerDefinition != null)
- return runnerDefinition;
-
- runnerDefinition = new RunnerDefinitionEntity();
-
+ RunnerDefinitionEntity runnerDefinition = runnerDefinitionRepository.selectByType(runner.getType());
+ if (runnerDefinition == null) {
+ runnerDefinition = new RunnerDefinitionEntity();
+ // start it by default
+ runnerDefinition.activeRunner = true;
+ }
runnerDefinition.name = runner.getName();
runnerDefinition.classname = runner.getClass().getCanonicalName();
runnerDefinition.type = runner.getType();
runnerDefinition.collectionName = runner.getCollectionName();
runnerDefinition.origin = RunnerDefinitionEntity.Origin.EMBEDDED;
- // start it by default
- runnerDefinition.activeRunner = true;
return runnerDefinitionRepository.save(runnerDefinition);
}
@@ -288,13 +285,42 @@ public List getRunners(Filter filter) {
if (filter.filterType == null)
return true;
return t.type.equals(filter.filterType);
+ }).filter(t -> {
+ if (filter.jarFileName == null) {
+ return true;
+ } else {
+ return t.jar != null && filter.jarFileName.equals(t.jar.name);
+ }
}).toList();
}
- public boolean existRunner(String runnerName) {
- return runnerDefinitionRepository.selectByName(runnerName) != null;
+ /**
+ * existRunner by type
+ *
+ * @param runnerType type of runner
+ * @return true if the runner exists
+ */
+ public boolean existRunnerByType(String runnerType) {
+ return runnerDefinitionRepository.selectByType(runnerType) != null;
+ }
+
+ /**
+ * Remove an entity - does not remove the history of execution
+ *
+ * @param entity entity to remove
+ */
+
+ public void removeEntity(RunnerDefinitionEntity entity) {
+ runnerDefinitionRepository.delete(entity);
}
+ /* ******************************************************************** */
+ /* */
+ /* Remove entity */
+ /* */
+ /* Remove the entity */
+ /* ******************************************************************** */
+
public static class Filter {
/**
* Null: all runners, else True or False
@@ -308,6 +334,11 @@ public static class Filter {
*/
Boolean storeOnly;
+ /**
+ * Only jar runner inside a specific JarFile
+ */
+ String jarFileName;
+
public Filter isActive(boolean activeOnly) {
this.activeOnly = activeOnly;
return this;
@@ -327,5 +358,10 @@ public Filter type(String type) {
this.filterType = type;
return this;
}
+
+ public Filter jarFileName(String jarFileName) {
+ this.jarFileName = jarFileName;
+ return this;
+ }
}
}
diff --git a/src/main/java/io/camunda/cherry/runtime/CherryMain.java b/src/main/java/io/camunda/cherry/runtime/CherryMain.java
index 4ced233..e788fe8 100644
--- a/src/main/java/io/camunda/cherry/runtime/CherryMain.java
+++ b/src/main/java/io/camunda/cherry/runtime/CherryMain.java
@@ -25,7 +25,7 @@ public class CherryMain {
RunnerFactory runnerFactory;
@Autowired
- JobRunnerFactory cherryJobRunnerFactory;
+ JobRunnerFactory jobRunnerFactory;
@PostConstruct
public void init() {
@@ -33,16 +33,19 @@ public void init() {
logger.info("----- CherryMain.1 Load all embedded runner");
runnerFactory.init();
+ logger.info("----- CherryMain.2 purge non existing anymore runner");
+ runnerFactory.synchronize();
+
// at this point, the table is up-to-date, class loader is correct : let's start all runners
- logger.info("----- CherryMain.4 Start all runners");
- cherryJobRunnerFactory.startAll();
+ logger.info("----- CherryMain.3 Start all runners");
+ jobRunnerFactory.startAll();
}
@PreDestroy
public void end() {
logger.info("----- End is called");
- cherryJobRunnerFactory.stopAll();
+ jobRunnerFactory.stopAll();
}
diff --git a/src/main/java/io/camunda/cherry/runtime/HistoryPerformance.java b/src/main/java/io/camunda/cherry/runtime/HistoryPerformance.java
index 6acd238..5361a74 100644
--- a/src/main/java/io/camunda/cherry/runtime/HistoryPerformance.java
+++ b/src/main/java/io/camunda/cherry/runtime/HistoryPerformance.java
@@ -89,8 +89,8 @@ public Performance getPerformance(String runnerType, LocalDateTime dateNow, Peri
case FAIL -> interval.executionsFailed++;
case BPMNERROR -> interval.executionsBpmnErrors++;
}
- if (runnerExecutionEntity.executionMs > interval.picTimeInMs)
- interval.picTimeInMs = runnerExecutionEntity.executionMs;
+ if (runnerExecutionEntity.executionMs > interval.peakTimeInMs)
+ interval.peakTimeInMs = runnerExecutionEntity.executionMs;
}
// build the list and calculate average
@@ -103,8 +103,8 @@ public Performance getPerformance(String runnerType, LocalDateTime dateNow, Peri
sumTotalExecutionTimeInMs += interval.sumOfExecutionTime;
sumTotalExecutions += interval.executions;
- if (interval.picTimeInMs > performance.picTimeInMs)
- performance.picTimeInMs = interval.picTimeInMs;
+ if (interval.peakTimeInMs > performance.peakTimeInMs)
+ performance.peakTimeInMs = interval.peakTimeInMs;
}
// global values
@@ -174,7 +174,7 @@ public enum PeriodStatistic {
}
public static class Performance {
- public long picTimeInMs;
+ public long peakTimeInMs;
public long executions;
public long averageTimeInMs;
public List listIntervals = new ArrayList<>();
@@ -192,7 +192,7 @@ public static class Interval {
public long executionsSucceeded = 0;
public long executionsFailed = 0;
public long executionsBpmnErrors = 0;
- public long picTimeInMs = 0;
+ public long peakTimeInMs = 0;
public long averageTimeInMs = 0;
public Interval(String slot, LocalDateTime slotTime) {
diff --git a/src/main/java/io/camunda/cherry/store/StoreRestController.java b/src/main/java/io/camunda/cherry/store/StoreRestController.java
index 01ec435..34147f6 100644
--- a/src/main/java/io/camunda/cherry/store/StoreRestController.java
+++ b/src/main/java/io/camunda/cherry/store/StoreRestController.java
@@ -49,6 +49,7 @@ public List |