From bc711d533c1035a0dc4fc606f57ea9e1a1a630b1 Mon Sep 17 00:00:00 2001 From: Vacha Shah Date: Fri, 21 Apr 2023 23:58:53 +0000 Subject: [PATCH] Adding Task related classes with protobuf integration Signed-off-by: Vacha Shah --- .../tasks/ProtobufCancellableTask.java | 91 ++++ .../org/opensearch/tasks/ProtobufTask.java | 446 ++++++++++++++++++ .../tasks/ProtobufTaskAwareRequest.java | 50 ++ .../org/opensearch/tasks/ProtobufTaskId.java | 92 ++++ .../opensearch/tasks/ProtobufTaskInfo.java | 231 +++++++++ .../tasks/ProtobufTaskResourceStats.java | 48 ++ .../opensearch/tasks/ProtobufTaskResult.java | 226 +++++++++ 7 files changed, 1184 insertions(+) create mode 100644 server/src/main/java/org/opensearch/tasks/ProtobufCancellableTask.java create mode 100644 server/src/main/java/org/opensearch/tasks/ProtobufTask.java create mode 100644 server/src/main/java/org/opensearch/tasks/ProtobufTaskAwareRequest.java create mode 100644 server/src/main/java/org/opensearch/tasks/ProtobufTaskId.java create mode 100644 server/src/main/java/org/opensearch/tasks/ProtobufTaskInfo.java create mode 100644 server/src/main/java/org/opensearch/tasks/ProtobufTaskResourceStats.java create mode 100644 server/src/main/java/org/opensearch/tasks/ProtobufTaskResult.java diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufCancellableTask.java b/server/src/main/java/org/opensearch/tasks/ProtobufCancellableTask.java new file mode 100644 index 0000000000000..9e47da0265e86 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufCancellableTask.java @@ -0,0 +1,91 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import org.opensearch.common.Nullable; +import org.opensearch.common.unit.TimeValue; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.opensearch.search.SearchService.NO_TIMEOUT; + +/** + * A task that can be canceled +* +* @opensearch.internal +*/ +public abstract class ProtobufCancellableTask extends ProtobufTask { + + private volatile String reason; + private final AtomicBoolean cancelled = new AtomicBoolean(false); + private final TimeValue cancelAfterTimeInterval; + + public ProtobufCancellableTask(long id, String type, String action, String description, ProtobufTaskId parentTaskId, Map headers) { + this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT); + } + + public ProtobufCancellableTask( + long id, + String type, + String action, + String description, + ProtobufTaskId parentTaskId, + Map headers, + TimeValue cancelAfterTimeInterval + ) { + super(id, type, action, description, parentTaskId, headers); + this.cancelAfterTimeInterval = cancelAfterTimeInterval; + } + + /** + * This method is called by the task manager when this task is cancelled. + */ + public void cancel(String reason) { + assert reason != null; + if (cancelled.compareAndSet(false, true)) { + this.reason = reason; + onCancelled(); + } + } + + /** + * Returns true if this task should be automatically cancelled if the coordinating node that + * requested this task left the cluster. + */ + public boolean cancelOnParentLeaving() { + return true; + } + + /** + * Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled. + */ + public abstract boolean shouldCancelChildrenOnCancellation(); + + public boolean isCancelled() { + return cancelled.get(); + } + + public TimeValue getCancellationTimeout() { + return cancelAfterTimeInterval; + } + + /** + * The reason the task was cancelled or null if it hasn't been cancelled. + */ + @Nullable + public final String getReasonCancelled() { + return reason; + } + + /** + * Called after the task is cancelled so that it can take any actions that it has to take. + */ + protected void onCancelled() {} +} diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufTask.java b/server/src/main/java/org/opensearch/tasks/ProtobufTask.java new file mode 100644 index 0000000000000..dd3a446a0d120 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufTask.java @@ -0,0 +1,446 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionResponse; +import org.opensearch.action.NotifyOnceListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.NamedWriteable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Current task information +* +* @opensearch.internal +*/ +public class ProtobufTask { + + private static final Logger logger = LogManager.getLogger(ProtobufTask.class); + + /** + * The request header to mark tasks with specific ids + */ + public static final String X_OPAQUE_ID = "X-Opaque-Id"; + + private static final String TOTAL = "total"; + + private final long id; + + private final String type; + + private final String action; + + private final String description; + + private final ProtobufTaskId parentTask; + + private final Map headers; + + private final Map> resourceStats; + + private final List> resourceTrackingCompletionListeners; + + /** + * Keeps track of the number of active resource tracking threads for this task. It is initialized to 1 to track + * the task's own/self thread. When this value becomes 0, all threads have been marked inactive and the resource + * tracking can be stopped for this task. + */ + private final AtomicInteger numActiveResourceTrackingThreads = new AtomicInteger(1); + + /** + * The task's start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style). + */ + private final long startTime; + + /** + * The task's start time as a relative time ({@link System#nanoTime()} style). + */ + private final long startTimeNanos; + + public ProtobufTask(long id, String type, String action, String description, ProtobufTaskId parentTask, Map headers) { + this( + id, + type, + action, + description, + parentTask, + System.currentTimeMillis(), + System.nanoTime(), + headers, + new ConcurrentHashMap<>(), + new ArrayList<>() + ); + } + + public ProtobufTask( + long id, + String type, + String action, + String description, + ProtobufTaskId parentTask, + long startTime, + long startTimeNanos, + Map headers, + ConcurrentHashMap> resourceStats, + List> resourceTrackingCompletionListeners + ) { + this.id = id; + this.type = type; + this.action = action; + this.description = description; + this.parentTask = parentTask; + this.startTime = startTime; + this.startTimeNanos = startTimeNanos; + this.headers = headers; + this.resourceStats = resourceStats; + this.resourceTrackingCompletionListeners = resourceTrackingCompletionListeners; + } + + /** + * Build a version of the task status you can throw over the wire and back + * to the user. + * + * @param localNodeId + * the id of the node this task is running on + * @param detailed + * should the information include detailed, potentially slow to + * generate data? + */ + public final ProtobufTaskInfo taskInfo(String localNodeId, boolean detailed) { + return taskInfo(localNodeId, detailed, detailed == false); + } + + /** + * Build a version of the task status you can throw over the wire and back + * with the option to include resource stats or not. + * This method is only used during creating TaskResult to avoid storing resource information into the task index. + * + * @param excludeStats should information exclude resource stats. + * By default, detailed flag is used to control including resource information. + * But inorder to avoid storing resource stats into task index as strict mapping is enforced and breaks when adding this field. + * In the future, task-index-mapping.json can be modified to add resource stats. + */ + private ProtobufTaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeStats) { + String description = null; + ProtobufTask.Status status = null; + ProtobufTaskResourceStats resourceStats = null; + if (detailed) { + description = getDescription(); + status = getStatus(); + } + if (excludeStats == false) { + resourceStats = new ProtobufTaskResourceStats(new HashMap<>() { + { + put(TOTAL, getTotalResourceStats()); + } + }); + } + return taskInfo(localNodeId, description, status, resourceStats); + } + + /** + * Build a {@link ProtobufTaskInfo} for this task without resource stats. + */ + protected final ProtobufTaskInfo taskInfo(String localNodeId, String description, Status status) { + return taskInfo(localNodeId, description, status, null); + } + + /** + * Build a proper {@link ProtobufTaskInfo} for this task. + */ + protected final ProtobufTaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) { + return new ProtobufTaskInfo( + new ProtobufTaskId(localNodeId, getId()), + getType(), + getAction(), + description, + status, + startTime, + System.nanoTime() - startTimeNanos, + this instanceof ProtobufCancellableTask, + this instanceof ProtobufCancellableTask && ((ProtobufCancellableTask) this).isCancelled(), + parentTask, + headers, + resourceStats + ); + } + + /** + * Returns task id + */ + public long getId() { + return id; + } + + /** + * Returns task channel type (netty, transport, direct) + */ + public String getType() { + return type; + } + + /** + * Returns task action + */ + public String getAction() { + return action; + } + + /** + * Generates task description + */ + public String getDescription() { + return description; + } + + /** + * Returns the task's start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style). + */ + public long getStartTime() { + return startTime; + } + + /** + * Returns the task's start time in nanoseconds ({@link System#nanoTime()} style). + */ + public long getStartTimeNanos() { + return startTimeNanos; + } + + /** + * Returns id of the parent task or NO_PARENT_ID if the task doesn't have any parent tasks + */ + public ProtobufTaskId getParentTaskId() { + return parentTask; + } + + /** + * Build a status for this task or null if this task doesn't have status. + * Since most tasks don't have status this defaults to returning null. While + * this can never perform IO it might be a costly operation, requiring + * collating lists of results, etc. So only use it if you need the value. + */ + public Status getStatus() { + return null; + } + + /** + * Returns thread level resource consumption of the task + */ + public Map> getResourceStats() { + return Collections.unmodifiableMap(resourceStats); + } + + /** + * Returns current total resource usage of the task. + * Currently, this method is only called on demand, during get and listing of tasks. + * In the future, these values can be cached as an optimization. + */ + public TaskResourceUsage getTotalResourceStats() { + return new TaskResourceUsage(getTotalResourceUtilization(ResourceStats.CPU), getTotalResourceUtilization(ResourceStats.MEMORY)); + } + + /** + * Returns total resource consumption for a specific task stat. + */ + public long getTotalResourceUtilization(ResourceStats stats) { + long totalResourceConsumption = 0L; + for (List threadResourceInfosList : resourceStats.values()) { + for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) { + final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats); + if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) { + totalResourceConsumption += statsInfo.getTotalValue(); + } + } + } + return totalResourceConsumption; + } + + /** + * Adds thread's starting resource consumption information + * @param threadId ID of the thread + * @param statsType stats type + * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException matching active thread entry was found which is not expected. + */ + public void startThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { + final List threadResourceInfoList = resourceStats.computeIfAbsent(threadId, k -> new ArrayList<>()); + // active thread entry should not be present in the list + for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + throw new IllegalStateException( + "unexpected active thread resource entry present [" + threadId + "]:[" + threadResourceInfo + "]" + ); + } + } + threadResourceInfoList.add(new ThreadResourceInfo(threadId, statsType, resourceUsageMetrics)); + incrementResourceTrackingThreads(); + } + + /** + * This method is used to update the resource consumption stats so that the data isn't too stale for long-running task. + * If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception. + * @param threadId ID of the thread + * @param statsType stats type + * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException if no matching active thread entry was found. + */ + public void updateThreadResourceStats(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { + final List threadResourceInfoList = resourceStats.get(threadId); + if (threadResourceInfoList != null) { + for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { + // the active entry present in the list is updated + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics); + return; + } + } + } + throw new IllegalStateException("cannot update if active thread resource entry is not present"); + } + + /** + * Record the thread's final resource consumption values. + * If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception. + * @param threadId ID of the thread + * @param statsType stats type + * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException if no matching active thread entry was found. + */ + public void stopThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { + final List threadResourceInfoList = resourceStats.get(threadId); + if (threadResourceInfoList != null) { + for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + threadResourceInfo.setActive(false); + threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics); + decrementResourceTrackingThreads(); + return; + } + } + } + throw new IllegalStateException("cannot update final values if active thread resource entry is not present"); + } + + /** + * Individual tasks can override this if they want to support task resource tracking. We just need to make sure that + * the ThreadPool on which the task runs on have runnable wrapper similar to + * {@link org.opensearch.common.util.concurrent.OpenSearchExecutors#newResizable} + * + * @return true if resource tracking is supported by the task + */ + public boolean supportsResourceTracking() { + return false; + } + + /** + * Report of the internal status of a task. These can vary wildly from task + * to task because each task is implemented differently but we should try + * to keep each task consistent from version to version where possible. + * That means each implementation of {@linkplain ProtobufTask.Status#toXContent} + * should avoid making backwards incompatible changes to the rendered + * result. But if we change the way a request is implemented it might not + * be possible to preserve backwards compatibility. In that case, we + * can change this on version upgrade but we should be careful + * because some statuses (reindex) have become defacto standardized because + * they are used by systems like Kibana. + */ + public interface Status extends ToXContentObject, NamedWriteable {} + + /** + * Returns stored task header associated with the task + */ + public String getHeader(String header) { + return headers.get(header); + } + + public ProtobufTaskResult result(DiscoveryNode node, Exception error) throws IOException { + return new ProtobufTaskResult(taskInfo(node.getId(), true, true), error); + } + + public ProtobufTaskResult result(DiscoveryNode node, ActionResponse response) throws IOException { + if (response instanceof ToXContent) { + return new ProtobufTaskResult(taskInfo(node.getId(), true, true), (ToXContent) response); + } else { + throw new IllegalStateException("response has to implement ToXContent to be able to store the results"); + } + } + + /** + * Registers a task resource tracking completion listener on this task if resource tracking is still active. + * Returns true on successful subscription, false otherwise. + */ + public boolean addResourceTrackingCompletionListener(NotifyOnceListener listener) { + if (numActiveResourceTrackingThreads.get() > 0) { + resourceTrackingCompletionListeners.add(listener); + return true; + } + + return false; + } + + /** + * Increments the number of active resource tracking threads. + * + * @return the number of active resource tracking threads. + */ + public int incrementResourceTrackingThreads() { + return numActiveResourceTrackingThreads.incrementAndGet(); + } + + /** + * Decrements the number of active resource tracking threads. + * This method is called when threads finish execution, and also when the task is unregistered (to mark the task's + * own thread as complete). When the active thread count becomes zero, the onTaskResourceTrackingCompleted method + * is called exactly once on all registered listeners. + * + * Since a task is unregistered after the message is processed, it implies that the threads responsible to produce + * the response must have started prior to it (i.e. startThreadResourceTracking called before unregister). + * This ensures that the number of active threads doesn't drop to zero pre-maturely. + * + * Rarely, some threads may even start execution after the task is unregistered. As resource stats are piggy-backed + * with the response, any thread usage info captured after the task is unregistered may be irrelevant. + * + * @return the number of active resource tracking threads. + */ + public int decrementResourceTrackingThreads() { + int count = numActiveResourceTrackingThreads.decrementAndGet(); + + if (count == 0) { + List listenerExceptions = new ArrayList<>(); + resourceTrackingCompletionListeners.forEach(listener -> { + try { + listener.onResponse(this); + } catch (Exception e1) { + try { + listener.onFailure(e1); + } catch (Exception e2) { + listenerExceptions.add(e2); + } + } + }); + ExceptionsHelper.maybeThrowRuntimeAndSuppress(listenerExceptions); + } + + return count; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufTaskAwareRequest.java b/server/src/main/java/org/opensearch/tasks/ProtobufTaskAwareRequest.java new file mode 100644 index 0000000000000..beebdea5beebb --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufTaskAwareRequest.java @@ -0,0 +1,50 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import java.util.Map; + +/** + * An interface for a request that can be used to register a task manager task +* +* @opensearch.internal +*/ +public interface ProtobufTaskAwareRequest { + /** + * Set a reference to task that caused this task to be run. + */ + default void setParentTask(String parentTaskNode, long parentTaskId) { + setParentTask(new ProtobufTaskId(parentTaskNode, parentTaskId)); + } + + /** + * Set a reference to task that created this request. + */ + void setParentTask(ProtobufTaskId taskId); + + /** + * Get a reference to the task that created this request. Implementers should default to + * {@link ProtobufTaskId#EMPTY_TASK_ID}, meaning "there is no parent". + */ + ProtobufTaskId getParentTask(); + + /** + * Returns the task object that should be used to keep track of the processing of the request. + */ + default ProtobufTask createTask(long id, String type, String action, ProtobufTaskId parentTaskId, Map headers) { + return new ProtobufTask(id, type, action, getDescription(), parentTaskId, headers); + } + + /** + * Returns optional description of the request to be displayed by the task manager + */ + default String getDescription() { + return ""; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufTaskId.java b/server/src/main/java/org/opensearch/tasks/ProtobufTaskId.java new file mode 100644 index 0000000000000..81ba9e2cabd5d --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufTaskId.java @@ -0,0 +1,92 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import org.opensearch.OpenSearchParseException; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.ProtobufWriteable; +import org.opensearch.tasks.proto.TaskIdProto; + +import java.io.IOException; + +/** + * Task id that consists of node id and id of the task on the node +* +* @opensearch.internal +*/ +public final class ProtobufTaskId implements ProtobufWriteable { + + public static final ProtobufTaskId EMPTY_TASK_ID = new ProtobufTaskId(); + + private final TaskIdProto.TaskId taskId; + + public ProtobufTaskId(String nodeId, long id) { + this.taskId = TaskIdProto.TaskId.newBuilder().setNodeId(nodeId).setId(id).build(); + } + + /** + * Builds {@link #EMPTY_TASK_ID}. + */ + private ProtobufTaskId() { + this.taskId = TaskIdProto.TaskId.newBuilder().setNodeId("").setId(-1L).build(); + } + + public ProtobufTaskId(String taskId) { + if (Strings.hasLength(taskId) && "unset".equals(taskId) == false) { + String[] s = Strings.split(taskId, ":"); + if (s == null || s.length != 2) { + throw new IllegalArgumentException("malformed task id " + taskId); + } + String nodeId = s[0]; + try { + long id = Long.parseLong(s[1]); + this.taskId = TaskIdProto.TaskId.newBuilder().setNodeId(nodeId).setId(id).build(); + } catch (NumberFormatException ex) { + throw new IllegalArgumentException("malformed task id " + taskId, ex); + } + } else { + this.taskId = EMPTY_TASK_ID.taskId; + } + } + + /** + * Read a {@linkplain ProtobufTaskId} from a stream. {@linkplain ProtobufTaskId} has this rather than the usual constructor that takes a + * {@linkplain CodedInputStream} so we can return the {@link #EMPTY_TASK_ID} without allocating. + */ + public static ProtobufTaskId readFromStream(CodedInputStream in) throws IOException { + String nodeId = in.readString(); + if (nodeId.isEmpty()) { + /* + * The only TaskId allowed to have the empty string as its nodeId is the EMPTY_TASK_ID and there is only ever one of it and it + * never writes its taskId to save bytes on the wire because it is by far the most common TaskId. + */ + return EMPTY_TASK_ID; + } + return new ProtobufTaskId(nodeId, in.readInt64()); + } + + @Override + public void writeTo(com.google.protobuf.CodedOutputStream out) throws IOException { + this.taskId.writeTo(out); + } + + public String getNodeId() { + return this.taskId.getNodeId(); + } + + public long getId() { + return this.taskId.getId(); + } + + public boolean isSet() { + return this.taskId.getId() != -1L; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufTaskInfo.java b/server/src/main/java/org/opensearch/tasks/ProtobufTaskInfo.java new file mode 100644 index 0000000000000..e0acdb7f54ec1 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufTaskInfo.java @@ -0,0 +1,231 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import org.opensearch.Version; +import org.opensearch.common.Strings; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.ProtobufStreamInput; +import org.opensearch.common.io.stream.ProtobufStreamOutput; +import org.opensearch.common.io.stream.ProtobufWriteable; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.TimeValue; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg; +import static org.opensearch.core.xcontent.ConstructingObjectParser.optionalConstructorArg; + +/** + * Information about a currently running task. +*

+* Tasks are used for communication with transport actions. As a result, they can contain callback +* references as well as mutable state. That makes it impractical to send tasks over transport channels +* and use in APIs. Instead, immutable and writeable ProtobufTaskInfo objects are used to represent +* snapshot information about currently running tasks. +* +* @opensearch.internal +*/ +public final class ProtobufTaskInfo implements ProtobufWriteable { + private final ProtobufTaskId taskId; + + private final String type; + + private final String action; + + private final String description; + + private final long startTime; + + private final long runningTimeNanos; + + private final ProtobufTask.Status status; + + private final boolean cancellable; + + private final boolean cancelled; + + private final ProtobufTaskId parentTaskId; + + private final Map headers; + + private final ProtobufTaskResourceStats resourceStats; + + private ProtobufStreamInput protobufStreamInput; + + private ProtobufStreamOutput protobufStreamOutput; + + public ProtobufTaskInfo( + ProtobufTaskId taskId, + String type, + String action, + String description, + ProtobufTask.Status status, + long startTime, + long runningTimeNanos, + boolean cancellable, + boolean cancelled, + ProtobufTaskId parentTaskId, + Map headers, + ProtobufTaskResourceStats resourceStats + ) { + if (cancellable == false && cancelled == true) { + throw new IllegalArgumentException("task cannot be cancelled"); + } + this.taskId = taskId; + this.type = type; + this.action = action; + this.description = description; + this.status = status; + this.startTime = startTime; + this.runningTimeNanos = runningTimeNanos; + this.cancellable = cancellable; + this.cancelled = cancelled; + this.parentTaskId = parentTaskId; + this.headers = headers; + this.resourceStats = resourceStats; + } + + /** + * Read from a stream. + */ + @SuppressWarnings("unchecked") + public ProtobufTaskInfo(CodedInputStream in) throws IOException { + protobufStreamInput = new ProtobufStreamInput(); + taskId = ProtobufTaskId.readFromStream(in); + type = in.readString(); + action = in.readString(); + description = protobufStreamInput.readOptionalString(in); + //TODO: fix this + status = null; + startTime = in.readInt64(); + runningTimeNanos = in.readInt64(); + cancellable = in.readBool(); + if (protobufStreamInput.getVersion().onOrAfter(Version.V_2_0_0)) { + cancelled = in.readBool(); + } else { + cancelled = false; + } + if (cancellable == false && cancelled == true) { + throw new IllegalArgumentException("task cannot be cancelled"); + } + parentTaskId = ProtobufTaskId.readFromStream(in); + headers = protobufStreamInput.readMap(CodedInputStream::readString, CodedInputStream::readString, in); + if (protobufStreamInput.getVersion().onOrAfter(Version.V_2_1_0)) { + resourceStats = protobufStreamInput.readOptionalWriteable(ProtobufTaskResourceStats::new, in); + } else { + resourceStats = null; + } + } + + @Override + public void writeTo(CodedOutputStream out) throws IOException { + protobufStreamOutput = new ProtobufStreamOutput(); + taskId.writeTo(out); + out.writeString(1, type); + out.writeString(2, action); + out.writeString(3, description); + //TODO: fix this + // out.writeOptionalNamedWriteable(status); + out.writeInt64(4, startTime); + out.writeInt64(5, runningTimeNanos); + out.writeBool(6, cancellable); + if (protobufStreamOutput.getVersion().onOrAfter(Version.V_2_0_0)) { + out.writeBool(7, cancelled); + } + parentTaskId.writeTo(out); + protobufStreamOutput.writeMap(headers, CodedOutputStream::writeString, CodedOutputStream::writeString, out); + if (protobufStreamOutput.getVersion().onOrAfter(Version.V_2_1_0)) { + out.writeOptionalWriteable(resourceStats, out); + } + } + + public ProtobufTaskId getTaskId() { + return taskId; + } + + public long getId() { + return taskId.getId(); + } + + public String getType() { + return type; + } + + public String getAction() { + return action; + } + + public String getDescription() { + return description; + } + + /** + * The status of the running task. Only available if TaskInfos were build + * with the detailed flag. + */ + public ProtobufTask.Status getStatus() { + return status; + } + + /** + * Returns the task start time + */ + public long getStartTime() { + return startTime; + } + + /** + * Returns the task running time + */ + public long getRunningTimeNanos() { + return runningTimeNanos; + } + + /** + * Returns true if the task supports cancellation + */ + public boolean isCancellable() { + return cancellable; + } + + /** + * Returns true if the task has been cancelled + */ + public boolean isCancelled() { + return cancelled; + } + + /** + * Returns the parent task id + */ + public ProtobufTaskId getParentTaskId() { + return parentTaskId; + } + + /** + * Returns the task headers + */ + public Map getHeaders() { + return headers; + } + + /** + * Returns the task resource information + */ + public ProtobufTaskResourceStats getResourceStats() { + return resourceStats; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufTaskResourceStats.java b/server/src/main/java/org/opensearch/tasks/ProtobufTaskResourceStats.java new file mode 100644 index 0000000000000..30de39551c335 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufTaskResourceStats.java @@ -0,0 +1,48 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import org.opensearch.common.io.stream.ProtobufWriteable; +import org.opensearch.tasks.proto.TaskResourceStatsProto; + +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.CodedInputStream; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Resource information about a currently running task. +*

+* Writeable TaskResourceStats objects are used to represent resource +* snapshot information about currently running task. +* +* @opensearch.internal +*/ +public class ProtobufTaskResourceStats implements ProtobufWriteable { + private final TaskResourceStatsProto.TaskResourceStats taskResourceStats; + + /** + * Read from a stream. + */ + public ProtobufTaskResourceStats(CodedInputStream in) throws IOException { + this.taskResourceStats = TaskResourceStatsProto.TaskResourceStats.parseFrom(in.readByteArray()); + } + + public Map getResourceUsageInfo() { + return this.taskResourceStats.getResourceUsageMap(); + } + + @Override + public void writeTo(CodedOutputStream out) throws IOException { + this.taskResourceStats.writeTo(out); + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufTaskResult.java b/server/src/main/java/org/opensearch/tasks/ProtobufTaskResult.java new file mode 100644 index 0000000000000..c3314ae40455f --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufTaskResult.java @@ -0,0 +1,226 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import org.opensearch.OpenSearchException; +import org.opensearch.client.Requests; +import org.opensearch.common.Nullable; +import org.opensearch.core.ParseField; +import org.opensearch.common.Strings; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.ProtobufWriteable; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.core.xcontent.InstantiatingObjectParser; +import org.opensearch.common.xcontent.ObjectParserHelper; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentType; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import static java.util.Collections.emptyMap; +import static java.util.Objects.requireNonNull; +import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg; +import static org.opensearch.core.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.opensearch.common.xcontent.XContentHelper.convertToMap; + +/** + * Information about a running task or a task that stored its result. Running tasks just have a {@link #getTask()} while +* tasks with stored result will have either a {@link #getError()} or {@link #getResponse()}. +* +* @opensearch.internal +*/ +public final class ProtobufTaskResult implements ProtobufWriteable, ToXContentObject { + private final boolean completed; + private final ProtobufTaskInfo task; + @Nullable + private final BytesReference error; + @Nullable + private final BytesReference response; + + /** + * Construct a {@linkplain TaskResult} for a task for which we don't have a result or error. That usually means that the task + * is incomplete, but it could also mean that we waited for the task to complete but it didn't save any error information. + */ + public ProtobufTaskResult(boolean completed, ProtobufTaskInfo task) { + this(completed, task, null, null); + } + + /** + * Construct a {@linkplain TaskResult} for a task that completed with an error. + */ + public ProtobufTaskResult(ProtobufTaskInfo task, Exception error) throws IOException { + this(true, task, toXContent(error), null); + } + + /** + * Construct a {@linkplain ProtobufTaskResult} for a task that completed successfully. + */ + public ProtobufTaskResult(ProtobufTaskInfo task, ToXContent response) throws IOException { + this(true, task, null, XContentHelper.toXContent(response, Requests.INDEX_CONTENT_TYPE, true)); + } + + public ProtobufTaskResult(boolean completed, ProtobufTaskInfo task, @Nullable BytesReference error, @Nullable BytesReference result) { + this.completed = completed; + this.task = requireNonNull(task, "task is required"); + this.error = error; + this.response = result; + } + + /** + * Read from a stream. + */ + public ProtobufTaskResult(com.google.protobuf.CodedInputStream in) throws IOException { + completed = in.readBool(); + task = new ProtobufTaskInfo(in); + error = in.readOptionalBytesReference(); + response = in.readOptionalBytesReference(); + } + + @Override + public void writeTo(com.google.protobuf.CodedOutputStream out) throws IOException { + out.writeBool(0, completed); + task.writeTo(out); + out.writeByteArray(1, error); + out.writeOptionalBytesReference(response); + } + + /** + * Get the task that this wraps. + */ + public ProtobufTaskInfo getTask() { + return task; + } + + /** + * Get the error that finished this task. Will return null if the task didn't finish with an error, it hasn't yet finished, or didn't + * store its result. + */ + public BytesReference getError() { + return error; + } + + /** + * Convert {@link #getError()} from XContent to a Map for easy processing. Will return an empty map if the task didn't finish with an + * error, hasn't yet finished, or didn't store its result. + */ + public Map getErrorAsMap() { + if (error == null) { + return emptyMap(); + } + return convertToMap(error, false).v2(); + } + + /** + * Get the response that this task finished with. Will return null if the task was finished by an error, it hasn't yet finished, or + * didn't store its result. + */ + public BytesReference getResponse() { + return response; + } + + /** + * Convert {@link #getResponse()} from XContent to a Map for easy processing. Will return an empty map if the task was finished with an + * error, hasn't yet finished, or didn't store its result. + */ + public Map getResponseAsMap() { + if (response == null) { + return emptyMap(); + } + return convertToMap(response, false).v2(); + } + + public boolean isCompleted() { + return completed; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + innerToXContent(builder, params); + return builder.endObject(); + } + + public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("completed", completed); + builder.startObject("task"); + task.toXContent(builder, params); + builder.endObject(); + if (error != null) { + XContentHelper.writeRawField("error", error, builder, params); + } + if (response != null) { + XContentHelper.writeRawField("response", response, builder, params); + } + return builder; + } + + public static final InstantiatingObjectParser PARSER; + + static { + InstantiatingObjectParser.Builder parser = InstantiatingObjectParser.builder( + "stored_task_result", + true, + ProtobufTaskResult.class + ); + parser.declareBoolean(constructorArg(), new ParseField("completed")); + parser.declareObject(constructorArg(), ProtobufTaskInfo.PARSER, new ParseField("task")); + ObjectParserHelper parserHelper = new ObjectParserHelper<>(); + parserHelper.declareRawObject(parser, optionalConstructorArg(), new ParseField("error")); + parserHelper.declareRawObject(parser, optionalConstructorArg(), new ParseField("response")); + PARSER = parser.build(); + } + + @Override + public String toString() { + return Strings.toString(XContentType.JSON, this); + } + + // Implements equals and hashcode for testing + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != ProtobufTaskResult.class) { + return false; + } + ProtobufTaskResult other = (ProtobufTaskResult) obj; + /* + * Equality of error and result is done by converting them to a map first. Not efficient but ignores field order and spacing + * differences so perfect for testing. + */ + return Objects.equals(completed, other.completed) + && Objects.equals(task, other.task) + && Objects.equals(getErrorAsMap(), other.getErrorAsMap()) + && Objects.equals(getResponseAsMap(), other.getResponseAsMap()); + } + + @Override + public int hashCode() { + /* + * Hashing of error and result is done by converting them to a map first. Not efficient but ignores field order and spacing + * differences so perfect for testing. + */ + return Objects.hash(completed, task, getErrorAsMap(), getResponseAsMap()); + } + + private static BytesReference toXContent(Exception error) throws IOException { + try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { + builder.startObject(); + OpenSearchException.generateThrowableXContent(builder, ToXContent.EMPTY_PARAMS, error); + builder.endObject(); + return BytesReference.bytes(builder); + } + } +}