Skip to content

Commit

Permalink
fix the job stream reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
namtzigla committed Dec 13, 2024
1 parent c5354a5 commit 186d824
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
/*
* Copyright 2017-2024 original authors
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* https://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -58,7 +58,7 @@ public void start() {

public boolean isRunning() {
if (channel == null) {
log.debug("channel is null, JobStream is not running");
log.debug("channel is null, JobStream is not running");
return false;
}
log.debug("channel {} shutdown {} terminated {} -> {}", channel, channel.isShutdown(), channel.isTerminated(), !channel.isShutdown() && !channel.isTerminated());
Expand All @@ -67,7 +67,7 @@ public boolean isRunning() {

public synchronized void eventStream() {
if (!isRunning()) {
log.debug("starting JobStream");
log.debug("starting JobStream");
try {
channel = getChannel();
var client = ExileGateServiceGrpc.newStub(channel).withWaitForReady();
Expand All @@ -93,14 +93,14 @@ public void onNext(Service.JobStreamResponse job) {
} else if (job.hasExileAgentResponse()) {
plugin.scheduleExileAgentRespose(job.getJobId(), job.getExileAgentResponse());
} else if (job.hasExileNamedJobRequest()) {
if (job.getExileNamedJobRequest().hasListPools()){
plugin.listPools(job.getJobId());
if (job.getExileNamedJobRequest().hasListPools()) {
plugin.listPools(job.getJobId());
} else if (job.getExileNamedJobRequest().hasGetPoolStatus()) {
plugin.getPoolStatus(job.getJobId(), job.getExileNamedJobRequest().getGetPoolStatus().getPoolId());
plugin.getPoolStatus(job.getJobId(), job.getExileNamedJobRequest().getGetPoolStatus().getPoolId());
} else if (job.getExileNamedJobRequest().hasGetPoolRecords()) {
plugin.getPoolRecords(job.getJobId(), job.getExileNamedJobRequest().getGetPoolRecords().getPoolId());
plugin.getPoolRecords(job.getJobId(), job.getExileNamedJobRequest().getGetPoolRecords().getPoolId());
} else {
plugin.scheduleExileNamedJob(job.getJobId(), job.getExileNamedJobRequest());
plugin.scheduleExileNamedJob(job.getJobId(), job.getExileNamedJobRequest());
}
} else {
// TODO report back an error & reject job
Expand All @@ -117,12 +117,18 @@ public void onNext(Service.JobStreamResponse job) {
@Override
public void onError(Throwable throwable) {
log.debug("GateClientJobStream error {}", throwable.getMessage());
eventStream();
channel.shutdownNow();
// if (isRunning()) {
// log.debug("shutting down channel");
// channel.shutdownNow();
// }
// eventStream();
}

@Override
public void onCompleted() {
log.debug("GateClientJobStream completed");
eventStream();
channel.shutdownNow();
// eventStream();
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
/*
* Copyright 2017-2024 original authors
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* https://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,16 +18,30 @@

import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class GateClientJobStreamStarter implements ApplicationEventListener<StartupEvent> {
private static final Logger log = LoggerFactory.getLogger(GateClientJobStreamStarter.class);
@Inject
private GateClientJobStream streamClient;

@Override
public void onApplicationEvent(StartupEvent event) {
log.debug("GateClientJobStreamStarter started");
}

@Scheduled(fixedRate = "10s")
public void start() {
if (streamClient.isRunning()) {
log.debug("GateClientJobStreamStarter already running");
} else {
log.debug("GateClientJobStreamStarter starting");
streamClient.eventStream();
}
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ protobufVersion=3.25.5

org.gradle.jvmargs=-Xmx4G

version=0.7.0-SNAPSHOT
version=0.8.0-SNAPSHOT

0 comments on commit 186d824

Please sign in to comment.