diff --git a/core/src/main/java/com/tcn/exile/gateclients/GateClientJobStream.java b/core/src/main/java/com/tcn/exile/gateclients/GateClientJobStream.java index 6ff1171..946e513 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/GateClientJobStream.java +++ b/core/src/main/java/com/tcn/exile/gateclients/GateClientJobStream.java @@ -1,12 +1,12 @@ -/* +/* * Copyright 2017-2024 original authors - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * https://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -58,7 +58,7 @@ public void start() { public boolean isRunning() { if (channel == null) { - log.debug("channel is null, JobStream is not running"); + log.debug("channel is null, JobStream is not running"); return false; } log.debug("channel {} shutdown {} terminated {} -> {}", channel, channel.isShutdown(), channel.isTerminated(), !channel.isShutdown() && !channel.isTerminated()); @@ -67,7 +67,7 @@ public boolean isRunning() { public synchronized void eventStream() { if (!isRunning()) { - log.debug("starting JobStream"); + log.debug("starting JobStream"); try { channel = getChannel(); var client = ExileGateServiceGrpc.newStub(channel).withWaitForReady(); @@ -93,14 +93,14 @@ public void onNext(Service.JobStreamResponse job) { } else if (job.hasExileAgentResponse()) { plugin.scheduleExileAgentRespose(job.getJobId(), job.getExileAgentResponse()); } else if (job.hasExileNamedJobRequest()) { - if (job.getExileNamedJobRequest().hasListPools()){ - plugin.listPools(job.getJobId()); + if (job.getExileNamedJobRequest().hasListPools()) { + plugin.listPools(job.getJobId()); } else if (job.getExileNamedJobRequest().hasGetPoolStatus()) { - plugin.getPoolStatus(job.getJobId(), job.getExileNamedJobRequest().getGetPoolStatus().getPoolId()); + plugin.getPoolStatus(job.getJobId(), job.getExileNamedJobRequest().getGetPoolStatus().getPoolId()); } else if (job.getExileNamedJobRequest().hasGetPoolRecords()) { - plugin.getPoolRecords(job.getJobId(), job.getExileNamedJobRequest().getGetPoolRecords().getPoolId()); + plugin.getPoolRecords(job.getJobId(), job.getExileNamedJobRequest().getGetPoolRecords().getPoolId()); } else { - plugin.scheduleExileNamedJob(job.getJobId(), job.getExileNamedJobRequest()); + plugin.scheduleExileNamedJob(job.getJobId(), job.getExileNamedJobRequest()); } } else { // TODO report back an error & reject job @@ -117,12 +117,18 @@ public void onNext(Service.JobStreamResponse job) { @Override public void onError(Throwable throwable) { log.debug("GateClientJobStream error {}", throwable.getMessage()); - eventStream(); + channel.shutdownNow(); +// if (isRunning()) { +// log.debug("shutting down channel"); +// channel.shutdownNow(); +// } +// eventStream(); } @Override public void onCompleted() { log.debug("GateClientJobStream completed"); - eventStream(); + channel.shutdownNow(); +// eventStream(); } } diff --git a/core/src/main/java/com/tcn/exile/gateclients/GateClientJobStreamStarter.java b/core/src/main/java/com/tcn/exile/gateclients/GateClientJobStreamStarter.java index fbf23bf..7538988 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/GateClientJobStreamStarter.java +++ b/core/src/main/java/com/tcn/exile/gateclients/GateClientJobStreamStarter.java @@ -1,12 +1,12 @@ -/* +/* * Copyright 2017-2024 original authors - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * https://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,16 +18,30 @@ import io.micronaut.context.event.ApplicationEventListener; import io.micronaut.context.event.StartupEvent; +import io.micronaut.scheduling.annotation.Scheduled; import jakarta.inject.Inject; import jakarta.inject.Singleton; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Singleton public class GateClientJobStreamStarter implements ApplicationEventListener { + private static final Logger log = LoggerFactory.getLogger(GateClientJobStreamStarter.class); @Inject private GateClientJobStream streamClient; @Override public void onApplicationEvent(StartupEvent event) { + log.debug("GateClientJobStreamStarter started"); + } + @Scheduled(fixedRate = "10s") + public void start() { + if (streamClient.isRunning()) { + log.debug("GateClientJobStreamStarter already running"); + } else { + log.debug("GateClientJobStreamStarter starting"); + streamClient.eventStream(); + } } } diff --git a/gradle.properties b/gradle.properties index 1724a83..59b6f91 100644 --- a/gradle.properties +++ b/gradle.properties @@ -6,4 +6,4 @@ protobufVersion=3.25.5 org.gradle.jvmargs=-Xmx4G -version=0.7.0-SNAPSHOT +version=0.8.0-SNAPSHOT