Skip to content

Commit

Permalink
Refactor GateClientAbstract and GateClientJobStream constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
namtzigla committed Oct 24, 2024
1 parent 30624d6 commit 564d84e
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public GateClientAbstract(ConfigEvent configEvent) {
start();
}


public GateClientResponseStream getResponseStream() throws UnconfiguredException {
return new GateClientResponseStream(getChannel());
}

/**
* Handles configuration changes.
* @param event The new configuration event.
Expand Down Expand Up @@ -95,7 +100,7 @@ protected ConfigEvent getConfig() throws UnconfiguredException {
* @return A ManagedChannel instance.
* @throws UnconfiguredException if the client is not configured.
*/
protected ManagedChannel getChannel() throws UnconfiguredException {
public ManagedChannel getChannel() throws UnconfiguredException {
if ((channel != null) && !channel.isShutdown() && !channel.isTerminated()) {
return channel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,15 @@ public void onNext(Service.JobStreamResponse job) {
} else if (job.hasExileAgentResponse()) {
plugin.scheduleExileAgentRespose(job.getJobId(), job.getExileAgentResponse());
} else if (job.hasExileNamedJobRequest()) {
plugin.scheduleExileNamedJob(job.getJobId(), job.getExileNamedJobRequest());
if (job.getExileNamedJobRequest().hasListPools()){
plugin.listPools(job.getJobId());
} else if (job.getExileNamedJobRequest().hasGetPoolStatus()) {
plugin.getPoolStatus(job.getJobId(), job.getExileNamedJobRequest().getGetPoolStatus().getPoolId());
} else if (job.getExileNamedJobRequest().hasGetPoolRecords()) {
plugin.getPoolRecords(job.getJobId(), job.getExileNamedJobRequest().getGetPoolRecords().getPoolId());
} else {
plugin.scheduleExileNamedJob(job.getJobId(), job.getExileNamedJobRequest());
}
} else {
// TODO report back an error & reject job
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,79 +25,97 @@
import tcnapi.exile.gate.v1.Service;

@Singleton
public class GateClientResponseStream extends GateClientAbstract implements StreamObserver<Service.ResultsStreamResponse> {
private static final Logger log = LoggerFactory.getLogger(GateClientResponseStream.class);
private ManagedChannel channel;
private StreamObserver<Service.ResultsStreamRequest> streamObserver;
public class GateClientResponseStream extends GateClientAbstract
implements StreamObserver<Service.ResultsStreamResponse> {
private static final Logger log = LoggerFactory.getLogger(GateClientResponseStream.class);
private ManagedChannel channel;
private StreamObserver<Service.ResultsStreamRequest> streamObserver;

public GateClientResponseStream() {
log.debug("GateClientResponseStream created");
}

public GateClientResponseStream(ManagedChannel channel2) {
log.debug("GateClientResponseStream created with channel {}", channel2);
this.channel = channel2;
start();
}

public boolean isRunning() {
if ((channel == null) && (streamObserver == null)) {
return false;
}
log.debug("channel {} shutdown {} terminated {} -> return {}", channel, channel.isShutdown(),
channel.isTerminated(),
!channel.isShutdown() && !channel.isTerminated() && (streamObserver != null));
return (!channel.isShutdown() && !channel.isTerminated()) && (streamObserver != null);
}

@Override
public void start() {
responseStream();
}

@Override
public void onNext(Service.ResultsStreamResponse response) {
log.debug("GateClientResponseStream onNext {}", response);
}

@Override
public void onError(Throwable throwable) {
log.debug("GateClientResponseStream onError {}", throwable);
responseStream();
}

public boolean isRunning() {
if (channel == null) {
return false;
@Override
public void onCompleted() {
log.debug("GateClientResponseStream onCompleted");
responseStream();
}
log.debug("channel {} shutdown {} terminated {} -> {}", channel, channel.isShutdown(), channel.isTerminated(), !channel.isShutdown() && !channel.isTerminated());
return (!channel.isShutdown() && !channel.isTerminated());
}

@Override
public void start() {
responseStream();
}

@Override
public void onNext(Service.ResultsStreamResponse response) {
log.debug("GateClientResponseStream onNext {}", response);
}

@Override
public void onError(Throwable throwable) {
log.debug("GateClientResponseStream onError {}", throwable);
responseStream();
}

@Override
public void onCompleted() {
log.debug("GateClientResponseStream onCompleted");
responseStream();
}

public void sendError(Service.ResultsStreamRequest datasourceIsClosed) {
log.debug("GateClientResponseStream sendError {}", datasourceIsClosed);
streamObserver.onNext(datasourceIsClosed);

}

public void sendResult(Service.ResultsStreamRequest build) {
if (!isRunning()) {
log.debug("stream observer not running");

public void sendError(Service.ResultsStreamRequest datasourceIsClosed) {
log.debug("GateClientResponseStream sendError {}", datasourceIsClosed);
streamObserver.onNext(datasourceIsClosed);

}
log.debug("GateClientResponseStream sendResult {}", build);
if (streamObserver == null) {
log.debug("streamObserver not set");

public void sendResult(Service.ResultsStreamRequest build) {
if (!isRunning()) {
log.debug("stream observer not running");
}
log.debug("GateClientResponseStream sendResult {}", build);
if (streamObserver == null) {
log.debug("streamObserver not set");
}
streamObserver.onNext(build);
log.debug("After GateClientResponseStream sendResult {}", build);
}
streamObserver.onNext(build);
log.debug("After GateClientResponseStream sendResult {}", build);
}

public synchronized void responseStream() {
if (!isRunning()) {
log.debug("setting up responseStream");
try {
channel = getChannel();
} catch (UnconfiguredException e) {
throw new RuntimeException(e);
}
var client = ExileGateServiceGrpc.newStub(channel).withWaitForReady();
this.streamObserver = client.resultsStream(this);

public synchronized void responseStream() {
if (!isRunning()) {
log.debug("setting up responseStream");
if (channel == null) {
try {
channel = getChannel();
} catch (UnconfiguredException e) {
throw new RuntimeException(e);
}
}
var client = ExileGateServiceGrpc.newStub(channel).withWaitForReady();
log.debug("setting stream observer ");
this.streamObserver = client.resultsStream(this);
log.debug("stream observer set");
} else {
log.debug("response stream, already running");
}
}
}

public void sendEOF(String jobId) {
log.debug("Sending EOF of {}", jobId);
streamObserver.onNext(Service.ResultsStreamRequest.newBuilder()
.setPayload("EOF")
.setJobId(jobId)
.build());
public void sendEOF(String jobId) {
log.debug("Sending EOF of {}", jobId);
streamObserver.onNext(Service.ResultsStreamRequest.newBuilder()
.setPayload("EOF")
.setJobId(jobId)
.build());

}
}
}

0 comments on commit 564d84e

Please sign in to comment.