From 958f8c724154dbe4462e78bb7f0af8bfacb0de71 Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Mon, 7 Oct 2024 17:37:10 +0000 Subject: [PATCH] HACK --- server/build.gradle | 3 +- .../org/opensearch/search/SearchHits.java | 79 ++++++++++++++----- .../search/fetch/FetchSearchResult.java | 37 ++++++--- .../transport/protobuf/SearchHitProtobuf.java | 2 +- .../protobuf/SearchHitsProtobuf.java | 2 +- 5 files changed, 89 insertions(+), 34 deletions(-) diff --git a/server/build.gradle b/server/build.gradle index eccaf8a127647..14398c3451d14 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -145,8 +145,7 @@ tasks.withType(JavaCompile).configureEach { } compileJava { - options.compilerArgs += ['-processor', ['org.apache.logging.log4j.core.config.plugins.processor.PluginProcessor', - 'org.opensearch.common.annotation.processor.ApiAnnotationProcessor'].join(',')] + options.compilerArgs += ['-processor', ['org.apache.logging.log4j.core.config.plugins.processor.PluginProcessor'].join(',')] } tasks.named("internalClusterTest").configure { diff --git a/server/src/main/java/org/opensearch/search/SearchHits.java b/server/src/main/java/org/opensearch/search/SearchHits.java index 58b12fd9cd628..ab67eecee65a0 100644 --- a/server/src/main/java/org/opensearch/search/SearchHits.java +++ b/server/src/main/java/org/opensearch/search/SearchHits.java @@ -41,10 +41,13 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.proto.search.SearchHitsProtoDef; import org.opensearch.rest.action.search.RestSearchAction; +import org.opensearch.transport.protobuf.SearchHitProtobuf; import java.io.IOException; import java.util.ArrayList; @@ -54,6 +57,8 @@ import java.util.Objects; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.transport.protobuf.ProtoSerDeHelpers.sortFieldToProto; +import static org.opensearch.transport.protobuf.SearchHitProtobuf.sortValueToProto; /** * Encapsulates the results of a search operation @@ -226,34 +231,66 @@ public static final class Fields { public static final String MAX_SCORE = "max_score"; } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(Fields.HITS); - boolean totalHitAsInt = params.paramAsBoolean(RestSearchAction.TOTAL_HITS_AS_INT_PARAM, false); - if (totalHitAsInt) { - long total = totalHits == null ? -1 : totalHits.value; - builder.field(Fields.TOTAL, total); - } else if (totalHits != null) { - builder.startObject(Fields.TOTAL); - builder.field("value", totalHits.value); - builder.field("relation", totalHits.relation == Relation.EQUAL_TO ? "eq" : "gte"); - builder.endObject(); + + //////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////// + + public SearchHitsProtoDef.SearchHitsProto toProto() { + SearchHitsProtoDef.SearchHitsProto.Builder builder = SearchHitsProtoDef.SearchHitsProto.newBuilder().setMaxScore(maxScore); + + for (SearchHit hit : hits) { + builder.addHits(new SearchHitProtobuf(hit).toProto()); } - if (Float.isNaN(maxScore)) { - builder.nullField(Fields.MAX_SCORE); - } else { - builder.field(Fields.MAX_SCORE, maxScore); + + if (collapseField != null) { + builder.setCollapseField(collapseField); } - builder.field(Fields.HITS); - builder.startArray(); - for (SearchHit hit : hits) { - hit.toXContent(builder, params); + + if (totalHits != null) { + SearchHitsProtoDef.TotalHitsProto.Builder totHitsBuilder = SearchHitsProtoDef.TotalHitsProto.newBuilder() + .setRelation(totalHits.relation.ordinal()) + .setValue(totalHits.value); + builder.setTotalHits(totHitsBuilder); + } + + if (sortFields != null) { + for (SortField field : sortFields) { + builder.addSortFields(sortFieldToProto(field)); + } } - builder.endArray(); + + if (collapseValues != null) { + for (Object col : collapseValues) { + builder.addCollapseValues(sortValueToProto(col)); + } + } + + return builder.build(); + } + + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + System.out.println("SearchHits.toXContent"); + + builder.startObject(Fields.HITS); + + com.google.protobuf.ByteString bString = toProto().toByteString(); + System.out.println("Client response size: " + bString.size()); + + builder.rawField("protobuf", bString.newInput(), MediaType.fromMediaType("application/octet-stream")); + builder.endObject(); return builder; } + + //////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////// + + public static SearchHits fromXContent(XContentParser parser) throws IOException { if (parser.currentToken() != XContentParser.Token.START_OBJECT) { parser.nextToken(); diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java b/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java index b64638972ed49..fa8759e73cb3b 100644 --- a/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java +++ b/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java @@ -35,6 +35,7 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.proto.search.fetch.FetchSearchResultProtoDef; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; import org.opensearch.search.SearchPhaseResult; @@ -42,6 +43,9 @@ import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.transport.protobuf.FetchSearchResultProtobuf; +import org.opensearch.transport.protobuf.SearchHitsProtobuf; + import java.io.IOException; /** @@ -58,12 +62,33 @@ public class FetchSearchResult extends SearchPhaseResult { public FetchSearchResult() {} + //////////////////////////////////////////////// + //////////////////////////////////////////////// + //////////////////////////////////////////////// + public FetchSearchResult(StreamInput in) throws IOException { - super(in); - contextId = new ShardSearchContextId(in); - hits = new SearchHits(in); + System.out.println("FetchSearchResult In - PROTOBUF"); + FetchSearchResultProtoDef.FetchSearchResultProto proto = FetchSearchResultProtoDef.FetchSearchResultProto.parseFrom(in); + hits = new SearchHitsProtobuf(proto.getHits()); + } + + FetchSearchResultProtoDef.FetchSearchResultProto toProto() { + FetchSearchResultProtoDef.FetchSearchResultProto.Builder builder = FetchSearchResultProtoDef.FetchSearchResultProto.newBuilder() + .setHits(new SearchHitsProtobuf(hits).toProto()) + .setCounter(this.counter); + return builder.build(); } + @Override + public void writeTo(StreamOutput out) throws IOException { + System.out.println("FetchSearchResult Out - PROTOBUF"); + toProto().writeTo(out); + } + + //////////////////////////////////////////////// + //////////////////////////////////////////////// + //////////////////////////////////////////////// + public FetchSearchResult(ShardSearchContextId id, SearchShardTarget shardTarget) { this.contextId = id; setSearchShardTarget(shardTarget); @@ -103,10 +128,4 @@ public FetchSearchResult initCounter() { public int counterGetAndIncrement() { return counter++; } - - @Override - public void writeTo(StreamOutput out) throws IOException { - contextId.writeTo(out); - hits.writeTo(out); - } } diff --git a/server/src/main/java/org/opensearch/transport/protobuf/SearchHitProtobuf.java b/server/src/main/java/org/opensearch/transport/protobuf/SearchHitProtobuf.java index 5e4253b6db83e..a2edbbac8a103 100644 --- a/server/src/main/java/org/opensearch/transport/protobuf/SearchHitProtobuf.java +++ b/server/src/main/java/org/opensearch/transport/protobuf/SearchHitProtobuf.java @@ -65,7 +65,7 @@ public void fromProtobufStream(StreamInput in) throws IOException { fromProto(proto); } - SearchHitProto toProto() { + public SearchHitProto toProto() { SearchHitProto.Builder builder = SearchHitProto.newBuilder() .setScore(score) .setVersion(version) diff --git a/server/src/main/java/org/opensearch/transport/protobuf/SearchHitsProtobuf.java b/server/src/main/java/org/opensearch/transport/protobuf/SearchHitsProtobuf.java index 413801c732922..560ab5560a5e8 100644 --- a/server/src/main/java/org/opensearch/transport/protobuf/SearchHitsProtobuf.java +++ b/server/src/main/java/org/opensearch/transport/protobuf/SearchHitsProtobuf.java @@ -59,7 +59,7 @@ public void fromProtobufStream(StreamInput in) throws IOException { fromProto(proto); } - SearchHitsProto toProto() { + public SearchHitsProto toProto() { SearchHitsProto.Builder builder = SearchHitsProto.newBuilder().setMaxScore(maxScore); for (SearchHit hit : hits) {