Skip to content

Commit

Permalink
Refactor ProcessorExecutionDetail to improve field handling
Browse files Browse the repository at this point in the history
Signed-off-by: Junwei Dai <[email protected]>
  • Loading branch information
Junwei Dai committed Dec 13, 2024
1 parent 488377f commit 719fa1c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.function.Supplier;

import static org.opensearch.action.search.SearchResponseSections.EXT_FIELD;
import static org.opensearch.action.search.SearchResponseSections.PROCESSOR_RESULT_FIELD;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
Expand Down Expand Up @@ -519,6 +520,11 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
extBuilders.add(searchExtBuilder);
}
}
} else if (PROCESSOR_RESULT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
while ((token = parser.nextToken()) != Token.END_ARRAY) {
ProcessorExecutionDetail detail = ProcessorExecutionDetail.fromXContent(parser);
processorResult.add(detail);
}
} else {
parser.skipChildren();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
public class SearchResponseSections implements ToXContentFragment {

public static final ParseField EXT_FIELD = new ParseField("ext");
public static final ParseField PROCESSOR_RESULT_FIELD = new ParseField("processor_results");
protected final SearchHits hits;
protected final Aggregations aggregations;
protected final Suggest suggest;
Expand Down Expand Up @@ -181,7 +182,7 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
}

if (!processorResult.isEmpty()) {
builder.field("processor_result", processorResult);
builder.field(PROCESSOR_RESULT_FIELD.getPreferredName(), processorResult);
}
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@

package org.opensearch.search.pipeline;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

Expand All @@ -33,7 +34,10 @@ public class ProcessorExecutionDetail implements Writeable, ToXContentObject {
private long durationMillis;
private Object inputData;
private Object outputData;
private static final Logger logger = LogManager.getLogger(ProcessorExecutionDetail.class);
public static final ParseField PROCESSOR_NAME_FIELD = new ParseField("processor_name");
public static final ParseField DURATION_MILLIS_FIELD = new ParseField("duration_millis");
public static final ParseField INPUT_DATA_FIELD = new ParseField("input_data");
public static final ParseField OUTPUT_DATA_FIELD = new ParseField("output_data");

/**
* Constructor for ProcessorExecutionDetail
Expand Down Expand Up @@ -111,13 +115,10 @@ public void addTook(long durationMillis) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("processor_name", processorName);
builder.field("duration_millis", durationMillis);

addFieldToXContent(builder, "input_data", inputData, params);

addFieldToXContent(builder, "output_data", outputData, params);

builder.field(PROCESSOR_NAME_FIELD.getPreferredName(), processorName);
builder.field(DURATION_MILLIS_FIELD.getPreferredName(), durationMillis);
addFieldToXContent(builder, INPUT_DATA_FIELD.getPreferredName(), inputData, params);
addFieldToXContent(builder, OUTPUT_DATA_FIELD.getPreferredName(), outputData, params);
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -161,6 +162,63 @@ public boolean equals(Object o) {
&& Objects.equals(outputData, that.outputData);
}

public static ProcessorExecutionDetail fromXContent(XContentParser parser) throws IOException {
String processorName = null;
long durationMillis = 0;
Object inputData = null;
Object outputData = null;

while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();

if (PROCESSOR_NAME_FIELD.match(fieldName, parser.getDeprecationHandler())) {
processorName = parser.text();
} else if (DURATION_MILLIS_FIELD.match(fieldName, parser.getDeprecationHandler())) {
durationMillis = parser.longValue();
} else if (INPUT_DATA_FIELD.match(fieldName, parser.getDeprecationHandler())) {
inputData = parseFieldFromXContent(parser);
} else if (OUTPUT_DATA_FIELD.match(fieldName, parser.getDeprecationHandler())) {
outputData = parseFieldFromXContent(parser);
} else {
parser.skipChildren();
}
}

if (processorName == null) {
throw new IllegalArgumentException("Processor name is required");
}

return new ProcessorExecutionDetail(processorName, durationMillis, inputData, outputData);
}

private static Object parseFieldFromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token == XContentParser.Token.VALUE_NULL) {
return null;
} else if (token == XContentParser.Token.START_ARRAY) {
return parseArrayFromXContent(parser);
} else if (token == XContentParser.Token.START_OBJECT) {
return parser.map();
} else {
return parser.textOrNull();
}
}

private static List<Object> parseArrayFromXContent(XContentParser parser) throws IOException {
List<Object> list = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
list.add(parser.map());
} else if (parser.currentToken() == XContentParser.Token.START_ARRAY) {
list.add(parseArrayFromXContent(parser));
} else {
list.add(parser.textOrNull());
}
}
return list;
}

@Override
public int hashCode() {
return Objects.hash(processorName, durationMillis, inputData, outputData);
Expand Down

0 comments on commit 719fa1c

Please sign in to comment.