Commit

query changes

Signed-off-by: Bharathwaj G <[email protected]>

bharath-techie committed Feb 1, 2024
1 parent 22de6a2 commit eded7a3
Showing 12 changed files with 245 additions and 20 deletions.
Binary file removed server/lib/lucene-core-9.7.0.jar
New file: SearchStarTreeAction.java
@@ -0,0 +1,2 @@
package org.opensearch.action.search;public class SearchStarTreeAction {
}
New file: SearchStarTreeResponse.java
@@ -0,0 +1,2 @@
package org.opensearch.action.search;public class SearchStarTreeResponse {
}
New file: TransportCreatePitAction.java
@@ -0,0 +1,133 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;

/**
 * Transport action for creating PIT reader context
 */
public class TransportCreatePitAction extends HandledTransportAction&lt;CreatePitRequest, CreatePitResponse&gt; {

    public static final String CREATE_PIT_ACTION = "create_pit";
    private final TransportService transportService;
    private final SearchTransportService searchTransportService;
    private final ClusterService clusterService;
    private final TransportSearchAction transportSearchAction;
    private final NamedWriteableRegistry namedWriteableRegistry;
    private final CreatePitController createPitController;

    @Inject
    public TransportCreatePitAction(
        TransportService transportService,
        ActionFilters actionFilters,
        SearchTransportService searchTransportService,
        ClusterService clusterService,
        TransportSearchAction transportSearchAction,
        NamedWriteableRegistry namedWriteableRegistry,
        CreatePitController createPitController
    ) {
        super(CreatePitAction.NAME, transportService, actionFilters, in -> new CreatePitRequest(in));
        this.transportService = transportService;
        this.searchTransportService = searchTransportService;
        this.clusterService = clusterService;
        this.transportSearchAction = transportSearchAction;
        this.namedWriteableRegistry = namedWriteableRegistry;
        this.createPitController = createPitController;
    }

    @Override
    protected void doExecute(Task task, CreatePitRequest request, ActionListener&lt;CreatePitResponse&gt; listener) {
        final StepListener&lt;SearchResponse&gt; createPitListener = new StepListener&lt;&gt;();
        final ActionListener&lt;CreatePitResponse&gt; updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), e -> {
            logger.error(
                () -> new ParameterizedMessage(
                    "PIT creation failed while updating PIT ID for indices [{}]",
                    Arrays.toString(request.indices())
                ),
                e
            );
            listener.onFailure(e);
        });
        createPitController.executeCreatePit(request, task, createPitListener, updatePitIdListener);
    }

    /**
     * Request to create PIT reader context with keep alive
     */
    public static class CreateReaderContextRequest extends TransportRequest {
        private final ShardId shardId;
        private final TimeValue keepAlive;

        public CreateReaderContextRequest(ShardId shardId, TimeValue keepAlive) {
            this.shardId = shardId;
            this.keepAlive = keepAlive;
        }

        public CreateReaderContextRequest(StreamInput in) throws IOException {
            super(in);
            this.shardId = new ShardId(in);
            this.keepAlive = in.readTimeValue();
        }

        public ShardId getShardId() {
            return shardId;
        }

        public TimeValue getKeepAlive() {
            return keepAlive;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            shardId.writeTo(out);
            out.writeTimeValue(keepAlive);
        }
    }

    /**
     * Create PIT reader context response which holds the contextId
     */
    public static class CreateReaderContextResponse extends SearchPhaseResult {
        public CreateReaderContextResponse(ShardSearchContextId shardSearchContextId) {
            this.contextId = shardSearchContextId;
        }

        public CreateReaderContextResponse(StreamInput in) throws IOException {
            super(in);
            contextId = new ShardSearchContextId(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            contextId.writeTo(out);
        }
    }

}
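As an aside, a hedged sketch of the wire round-trip that CreateReaderContextRequest above supports. BytesStreamOutput, ShardId, and TimeValue are existing OpenSearch utilities, but this snippet itself is illustrative and not part of the commit:

    ShardId shardId = new ShardId("logs", "_na_", 0);
    CreateReaderContextRequest request = new CreateReaderContextRequest(shardId, TimeValue.timeValueMinutes(5));
    try (BytesStreamOutput out = new BytesStreamOutput()) {
        request.writeTo(out);                       // serialize shardId + keepAlive
        try (StreamInput in = out.bytes().streamInput()) {
            CreateReaderContextRequest copy = new CreateReaderContextRequest(in);
            assert copy.getShardId().equals(shardId);
            assert copy.getKeepAlive().equals(TimeValue.timeValueMinutes(5));
        }
    }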
6 changes: 6 additions & 0 deletions server/src/main/java/org/opensearch/common/lucene/Lucene.java
@@ -1077,6 +1077,12 @@ public SortedDocValues getSortedDocValues(String field) {
        return null;
    }

    @Override
    public Object getAggregatedDocValues()
        throws IOException {
        return null;
    }

    public SortedNumericDocValues getSortedNumericDocValues(String field) {
        return null;
    }
File: BaseSingleTreeBuilder.java
@@ -59,11 +59,11 @@
/** Base class for star tree builder */
public abstract class BaseSingleTreeBuilder {
public static final int STAR_IN_DOC_VALUES_INDEX = -1;
-    final static int SECOND = 1000;
-    final static int MINUTE = 60 * SECOND;
-    final static int HOUR = 60 * 60 * SECOND;
-    final static int DAY = 24 * HOUR;
-    final static int YEAR = 365 * DAY;
+    public final static int SECOND = 1000;
+    public final static int MINUTE = 60 * SECOND;
+    public final static int HOUR = 60 * 60 * SECOND;
+    public final static int DAY = 24 * HOUR;
+    public final static long YEAR = 365L * DAY; // long: 365 * DAY overflows int
private static final Logger logger = LogManager.getLogger(BaseSingleTreeBuilder.class);
final int _numDimensions;
final String[] _dimensionsSplitOrder;
@@ -91,10 +91,11 @@ public abstract class BaseSingleTreeBuilder {
indexOutput = state.directory.createOutput(docFileName, state.context);
CodecUtil.writeIndexHeader(indexOutput, "STARTreeCodec", 0, state.segmentInfo.getId(), state.segmentSuffix);
dimensionsSplitOrder = new ArrayList<>();
// dimensionsSplitOrder.add("hour");
dimensionsSplitOrder.add("minute");
dimensionsSplitOrder.add("hour");
dimensionsSplitOrder.add("day");
dimensionsSplitOrder.add("month");
dimensionsSplitOrder.add("year");
//dimensionsSplitOrder.add("year");
dimensionsSplitOrder.add("status");
_numDimensions = dimensionsSplitOrder.size();
_dimensionsSplitOrder = new String[_numDimensions];
@@ -560,6 +561,8 @@ private long getTimeStampVal(final String fieldName, final long val) {
return val / HOUR;
case "day":
return val / DAY;
case "month":
return val/DAY * 30; // TODO
case "year":
return val / YEAR;
default:
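For reference, a hedged sketch of the rollup arithmetic in getTimeStampVal above: an epoch-millis value is floored to a coarser unit by integer division; the month divisor must be grouped as (30L * DAY), because val / DAY * 30 multiplies the day ordinal instead of dividing; and YEAR has to be a long, since 365 * DAY exceeds Integer.MAX_VALUE. RollupMath is a hypothetical standalone class, not part of this commit:

    public class RollupMath {
        static final long SECOND = 1000L;
        static final long MINUTE = 60 * SECOND;
        static final long HOUR = 60 * MINUTE;
        static final long DAY = 24 * HOUR;
        static final long MONTH = 30 * DAY;  // same 30-day approximation as the TODO above
        static final long YEAR = 365 * DAY;  // ~3.15e10 ms, overflows int

        // Floor an epoch-millis timestamp to its bucket ordinal for the given unit.
        static long bucket(long epochMillis, long unitMillis) {
            return epochMillis / unitMillis;
        }

        public static void main(String[] args) {
            long ts = 1706745600000L; // 2024-02-01T00:00:00Z
            System.out.println(bucket(ts, HOUR));  // hours since epoch
            System.out.println(bucket(ts, MONTH)); // 30-day "months" since epoch
        }
    }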
@@ -99,11 +99,11 @@ public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer)
}
if (field.name.equalsIgnoreCase("@timestamp")) {
//logger.info("Adding timestamp fields");
//dimensionReaders.put("minute_dim", valuesProducer.getSortedNumeric(field));
dimensionReaders.put("minute_dim", valuesProducer.getSortedNumeric(field));
dimensionReaders.put("hour_dim", valuesProducer.getSortedNumeric(field));
dimensionReaders.put("day_dim", valuesProducer.getSortedNumeric(field));
dimensionReaders.put("month_dim", valuesProducer.getSortedNumeric(field));
dimensionReaders.put("year_dim", valuesProducer.getSortedNumeric(field));
//dimensionReaders.put("year_dim", valuesProducer.getSortedNumeric(field));
//dimensionsSplitOrder.add("minute");

} else {
File: StarTreeQuery.java
@@ -38,10 +38,10 @@
/** Query class for querying star tree data structure */
public class StarTreeQuery extends Query implements Accountable {

Map<String, List<Predicate<Integer>>> compositePredicateMap;
Map<String, List<Predicate<Long>>> compositePredicateMap;
Set<String> groupByColumns;

public StarTreeQuery(Map<String, List<Predicate<Integer>>> compositePredicateMap, Set<String> groupByColumns) {
public StarTreeQuery(Map<String, List<Predicate<Long>>> compositePredicateMap, Set<String> groupByColumns) {
this.compositePredicateMap = compositePredicateMap;
this.groupByColumns = groupByColumns;
}
@@ -78,14 +78,13 @@ public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
            @Override
            public Scorer scorer(LeafReaderContext context)
                throws IOException {
-               StarTreeAggregatedValues val = null;
+               Object obj = context.reader().getAggregatedDocValues();
                DocIdSetIterator result = null;
-               // Object obj = context.reader().getAggregatedDocValues();
-               // if (obj != null) {
-               // val = (StarTreeAggregatedValues) obj;
-               // StarTreeFilter filter = new StarTreeFilter(val, compositePredicateMap, groupByColumns);
-               // result = filter.getStarTreeResult();
-               // }
+               if (obj != null) {
+                   StarTreeAggregatedValues val = (StarTreeAggregatedValues) obj;
+                   StarTreeFilter filter = new StarTreeFilter(val, compositePredicateMap, groupByColumns);
+                   result = filter.getStarTreeResult();
+               }
                return new ConstantScoreScorer(this, score(), scoreMode, result);
            }

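Worth noting why the predicate type moved from Predicate&lt;Integer&gt; to Predicate&lt;Long&gt;: star-tree dimension and metric columns are read back through NumericDocValues, whose longValue() returns long. A hedged sketch of how a caller might build such a query (dimension names taken from this commit; the surrounding setup is assumed, not shown here):

    Map&lt;String, List&lt;Predicate&lt;Long&gt;&gt;&gt; predicates = new HashMap&lt;&gt;();
    predicates.put("status", List.of(status -> status == 200L));            // match HTTP 200 docs
    predicates.put("hour", List.of(hour -> hour >= 10L &amp;&amp; hour &lt; 12L));     // two-hour window
    StarTreeQuery query = new StarTreeQuery(predicates, Set.of("day"));     // group results by day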
@@ -159,6 +159,12 @@ public SortedDocValues getSortedDocValues(String field) {
throw new UnsupportedOperationException();
}

    @Override
    public Object getAggregatedDocValues()
        throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public SortedNumericDocValues getSortedNumericDocValues(String field) {
        throw new UnsupportedOperationException();
New file: RestSearchStarTreeAction.java
@@ -0,0 +1,2 @@
package org.opensearch.rest.action.search;public class RestSearchStarTreeAction {
}
File: SumAggregator.java
@@ -31,11 +31,16 @@

package org.opensearch.search.aggregations.metrics;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.DoubleArray;
import org.opensearch.index.codec.freshstartree.codec.StarTreeAggregatedValues;
import org.opensearch.index.fielddata.SortedNumericDoubleValues;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
@@ -48,6 +53,8 @@

import java.io.IOException;
import java.util.Map;
import org.opensearch.transport.TransportService;


/**
* Aggregate all docs into a single sum value
@@ -62,6 +69,11 @@ public class SumAggregator extends NumericMetricsAggregator.SingleValue {
private DoubleArray sums;
private DoubleArray compensations;

    private AtomicInteger bucket2;

    private static final Logger logger = LogManager.getLogger(SumAggregator.class);


SumAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
@@ -77,6 +89,7 @@ public class SumAggregator extends NumericMetricsAggregator.SingleValue {
sums = context.bigArrays().newDoubleArray(1, true);
compensations = context.bigArrays().newDoubleArray(1, true);
}
bucket2 = new AtomicInteger(0);
}

@Override
@@ -91,14 +104,57 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
}
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
StarTreeAggregatedValues aggrVals = (StarTreeAggregatedValues) ctx.reader().getAggregatedDocValues();
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
//int bucket = 0;
return new LeafBucketCollectorBase(sub, values) {
            @Override
            public void collect(int doc, long bucket) throws IOException {
                //long bucket = bucket2.get();
                sums = bigArrays.grow(sums, bucket + 1);
                compensations = bigArrays.grow(compensations, bucket + 1);
                // Star-tree columns: one pre-aggregated metric plus the dimension values for this doc
                NumericDocValues dv = aggrVals.metricValues.get("status_sum");
                NumericDocValues hourValueDim = aggrVals.dimensionValues.get("hour");
                NumericDocValues dayValueDim = aggrVals.dimensionValues.get("day");
                NumericDocValues statusValueDim = aggrVals.dimensionValues.get("status");
                NumericDocValues minuteValueDim = aggrVals.dimensionValues.get("minute");
                NumericDocValues monthValueDim = aggrVals.dimensionValues.get("month");

                if (dv.advanceExact(doc)) {
                    // Compute the sum of double values with Kahan summation algorithm which is more
                    // accurate than naive summation.
                    double sum = sums.get(bucket);
                    double compensation = compensations.get(bucket);
                    kahanSummation.reset(sum, compensation);
                    double value = dv.longValue();
                    hourValueDim.advanceExact(doc);
                    long hour = hourValueDim.longValue();

                    if (values.advanceExact(doc)) {
                        dayValueDim.advanceExact(doc);
                        long day = dayValueDim.longValue();

                        statusValueDim.advanceExact(doc);
                        long status = statusValueDim.longValue();

                        minuteValueDim.advanceExact(doc);
                        long minute = minuteValueDim.longValue();

                        monthValueDim.advanceExact(doc);
                        long month = monthValueDim.longValue();

                        logger.info("Day : {} , hour : {} , status : {}, month : {} , minute: {}", day, hour, status, month, minute);

                        kahanSummation.add(value);

                        compensations.set(bucket, kahanSummation.delta());
                        sums.set(bucket, kahanSummation.value());
                        //bucket2.set((int) (bucket + 1l));
                        //bucket++;
                    }
                } else if (values.advanceExact(doc)) {
                    final int valuesCount = values.docValueCount();
                    // Compute the sum of double values with Kahan summation algorithm which is more
                    // accurate than naive summation.
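For readers new to the CompensatedSum calls above, a minimal self-contained sketch of Kahan summation (a generic demo, not OpenSearch code): a running compensation term captures the rounding error of each add so that small addends are not swallowed by a large running sum.

    public class KahanDemo {
        public static void main(String[] args) {
            double naive = 0.0;
            double sum = 0.0, compensation = 0.0;
            double[] values = {1e16, 1.0, 1.0};
            for (double v : values) {
                naive += v; // each 1.0 is lost: 1e16 + 1.0 rounds back to 1e16
                double corrected = v - compensation;
                double newSum = sum + corrected;
                compensation = (newSum - sum) - corrected; // rounding error of this add
                sum = newSum;
            }
            System.out.println(naive); // 1.0E16
            System.out.println(sum);   // 1.0000000000000002E16
        }
    }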