diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java index 5a3fa0a79..d5f0b3910 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprClient.java @@ -25,6 +25,7 @@ import io.dapr.client.domain.HttpExtension; import io.dapr.client.domain.InvokeBindingRequest; import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.Job; import io.dapr.client.domain.PublishEventRequest; import io.dapr.client.domain.SaveStateRequest; import io.dapr.client.domain.State; @@ -44,6 +45,8 @@ import java.util.Map; import java.util.function.Function; +import com.google.protobuf.Message; + /** * Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required. * @@ -662,6 +665,7 @@ Flux subscribeConfiguration(String storeName, Li * @return Mono of {@link UnsubscribeConfigurationResponse} instance. */ Mono unsubscribeConfiguration(UnsubscribeConfigurationRequest request); + /** * Returns a newly created gRPC stub with proper interceptors and channel for gRPC proxy invocation. diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 803488c0a..26a672099 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -13,9 +13,12 @@ package io.dapr.client; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; +import com.google.protobuf.Message; import io.dapr.client.domain.ActorMetadata; import io.dapr.client.domain.AppConnectionPropertiesHealthMetadata; import io.dapr.client.domain.AppConnectionPropertiesMetadata; @@ -38,6 +41,7 @@ import io.dapr.client.domain.HttpExtension; import io.dapr.client.domain.InvokeBindingRequest; import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.Job; import io.dapr.client.domain.LockRequest; import io.dapr.client.domain.PublishEventRequest; import io.dapr.client.domain.QueryStateItem; @@ -56,6 +60,7 @@ import io.dapr.client.domain.UnsubscribeConfigurationRequest; import io.dapr.client.domain.UnsubscribeConfigurationResponse; import io.dapr.client.resiliency.ResiliencyOptions; +import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.internal.exceptions.DaprHttpException; import io.dapr.internal.grpc.DaprClientGrpcInterceptors; @@ -89,6 +94,7 @@ import reactor.util.retry.Retry; import java.io.IOException; +import java.nio.charset.Charset; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -137,6 +143,10 @@ public class DaprClientImpl extends AbstractDaprClient { private final DaprHttp httpClient; private final DaprClientGrpcInterceptors grpcInterceptors; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final Charset CHARSET = Properties.STRING_CHARSET.get(); /** * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder @@ -1295,6 +1305,96 @@ public Mono unsubscribeConfiguration(Unsubscri } } + @Override + public Mono scheduleJobAlpha1(Job job) { + try { + final String name = job.getName(); + final T data = job.getData(); + + if (name == null || name.trim().isEmpty()) { + throw new IllegalArgumentException("Job name cannot be null or empty"); + } + if (data == null) { + throw new IllegalArgumentException("Job data cannot be empty"); + } + + DaprProtos.Job.Builder jobBuilder = DaprProtos.Job.newBuilder() + .setName(name); + if (data instanceof String) { + jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom((String) data, CHARSET))); + } else if (data instanceof byte[]) { + String base64 = OBJECT_MAPPER.writeValueAsString(data); + jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom(base64, CHARSET))); + } else { + return Mono.error(() -> { + throw new IllegalArgumentException("Job data value must be String or byte[]"); + }); + } + if (job.getSchedule() != null && !job.getSchedule().trim().isEmpty()) { + jobBuilder.setSchedule(job.getSchedule()); + } + if (job.getRepeats() != null) { + jobBuilder.setRepeats(job.getRepeats()); + } + if (job.getDueTime() != null && !job.getDueTime().trim().isEmpty()) { + jobBuilder.setDueTime(job.getDueTime()); + } + if (job.getTtl() != null && !job.getTtl().trim().isEmpty()) { + jobBuilder.setTtl(job.getTtl()); + } + + DaprProtos.ScheduleJobRequest.Builder builder = + DaprProtos.ScheduleJobRequest.newBuilder() + .setJob(jobBuilder.build()); + + return this.createMono( + it -> intercept(null, asyncStub).scheduleJobAlpha1(builder.build(), it)) + .then(); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } + } + + @SuppressWarnings("unchecked") + @Override + public Mono> getJobAlpha1(String name, Class clazz) { + try { + if (name == null || name.trim().isEmpty()) { + throw new IllegalArgumentException("Job name cannot be null or empty"); + } + return this.createMono( + it -> intercept(null, asyncStub).getJobAlpha1(DaprProtos.GetJobRequest.newBuilder().setName(name).build(), it)) + .map(it -> { + DaprProtos.Job _job = it.getJob(); + T data = null; + if (clazz.isInstance(String.class)) { + data = (T)_job.getData().toByteString().toString(CHARSET); + } else if (clazz.isInstance(byte[].class)) { + data = (T) _job.getData().toByteArray(); + } else { + throw new IllegalArgumentException("Job data type must be String or byte[]"); + } + return new Job<>(_job.getName(), _job.getSchedule(), _job.getRepeats(), _job.getDueTime(), _job.getTtl(), data); + }); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } + } + + @Override + public Mono deleteJobAlpha1(String name) { + try { + if (name == null || name.trim().isEmpty()) { + throw new IllegalArgumentException("Job name cannot be null or empty"); + } + return this.createMono( + it -> intercept(null, asyncStub).deleteJobAlpha1(DaprProtos.DeleteJobRequest.newBuilder().setName(name).build(), it)) + .then(); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } + } + /** * Build a new Configuration Item from provided parameter. * diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 95911efc2..8b0e2943e 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -17,6 +17,7 @@ import io.dapr.client.domain.BulkPublishRequest; import io.dapr.client.domain.BulkPublishResponse; import io.dapr.client.domain.BulkPublishResponseFailedEntry; +import io.dapr.client.domain.Job; import io.dapr.client.domain.LockRequest; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; @@ -268,4 +269,30 @@ Mono> publishEvents(String pubsubName, String topicNa */ Subscription subscribeToEvents( String pubsubName, String topic, SubscriptionListener listener, TypeRef type); + + /** + * ScheduleJobAlpha1 creates and schedules a job. + * + * @param The type of the data for the job. + * @param job job to be scheduled + * @return a Mono plan of type Void. + */ + Mono scheduleJobAlpha1(Job job); + + /** + * GetJobAlpha1 retrieve Job by name. + * + * @param The type of the data for the job. + * @param name name of the job + * @return a Mono of Job + */ + Mono> getJobAlpha1(String name, Class clazz); + + /** + * Delete a Job. + * + * @param name name of the job + * @return + */ + Mono deleteJobAlpha1(String name); } diff --git a/sdk/src/main/java/io/dapr/client/domain/Job.java b/sdk/src/main/java/io/dapr/client/domain/Job.java new file mode 100644 index 000000000..c30743cb1 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/Job.java @@ -0,0 +1,94 @@ +package io.dapr.client.domain; + +import com.google.protobuf.Message; + +/** + * A Job to schedule + * + * @param The class type of Job data. + */ +public final class Job { + + private final String name; + + private String schedule; + + private Integer repeats; + + private String dueTime; + + private String ttl; + + private final T data; + + /** + * Constructor for Job + * + * @param name name of the job to create + */ + public Job(String name, T data) { + super(); + this.name = name; + this.data = data; + } + + /** + * Constructor for Job + * + * @param name name of the job to create + * @param schedule schedule for the job + * @param repeats jobs with fixed repeat counts (accounting for Actor Reminders). + * @param dueTime sets time at which or time interval before the callback is invoked for the first time. + * @param ttl Time To Live to allow for auto deletes (accounting for Actor Reminders). + * @param data Job data + */ + public Job(String name, String schedule, Integer repeats, String dueTime, String ttl, T data) { + super(); + this.name = name; + this.schedule = schedule; + this.repeats = repeats; + this.dueTime = dueTime; + this.ttl = ttl; + this.data = data; + } + + public String getSchedule() { + return schedule; + } + + public void setSchedule(String schedule) { + this.schedule = schedule; + } + + public Integer getRepeats() { + return repeats; + } + + public void setRepeats(Integer repeats) { + this.repeats = repeats; + } + + public String getDueTime() { + return dueTime; + } + + public void setDueTime(String dueTime) { + this.dueTime = dueTime; + } + + public String getTtl() { + return ttl; + } + + public void setTtl(String ttl) { + this.ttl = ttl; + } + + public T getData() { + return data; + } + + public String getName() { + return name; + } +}