CTR : not working in SNAPSHOT-3.0.0 #6071
Hello, if you were to create the following CTR definition,
I will try to do what you asked asap. But in the meantime, I can confirm: we deployed a Boot 3 CTR with SCDF 2.11.5, because we are desperate to get our Boot 3 tasks/jobs running in SCDF. We really need them to be Boot 3 (and not Boot 2 anymore) because we want to use Boot 3's improved Micrometer metrics support and the new distributed tracing support via Micrometer Tracing. So we configured the SCDF Helm chart as follows:
Using the default Boot 2 CTR in combination with Boot 3 jobs simply didn't work, as that CTR writes to different tables. Juggling with table prefixes didn't lead anywhere. But from what I understood from your answer, running a Boot 3 CTR on SCDF 2.11.5 won't be possible anyway, right?
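For context, SCDF 2.11's multi-schema support keeps Boot 3 task/batch metadata in separately prefixed tables, distinct from the Boot 2 (`TASK_`/`BATCH_`) tables. Conceptually it is as if the Boot 3 app were launched with properties like the sketch below; the `BOOT3_` prefixes shown are what SCDF 2.11 applies by default for apps registered as Boot 3, but verify them against your own schema before relying on this:

```properties
# Sketch: table prefixes SCDF 2.11 applies when launching an app registered as Boot 3
spring.cloud.task.tablePrefix=BOOT3_TASK_
spring.batch.jdbc.table-prefix=BOOT3_BATCH_
```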
CTR 2.11.5 can launch Boot 3 applications, so don't use CTR 3.0.0 with SCDF 2.11.5.
We didn't get CTR 2.11.5 working with Boot 3 batch applications. The CTR itself started reading/writing the BOOT3_-prefixed tables, even though it is a Boot 2 application. We received SQL exceptions about jobs not existing, etc. We will check this again and give more details.
Hey @cbonami, in your task application are you defining a custom `JobRepository` or `JobExplorer`? If so, try to autowire `BatchProperties` and apply the configured table prefix to both factory beans:

```java
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
// ....
Optional.ofNullable(batchProperties)
    .map(BatchProperties::getJdbc)
    .map(BatchProperties.Jdbc::getTablePrefix)
    .ifPresent(factory::setTablePrefix);
factory.afterPropertiesSet();
return Objects.requireNonNull(factory.getObject());
```

```java
JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
// ....
Optional.ofNullable(batchProperties)
    .map(BatchProperties::getJdbc)
    .map(BatchProperties.Jdbc::getTablePrefix)
    .ifPresent(jobExplorerFactoryBean::setTablePrefix);
jobExplorerFactoryBean.afterPropertiesSet();
return Objects.requireNonNull(jobExplorerFactoryBean.getObject());
```
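The `Optional` chain in that suggestion can be exercised in isolation. Below is a minimal, dependency-free sketch with hypothetical stand-ins for Spring Boot's `BatchProperties`/`BatchProperties.Jdbc` and the factory bean (not the real Spring types), just to show that a null `batchProperties` or a null prefix leaves the factory's default untouched:

```java
import java.util.Optional;

public class TablePrefixDemo {
    // Hypothetical stand-ins for Spring Boot's BatchProperties / BatchProperties.Jdbc
    record Jdbc(String tablePrefix) {}
    record BatchProps(Jdbc jdbc) {}

    // Hypothetical stand-in for JobRepositoryFactoryBean's prefix handling
    static class Factory {
        private String tablePrefix = "BATCH_"; // Spring Batch's default prefix
        void setTablePrefix(String prefix) { this.tablePrefix = prefix; }
        String getTablePrefix() { return tablePrefix; }
    }

    static Factory configure(BatchProps batchProperties) {
        Factory factory = new Factory();
        // Same pattern as in the comment above: only override when a prefix is configured
        Optional.ofNullable(batchProperties)
                .map(BatchProps::jdbc)
                .map(Jdbc::tablePrefix)
                .ifPresent(factory::setTablePrefix);
        return factory;
    }

    public static void main(String[] args) {
        System.out.println(configure(null).getTablePrefix());                                     // default kept
        System.out.println(configure(new BatchProps(new Jdbc("BOOT3_BATCH_"))).getTablePrefix()); // overridden
    }
}
```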
@klopfdreh, thanks for your reply. But no, we did not provide a custom JobExplorer or JobRepository. Anyway, in the meantime we got it running via a hack; at least, we consider it a hack. We reverted everything to SCDF 2.11.5 and the standard Boot 2-based CTR, and deployed our composed task comprising 2 subtasks: a Boot 3 task and a Boot 3 batch app.
This is what wrong-footed us during our first experiments, when we tried the CTR (Boot 2) with the 2 subtasks that were Boot 2 before but that we had converted to Boot 3. At that time we received exactly the same error, something we never encountered in the Boot 2 world. This made us conclude that the CTR also needed to be Boot 3, which apparently is not the case. So now we circumvent this problem by adding an argument (called 'unique') when starting the CT. The argument contains a random UUID. Now we can run the CT multiple times without any problem; this is what we see in the logs:
As you can see,
To clarify: it's the CTR that is throwing the
Indeed.
Can you share with us the deployer and app properties (sensitive information redacted) you are using? Currently we are unable to reproduce this.
Sorry for the late reply. Here you go:
Maybe not noteworthy, but we configure the apps and launch the CTR via the SCDF server's API.
Hello @cbonami, the CTR is a Boot 2.x application that sends requests to SCDF to launch both Boot 2 and Boot 3 apps.
Agreed. But here's the thing: we don't add these arguments; they are added automatically somehow. Now, isn't this maybe related to naming?
In 'Applications':
In 'Tasks':
Here's the code that we use to start the CT 'migrate-employer':

```java
...
import org.springframework.cloud.dataflow.rest.client.DataFlowOperations;
import org.springframework.cloud.dataflow.rest.client.dsl.DeploymentPropertiesBuilder;
import org.springframework.cloud.dataflow.rest.client.dsl.task.Task;
import org.springframework.cloud.dataflow.rest.resource.LaunchResponseResource;
...

@Override
public String startMigrateEmployer(String initiator, List<MigrateEmployerData> employers, @NotNull List<TaskPhase> includePhases, @NotNull List<TaskPhase> excludePhases) {
    final ComposedTask task = ComposedTask.MIGRATE_EMPLOYER;
    final Task scdfTask = Task.builder(dataFlowOperations)
        .findByName(task.getName())
        .orElseThrow(() -> new IllegalArgumentException("Task [%s] not installed in SCDF Server".formatted(task.getName())));
    final DeploymentPropertiesBuilder deploymentPropertiesBuilder = new DeploymentPropertiesBuilder();
    final ScdfArgumentBuilder argumentBuilder = new ScdfArgumentBuilder();
    addGlobalProperties(deploymentPropertiesBuilder, task);
    addInitiator(deploymentPropertiesBuilder, initiator);
    addMigrateEmployerData(deploymentPropertiesBuilder, employers);
    DeterminePhasesForComposedTask.execute(task, includePhases, excludePhases)
        .forEach((key, value) -> {
            addIncludedPhases(argumentBuilder, key, value.getFirst());
            addExcludedPhases(argumentBuilder, key, value.getSecond());
        });
    addCustomAppProperties(deploymentPropertiesBuilder, task);
    addK8sLabels(deploymentPropertiesBuilder, task);
    addK8sAnnotations(deploymentPropertiesBuilder, task);
    Map<String, String> deploymentProperties = deploymentPropertiesBuilder.build();
    List<String> arguments = argumentBuilder.build();
    // If you omit or leave properties empty in subsequent runs, SCDF reuses the property and its filled-in
    // value from the previous run. For include/exclude phases we don't want that behavior because their
    // values can be empty. To avoid that carry-over of properties from the last run, we use arguments.
    // Arguments always start fresh; SCDF won't look at arguments from previous runs.
    final LaunchResponseResource launchResponse = scdfTask.launch(deploymentProperties, arguments);
    final long executionId = launchResponse.getExecutionId();
    return ComposedTaskId.from(task, executionId).getId();
}

private void addGlobalProperties(final DeploymentPropertiesBuilder builder, final ComposedTask task) {
    final Map<String, String> environmentVariables = new HashMap<>();
    environmentVariables.put(COMPOSED_TASK_NAME_ENVIRONMENT_VARIABLE, task.getName());
    if (scdfProperties.monitor().enabled()) {
        environmentVariables.put("MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_ENABLED", "true");
        final ScdfProperties.Monitor monitor = scdfProperties.monitor();
        if (StringUtils.isNotBlank(monitor.prometheusRSocketHost())) {
            environmentVariables.put("MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_HOST", monitor.prometheusRSocketHost());
        }
        if (Objects.nonNull(monitor.prometheusRSocketPort())) {
            environmentVariables.put("MANAGEMENT_METRICS_EXPORT_PROMETHEUS_RSOCKET_PORT", monitor.prometheusRSocketPort().toString());
        }
    }
    builder.put(
        "deployer.*.kubernetes.environment-variables",
        environmentVariables.entrySet().stream()
            .map(entry -> entry.getKey() + "=" + entry.getValue())
            .collect(Collectors.joining(","))
    );
}

private void addInitiator(final DeploymentPropertiesBuilder builder, String initiator) {
    builder.put("app.%s.%s.initiator".formatted(INIT.getName(), INIT.getName()), initiator);
}

private void addMigrateEmployerData(final DeploymentPropertiesBuilder builder, final List<MigrateEmployerData> employers) {
    builder.put(
        "app.%s.%s.migrate-employer-data".formatted(INIT.getName(), INIT.getName()),
        employers.stream()
            .map(jsonSerializer::serialize)
            .collect(Collectors.joining(";"))
    );
}

private void addCustomAppProperties(DeploymentPropertiesBuilder builder, ComposedTask composedTask) {
    composedTask.getSubTasks().forEach(task -> {
        final Map<String, String> taskProperties = scdfProperties.tasks().getOrDefault(task.getName(), Map.of());
        final String secrets = taskProperties
            .entrySet()
            .stream()
            .filter(e -> Objects.nonNull(e.getValue()) && e.getValue().contains("secretName"))
            .map(e -> {
                final ScdfSecret scdfSecret = jsonSerializer.parse(e.getValue(), ScdfSecret.class);
                return jsonSerializer.serialize(
                    new ScdfSecret(
                        "%s.%s".formatted(task.name(), e.getKey().toUpperCase()),
                        scdfSecret.secretName(),
                        scdfSecret.dataKey()
                    )
                );
            })
            .collect(Collectors.joining(","));
        builder.put("deployer.%s.kubernetes.secretKeyRefs".formatted(task.getName()), "[%s]".formatted(secrets));
        taskProperties
            .entrySet()
            .stream()
            .filter(entry -> Objects.nonNull(entry.getValue()) && !entry.getValue().contains("secretName"))
            .forEach(entry -> builder.put("app.%s.%s.%s".formatted(task.getName(), task.getName(), entry.getKey()), entry.getValue()));
    });
}
```

and

```java
public class ScdfArgumentBuilder {

    private final Map<Task, List<Pair<String, String>>> argumentsByTask = new EnumMap<>(Task.class);

    ScdfArgumentBuilder() {
    }

    ScdfArgumentBuilder add(final Task task, final String key, final String value) {
        final List<Pair<String, String>> arguments = argumentsByTask.computeIfAbsent(task, t -> new ArrayList<>());
        arguments.add(Pair.of(key, value));
        return this;
    }

    List<String> build() {
        List<String> list = argumentsByTask.entrySet().stream()
            .map(taskMapEntry -> {
                final Task task = taskMapEntry.getKey();
                return IntStream.range(0, taskMapEntry.getValue().size())
                    .mapToObj(i -> {
                        final Pair<String, String> argument = taskMapEntry.getValue().get(i);
                        return "app.%s.%s=--%s=%s".formatted(task.getName(), i, argument.getKey(), argument.getValue());
                    })
                    .toList();
            })
            .flatMap(List::stream)
            .toList();
        List<String> list2 = new ArrayList<>(list);
        // hack: since Boot 3 subtasks are used, we need to add a unique identifier to the arguments
        list2.add("unique=" + UUID.randomUUID());
        return list2;
    }
}
```
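As a side note, the `app.<appname>.<index>=--key=value` strings that `build()` produces can be checked in isolation. Here is a trimmed, dependency-free sketch of the same formatting, using a hypothetical task name and plain string pairs instead of the `Task` enum and `Pair` type:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class CtArgsDemo {
    // Mirrors ScdfArgumentBuilder.build(): one "app.<task>.<i>=--key=value" entry per argument,
    // plus the 'unique' workaround argument appended at the end.
    static List<String> build(String taskName, List<String[]> args) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < args.size(); i++) {
            String[] kv = args.get(i);
            out.add("app.%s.%s=--%s=%s".formatted(taskName, i, kv[0], kv[1]));
        }
        // the hack from this thread: a random UUID makes every launch's argument list unique
        out.add("unique=" + UUID.randomUUID());
        return out;
    }

    public static void main(String[] args) {
        // hypothetical task name and arguments, for illustration only
        build("migrate-employer", List.of(
                new String[]{"includePhases", "PHASE_1"},
                new String[]{"excludePhases", "PHASE_2"}))
            .forEach(System.out::println);
    }
}
```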
Are you deleting the previous task executions of your CTR before launching the new one?
Description:
Our journey began when trying to run a composed task with Boot 3 based applications. The CTR of the latest released version of SCDF is still based on Boot 2, so the CTR started reading/writing the BOOT3_-prefixed tables while itself being a Boot 2 application. That's why this didn't work. We also assumed it is simply not supposed to work, due to the combination of Boot 2 and Boot 3. A bit odd, as Boot 3 has been available for quite a long time already.
So we hoped that the Boot 3 version of the CTR, available in the SCDF GitHub branch 'main3', would solve our issue. We also assumed that this branch corresponds to the image on Docker Hub:
springcloud/spring-cloud-dataflow-composed-task-runner:3.0.0-SNAPSHOT
We ran our composed task with SCDF version:
bitnami/spring-cloud-dataflow:2.11.5-debian-12-r3
But then, even before the subtasks are launched, we encounter this exception in the container running the composed task:
No clue why this happens.
Is this (still) expected? Do we have to wait for the final 3.0.0 version of the CTR?
Release versions:
bitnami/spring-cloud-dataflow:2.11.5-debian-12-r3
springcloud/spring-cloud-dataflow-composed-task-runner:3.0.0-SNAPSHOT