Skip to content

Commit

Permalink
Adding task cancellation timestamp in task API (opensearch-project#7445)
Browse files Browse the repository at this point in the history
* Adding task cancellation time in task API

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Fixing unit tests and addressing comments

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Adding change log for unreleased 2.x

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Removing running time cancel info from task API

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Replacing long primitive with Long object

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Making cancelledAt field human readable

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Fixing failing test

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Removing the feature from unreleased 3.x

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Fixing ListTasksResponseTests failure

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Test failure fix

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Changing naming convention to cancellationStartTime

Signed-off-by: Sagar Upadhyaya <[email protected]>

---------

Signed-off-by: Sagar Upadhyaya <[email protected]>
Signed-off-by: Sagar <[email protected]>
Co-authored-by: Andrew Ross <[email protected]>
  • Loading branch information
sgup432 and andrross authored May 23, 2023
1 parent b4b3724 commit 8d7a544
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
- Add task cancellation timestamp in task API ([#7455](https://github.com/opensearch-project/OpenSearch/pull/7455))
- Adds ExtensionsManager.lookupExtensionSettingsById ([#7466](https://github.com/opensearch-project/OpenSearch/pull/7466))
- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394))
- Provide mechanism to configure XContent parsing constraints (after update to Jackson 2.15.0 and above) ([#7550](https://github.com/opensearch-project/OpenSearch/pull/7550))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ static TaskInfo randomTaskInfo() {
boolean cancellable = randomBoolean();
boolean cancelled = cancellable == true ? randomBoolean() : false;
TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId();
Long cancellationStartTime = null;
if (cancelled) {
cancellationStartTime = randomNonNegativeLong();
}
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
Expand All @@ -110,7 +114,8 @@ static TaskInfo randomTaskInfo() {
cancelled,
parentTaskId,
headers,
randomResourceStats()
randomResourceStats(),
cancellationStartTime
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,17 @@ public void testTasksCancellation() throws Exception {
.get();
assertEquals(1, cancelTasksResponse.getTasks().size());

// Tasks are marked as cancelled at this point but not yet completed.
List<TaskInfo> taskInfoList = client().admin()
.cluster()
.prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "*")
.get()
.getTasks();
for (TaskInfo taskInfo : taskInfoList) {
assertTrue(taskInfo.isCancelled());
assertNotNull(taskInfo.getCancellationStartTime());
}
future.get();

logger.info("--> checking that test tasks are not running");
Expand Down
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/tasks/CancellableTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ public abstract class CancellableTask extends Task {
private volatile String reason;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final TimeValue cancelAfterTimeInterval;
/**
* The time this task was cancelled as a wall clock time since epoch ({@link System#currentTimeMillis()} style).
*/
private Long cancellationStartTime = null;
/**
* The time this task was cancelled as a relative time ({@link System#nanoTime()} style).
*/
private Long cancellationStartTimeNanos = null;

public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT);
Expand All @@ -74,6 +82,8 @@ public CancellableTask(
public void cancel(String reason) {
assert reason != null;
if (cancelled.compareAndSet(false, true)) {
this.cancellationStartTime = System.currentTimeMillis();
this.cancellationStartTimeNanos = System.nanoTime();
this.reason = reason;
onCancelled();
}
Expand All @@ -87,6 +97,14 @@ public boolean cancelOnParentLeaving() {
return true;
}

public Long getCancellationStartTime() {
return cancellationStartTime;
}

public Long getCancellationStartTimeNanos() {
return cancellationStartTimeNanos;
}

/**
* Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled.
*/
Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status
* Build a proper {@link TaskInfo} for this task.
*/
protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) {
boolean cancelled = this instanceof CancellableTask && ((CancellableTask) this).isCancelled();
Long cancellationStartTime = null;
if (cancelled) {
cancellationStartTime = ((CancellableTask) this).getCancellationStartTime();
}
return new TaskInfo(
new TaskId(localNodeId, getId()),
getType(),
Expand All @@ -201,10 +206,11 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status
startTime,
System.nanoTime() - startTimeNanos,
this instanceof CancellableTask,
this instanceof CancellableTask && ((CancellableTask) this).isCancelled(),
cancelled,
parentTask,
headers,
resourceStats
resourceStats,
cancellationStartTime
);
}

Expand Down
61 changes: 58 additions & 3 deletions server/src/main/java/org/opensearch/tasks/TaskInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {

private final boolean cancelled;

private final Long cancellationStartTime;

private final TaskId parentTaskId;

private final Map<String, String> headers;
Expand All @@ -104,6 +106,38 @@ public TaskInfo(
TaskId parentTaskId,
Map<String, String> headers,
TaskResourceStats resourceStats
) {
this(
taskId,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers,
resourceStats,
null
);
}

public TaskInfo(
TaskId taskId,
String type,
String action,
String description,
Task.Status status,
long startTime,
long runningTimeNanos,
boolean cancellable,
boolean cancelled,
TaskId parentTaskId,
Map<String, String> headers,
TaskResourceStats resourceStats,
Long cancellationStartTime
) {
if (cancellable == false && cancelled == true) {
throw new IllegalArgumentException("task cannot be cancelled");
Expand All @@ -120,6 +154,7 @@ public TaskInfo(
this.parentTaskId = parentTaskId;
this.headers = headers;
this.resourceStats = resourceStats;
this.cancellationStartTime = cancellationStartTime;
}

/**
Expand Down Expand Up @@ -150,6 +185,11 @@ public TaskInfo(StreamInput in) throws IOException {
} else {
resourceStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
cancellationStartTime = in.readOptionalLong();
} else {
cancellationStartTime = null;
}
}

@Override
Expand All @@ -170,6 +210,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_1_0)) {
out.writeOptionalWriteable(resourceStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalLong(cancellationStartTime);
}
}

public TaskId getTaskId() {
Expand Down Expand Up @@ -228,6 +271,10 @@ public boolean isCancelled() {
return cancelled;
}

public Long getCancellationStartTime() {
return cancellationStartTime;
}

/**
* Returns the parent task id
*/
Expand Down Expand Up @@ -281,6 +328,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
resourceStats.toXContent(builder, params);
builder.endObject();
}
if (cancellationStartTime != null) {
builder.humanReadableField("cancellation_time_millis", "cancellation_time", new TimeValue(cancellationStartTime));
}
return builder;
}

Expand Down Expand Up @@ -308,6 +358,7 @@ public static TaskInfo fromXContent(XContentParser parser) {
}
@SuppressWarnings("unchecked")
TaskResourceStats resourceStats = (TaskResourceStats) a[i++];
Long cancellationStartTime = (Long) a[i++];
RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes);
TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString);
return new TaskInfo(
Expand All @@ -322,7 +373,8 @@ public static TaskInfo fromXContent(XContentParser parser) {
cancelled,
parentTaskId,
headers,
resourceStats
resourceStats,
cancellationStartTime
);
});
static {
Expand All @@ -341,6 +393,7 @@ public static TaskInfo fromXContent(XContentParser parser) {
PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> TaskResourceStats.fromXContent(p), new ParseField("resource_stats"));
PARSER.declareLong(optionalConstructorArg(), new ParseField("cancellation_time_millis"));
}

@Override
Expand All @@ -366,7 +419,8 @@ public boolean equals(Object obj) {
&& Objects.equals(cancelled, other.cancelled)
&& Objects.equals(status, other.status)
&& Objects.equals(headers, other.headers)
&& Objects.equals(resourceStats, other.resourceStats);
&& Objects.equals(resourceStats, other.resourceStats)
&& Objects.equals(cancellationStartTime, other.cancellationStartTime);
}

@Override
Expand All @@ -383,7 +437,8 @@ public int hashCode() {
cancelled,
status,
headers,
resourceStats
resourceStats,
cancellationStartTime
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testCancellableOptionWhenCancelledTrue() {
long taskId = randomIntBetween(0, 100000);
long startTime = randomNonNegativeLong();
long runningTime = randomNonNegativeLong();
long cancellationStartTime = randomNonNegativeLong();
boolean cancellable = true;
boolean cancelled = true;
TaskInfo taskInfo = new TaskInfo(
Expand All @@ -103,12 +104,14 @@ public void testCancellableOptionWhenCancelledTrue() {
cancelled,
TaskId.EMPTY_TASK_ID,
Collections.singletonMap("foo", "bar"),
randomResourceStats(randomBoolean())
randomResourceStats(randomBoolean()),
cancellationStartTime
);
String taskInfoString = taskInfo.toString();
Map<String, Object> map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2();
assertEquals(map.get("cancellable"), cancellable);
assertEquals(map.get("cancelled"), cancelled);
assertEquals(map.get("cancellation_time_millis"), cancellationStartTime);
}

public void testCancellableOptionWhenCancelledFalse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public void testNonEmptyToString() {
{
put("dummy-type1", new TaskResourceUsage(100, 100));
}
})
}),
0L
);
ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList());
assertEquals(
Expand All @@ -105,7 +106,9 @@ public void testNonEmptyToString() {
+ " \"cpu_time_in_nanos\" : 100,\n"
+ " \"memory_in_bytes\" : 100\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"cancellation_time\" : \"0s\",\n"
+ " \"cancellation_time_millis\" : 0\n"
+ " }\n"
+ " ]\n"
+ "}",
Expand Down
Loading

0 comments on commit 8d7a544

Please sign in to comment.