Skip to content

Commit

Permalink
InternalMultiBucketAggregation.InternalBucket does not implement writ…
Browse files Browse the repository at this point in the history
…able anymore (elastic#117310) (elastic#117535)

This allows to make some Bucket implementations leaner, in particular terms and multi-terms aggregations
  • Loading branch information
iverase authored Nov 26, 2024
1 parent a25a87f commit 46c9a75
Show file tree
Hide file tree
Showing 24 changed files with 94 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class InternalAdjacencyMatrix extends InternalMultiBucketAggregation<InternalAdjacencyMatrix, InternalAdjacencyMatrix.InternalBucket>
implements
AdjacencyMatrix {
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket implements AdjacencyMatrix.Bucket {
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucketWritable implements AdjacencyMatrix.Bucket {

private final String key;
private final long docCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class InternalTimeSeries extends InternalMultiBucketAggregation<InternalT
/**
* A bucket associated with a specific time series (identified by its key)
*/
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket {
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucketWritable {
protected long bucketOrd;
protected final BytesRef key;
// TODO: make computing docCount optional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private List<B> reducePipelineBuckets(AggregationReduceContext reduceContext, Pi
return reducedBuckets;
}

public abstract static class InternalBucket implements Bucket, Writeable {
public abstract static class InternalBucket implements Bucket {

public Object getProperty(String containingAggName, List<String> path) {
if (path.isEmpty()) {
Expand Down Expand Up @@ -248,4 +248,8 @@ public Object getProperty(String containingAggName, List<String> path) {
return aggregation.getProperty(path.subList(1, path.size()));
}
}

/** A {@link InternalBucket} that implements the {@link Writeable} interface. Most implementation might want
* to use this one except when specific logic is need to write into the stream. */
public abstract static class InternalBucketWritable extends InternalBucket implements Writeable {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ public int hashCode() {
return Objects.hash(super.hashCode(), size, buckets, afterKey, Arrays.hashCode(reverseMuls), Arrays.hashCode(missingOrders));
}

public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket implements CompositeAggregation.Bucket {
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucketWritable
implements
CompositeAggregation.Bucket {

private final CompositeKey key;
private final long docCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.Objects;

public class InternalFilters extends InternalMultiBucketAggregation<InternalFilters, InternalFilters.InternalBucket> implements Filters {
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket implements Filters.Bucket {
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucketWritable implements Filters.Bucket {

private final String key;
private long docCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.io.IOException;
import java.util.Objects;

public abstract class InternalGeoGridBucket extends InternalMultiBucketAggregation.InternalBucket
public abstract class InternalGeoGridBucket extends InternalMultiBucketAggregation.InternalBucketWritable
implements
GeoGrid.Bucket,
Comparable<InternalGeoGridBucket> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/**
* A bucket in the histogram where documents fall in
*/
public abstract class AbstractHistogramBucket extends InternalMultiBucketAggregation.InternalBucket {
public abstract class AbstractHistogramBucket extends InternalMultiBucketAggregation.InternalBucketWritable {

protected final long docCount;
protected final InternalAggregations aggregations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

public class InternalIpPrefix extends InternalMultiBucketAggregation<InternalIpPrefix, InternalIpPrefix.Bucket> {

public static class Bucket extends InternalMultiBucketAggregation.InternalBucket
public static class Bucket extends InternalMultiBucketAggregation.InternalBucketWritable
implements
IpPrefix.Bucket,
KeyComparable<InternalIpPrefix.Bucket> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public final class InternalBinaryRange extends InternalMultiBucketAggregation<In
implements
Range {

public static class Bucket extends InternalMultiBucketAggregation.InternalBucket implements Range.Bucket {
public static class Bucket extends InternalMultiBucketAggregation.InternalBucketWritable implements Range.Bucket {

private final transient DocValueFormat format;
private final String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
@SuppressWarnings("rawtypes")
static final Factory FACTORY = new Factory();

public static class Bucket extends InternalMultiBucketAggregation.InternalBucket implements Range.Bucket {
public static class Bucket extends InternalMultiBucketAggregation.InternalBucketWritable implements Range.Bucket {

protected final transient DocValueFormat format;
protected final double from;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public abstract static class AbstractTermsBucket<B extends AbstractTermsBucket<B

protected abstract void setDocCountError(long docCountError);

protected abstract boolean getShowDocCountError();

protected abstract long getDocCountError();

protected abstract void bucketToXContent(XContentBuilder builder, Params params, boolean showDocCountError) throws IOException;
Expand All @@ -91,6 +89,8 @@ public abstract static class AbstractTermsBucket<B extends AbstractTermsBucket<B

protected abstract int getRequiredSize();

protected abstract boolean getShowDocCountError();

protected abstract B createBucket(long docCount, InternalAggregations aggs, long docCountError, B prototype);

private B reduceBucket(List<B> buckets, AggregationReduceContext context) {
Expand All @@ -104,7 +104,7 @@ private B reduceBucket(List<B> buckets, AggregationReduceContext context) {
for (B bucket : buckets) {
docCount += bucket.getDocCount();
if (docCountError != -1) {
if (bucket.getShowDocCountError() == false || bucket.getDocCountError() == -1) {
if (getShowDocCountError() == false || bucket.getDocCountError() == -1) {
docCountError = -1;
} else {
docCountError += bucket.getDocCountError();
Expand Down Expand Up @@ -257,23 +257,25 @@ public void accept(InternalAggregation aggregation) {
}
otherDocCount[0] += terms.getSumOfOtherDocCounts();
final long thisAggDocCountError = getDocCountError(terms);
setDocCountError(thisAggDocCountError);
if (sumDocCountError != -1) {
if (thisAggDocCountError == -1) {
sumDocCountError = -1;
} else {
sumDocCountError += thisAggDocCountError;
}
}
setDocCountError(thisAggDocCountError);
for (B bucket : terms.getBuckets()) {
// If there is already a doc count error for this bucket
// subtract this aggs doc count error from it to make the
// new value for the bucket. This then means that when the
// final error for the bucket is calculated below we account
// for the existing error calculated in a previous reduce.
// Note that if the error is unbounded (-1) this will be fixed
// later in this method.
bucket.updateDocCountError(-thisAggDocCountError);
if (getShowDocCountError()) {
for (B bucket : terms.getBuckets()) {
// If there is already a doc count error for this bucket
// subtract this aggs doc count error from it to make the
// new value for the bucket. This then means that when the
// final error for the bucket is calculated below we account
// for the existing error calculated in a previous reduce.
// Note that if the error is unbounded (-1) this will be fixed
// later in this method.
bucket.updateDocCountError(-thisAggDocCountError);
}
}
if (terms.getBuckets().isEmpty() == false) {
bucketsList.add(terms.getBuckets());
Expand Down Expand Up @@ -319,17 +321,17 @@ public InternalAggregation get() {
result.add(bucket.reduced(AbstractInternalTerms.this::reduceBucket, reduceContext));
});
}
for (B r : result) {
if (sumDocCountError == -1) {
r.setDocCountError(-1);
} else {
r.updateDocCountError(sumDocCountError);
if (getShowDocCountError()) {
for (B r : result) {
if (sumDocCountError == -1) {
r.setDocCountError(-1);
} else {
r.updateDocCountError(sumDocCountError);
}
}
}
long docCountError;
if (sumDocCountError == -1) {
docCountError = -1;
} else {
long docCountError = -1;
if (sumDocCountError != -1) {
docCountError = size == 1 ? 0 : sumDocCountError;
}
return create(name, result, reduceContext.isFinalReduce() ? getOrder() : thisReduceOrder, docCountError, otherDocCount[0]);
Expand All @@ -349,7 +351,7 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
b -> createBucket(
samplingContext.scaleUp(b.getDocCount()),
InternalAggregations.finalizeSampling(b.getAggregations(), samplingContext),
b.getShowDocCountError() ? samplingContext.scaleUp(b.getDocCountError()) : 0,
getShowDocCountError() ? samplingContext.scaleUp(b.getDocCountError()) : 0,
b
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
prototype.term,
prototype.docCount,
aggregations,
prototype.showDocCountError,
prototype.docCountError,
showTermDocCountError,
prototype.getDocCountError(),
prototype.format
);
}
Expand Down Expand Up @@ -216,6 +216,6 @@ public void close() {

@Override
protected Bucket createBucket(long docCount, InternalAggregations aggs, long docCountError, DoubleTerms.Bucket prototype) {
return new Bucket(prototype.term, docCount, aggs, prototype.showDocCountError, docCountError, format);
return new Bucket(prototype.term, docCount, aggs, showTermDocCountError, docCountError, format);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,6 @@ StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp, GlobalOrdLookup
BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
result.bucketOrd = temp.bucketOrd;
result.docCountError = 0;
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,22 @@ protected final void writeTermTypeInfoTo(StreamOutput out) throws IOException {
writeSize(shardSize, out);
out.writeBoolean(showTermDocCountError);
out.writeVLong(otherDocCount);
out.writeCollection(buckets);
out.writeVInt(buckets.size());
for (var bucket : buckets) {
bucket.writeTo(out, showTermDocCountError);
}
}

@Override
protected void setDocCountError(long docCountError) {
this.docCountError = docCountError;
}

@Override
protected boolean getShowDocCountError() {
return showTermDocCountError;
}

@Override
protected int getShardSize() {
return shardSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.BucketOrder;
Expand All @@ -29,10 +30,11 @@ public abstract class InternalRareTerms<A extends InternalRareTerms<A, B>, B ext
implements
RareTerms {

public abstract static class Bucket<B extends Bucket<B>> extends InternalMultiBucketAggregation.InternalBucket
public abstract static class Bucket<B extends Bucket<B>> extends InternalMultiBucketAggregation.InternalBucketWritable
implements
RareTerms.Bucket,
KeyComparable<B> {
KeyComparable<B>,
Writeable {
/**
* Reads a bucket. Should be a constructor reference.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
public static final String BG_COUNT = "bg_count";

@SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
public abstract static class Bucket<B extends Bucket<B>> extends InternalMultiBucketAggregation.InternalBucket
public abstract static class Bucket<B extends Bucket<B>> extends InternalMultiBucketAggregation.InternalBucketWritable
implements
SignificantTerms.Bucket {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ public interface Reader<B extends Bucket<B>> {
long bucketOrd;

protected long docCount;
protected long docCountError;
private long docCountError;
protected InternalAggregations aggregations;
protected final boolean showDocCountError;
protected final DocValueFormat format;

protected Bucket(
Expand All @@ -53,29 +52,23 @@ protected Bucket(
long docCountError,
DocValueFormat formatter
) {
this.showDocCountError = showDocCountError;
this.format = formatter;
this.docCount = docCount;
this.aggregations = aggregations;
this.docCountError = docCountError;
this.docCountError = showDocCountError ? docCountError : -1;
}

/**
* Read from a stream.
*/
protected Bucket(StreamInput in, DocValueFormat formatter, boolean showDocCountError) throws IOException {
this.showDocCountError = showDocCountError;
this.format = formatter;
docCount = in.readVLong();
docCountError = -1;
if (showDocCountError) {
docCountError = in.readLong();
}
docCountError = showDocCountError ? in.readLong() : -1;
aggregations = InternalAggregations.readFrom(in);
}

@Override
public final void writeTo(StreamOutput out) throws IOException {
final void writeTo(StreamOutput out, boolean showDocCountError) throws IOException {
out.writeVLong(getDocCount());
if (showDocCountError) {
out.writeLong(docCountError);
Expand Down Expand Up @@ -105,9 +98,6 @@ public void setBucketOrd(long bucketOrd) {

@Override
public long getDocCountError() {
if (showDocCountError == false) {
throw new IllegalStateException("show_terms_doc_count_error is false");
}
return docCountError;
}

Expand All @@ -121,11 +111,6 @@ protected void updateDocCountError(long docCountErrorDiff) {
this.docCountError += docCountErrorDiff;
}

@Override
protected boolean getShowDocCountError() {
return showDocCountError;
}

@Override
public InternalAggregations getAggregations() {
return aggregations;
Expand Down Expand Up @@ -155,23 +140,15 @@ public boolean equals(Object obj) {
return false;
}
Bucket<?> that = (Bucket<?>) obj;
if (showDocCountError && docCountError != that.docCountError) {
/*
* docCountError doesn't matter if not showing it and
* serialization sets it to -1 no matter what it was
* before.
*/
return false;
}
return Objects.equals(docCount, that.docCount)
&& Objects.equals(showDocCountError, that.showDocCountError)
return Objects.equals(docCountError, that.docCountError)
&& Objects.equals(docCount, that.docCount)
&& Objects.equals(format, that.format)
&& Objects.equals(aggregations, that.aggregations);
}

@Override
public int hashCode() {
return Objects.hash(getClass(), docCount, format, showDocCountError, showDocCountError ? docCountError : -1, aggregations);
return Objects.hash(getClass(), docCount, format, docCountError, aggregations);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
prototype.term,
prototype.docCount,
aggregations,
prototype.showDocCountError,
prototype.docCountError,
showTermDocCountError,
prototype.getDocCountError(),
prototype.format
);
}
Expand Down Expand Up @@ -260,7 +260,7 @@ public InternalAggregation get() {

@Override
protected Bucket createBucket(long docCount, InternalAggregations aggs, long docCountError, LongTerms.Bucket prototype) {
return new Bucket(prototype.term, docCount, aggs, prototype.showDocCountError, docCountError, format);
return new Bucket(prototype.term, docCount, aggs, showTermDocCountError, docCountError, format);
}

/**
Expand Down
Loading

0 comments on commit 46c9a75

Please sign in to comment.