Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ES|QL categorize with multiple groupings #118173

Merged
merged 25 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
41279c7
ES|QL categorize with multiple groupings.
jan-elastic Dec 6, 2024
9f770d5
Fix VerifierTests
jan-elastic Dec 7, 2024
d7af8bf
Close stuff when constructing CategorizePackedValuesBlockHash fails
jan-elastic Dec 9, 2024
e6ac068
CategorizePackedValuesBlockHashTests
jan-elastic Dec 9, 2024
35e9811
Improve categorize javadocs
jan-elastic Dec 9, 2024
b44663a
Update docs/changelog/118173.yaml
jan-elastic Dec 9, 2024
82a38f9
Create CategorizePackedValuesBlockHash's delegate page differently
jan-elastic Dec 9, 2024
5121e9a
Double check in BlockHash builder for single categorize
jan-elastic Dec 11, 2024
3c0325a
Reuse blocks array
jan-elastic Dec 11, 2024
e05f860
More CSV tests
jan-elastic Dec 11, 2024
abfc211
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cat…
jan-elastic Dec 11, 2024
54ac1bf
Remove assumeTrue categorize_v5
jan-elastic Dec 11, 2024
5eb5189
Rename test
jan-elastic Dec 11, 2024
1deb2d4
Two more verifier tests
jan-elastic Dec 11, 2024
a633806
more CSV tests
jan-elastic Dec 11, 2024
e639268
Add JavaDocs/comments
jan-elastic Dec 11, 2024
5a4c8bb
spotless
jan-elastic Dec 11, 2024
5ecc9f9
Refactor/unify recategorize
jan-elastic Dec 11, 2024
16680d2
Better memory accounting
jan-elastic Dec 12, 2024
9588e10
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cat…
jan-elastic Dec 12, 2024
a9c3ed7
fix csv test
jan-elastic Dec 12, 2024
8dd15ac
randomize CategorizePackedValuesBlockHashTests
jan-elastic Dec 12, 2024
7f82b43
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cat…
jan-elastic Dec 12, 2024
0b0be87
Add TODO
jan-elastic Dec 12, 2024
3bccc78
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cat…
jan-elastic Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,15 @@ public static BlockHash buildCategorizeBlockHash(
List<GroupSpec> groups,
AggregatorMode aggregatorMode,
BlockFactory blockFactory,
AnalysisRegistry analysisRegistry
AnalysisRegistry analysisRegistry,
int emitBatchSize
) {
if (groups.size() != 1) {
throw new IllegalArgumentException("only a single CATEGORIZE group can used");
if (groups.size() == 1) {
return new CategorizeBlockHash(blockFactory, groups.get(0).channel, aggregatorMode, analysisRegistry);
} else {
assert groups.get(0).isCategorize();
jan-elastic marked this conversation as resolved.
Show resolved Hide resolved
return new CategorizePackedValuesBlockHash(groups, blockFactory, aggregatorMode, analysisRegistry, emitBatchSize);
}

return new CategorizeBlockHash(blockFactory, groups.get(0).channel, aggregatorMode, analysisRegistry);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import java.util.Objects;

/**
* Base BlockHash implementation for {@code Categorize} grouping function.
* BlockHash implementation for {@code Categorize} grouping function.
*/
public class CategorizeBlockHash extends BlockHash {

Expand Down Expand Up @@ -95,12 +95,14 @@ public class CategorizeBlockHash extends BlockHash {
}
}

boolean seenNull() {
return seenNull;
}

@Override
public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
if (aggregatorMode.isInputPartial() == false) {
addInitial(page, addInput);
} else {
addIntermediate(page, addInput);
try (IntBlock block = add(page)) {
addInput.add(0, block);
}
}

Expand Down Expand Up @@ -129,29 +131,28 @@ public void close() {
Releasables.close(evaluator, categorizer);
}

private IntBlock add(Page page) {
return aggregatorMode.isInputPartial() == false ? addInitial(page) : addIntermediate(page);
}

/**
* Adds initial (raw) input to the state.
*/
private void addInitial(Page page, GroupingAggregatorFunction.AddInput addInput) {
try (IntBlock result = (IntBlock) evaluator.eval(page.getBlock(channel))) {
addInput.add(0, result);
}
IntBlock addInitial(Page page) {
return (IntBlock) evaluator.eval(page.getBlock(channel));
}

/**
* Adds intermediate state to the state.
*/
private void addIntermediate(Page page, GroupingAggregatorFunction.AddInput addInput) {
private IntBlock addIntermediate(Page page) {
if (page.getPositionCount() == 0) {
return;
return null;
}
BytesRefBlock categorizerState = page.getBlock(channel);
if (categorizerState.areAllValuesNull()) {
seenNull = true;
try (var newIds = blockFactory.newConstantIntVector(NULL_ORD, 1)) {
addInput.add(0, newIds);
}
return;
return blockFactory.newConstantIntBlockWith(NULL_ORD, 1);
}

Map<Integer, Integer> idMap = readIntermediate(categorizerState.getBytesRef(0, new BytesRef()));
Expand All @@ -161,9 +162,7 @@ private void addIntermediate(Page page, GroupingAggregatorFunction.AddInput addI
for (int i = fromId; i < toId; i++) {
newIdsBuilder.appendInt(idMap.get(i));
}
try (IntBlock newIds = newIdsBuilder.build()) {
addInput.add(0, newIds);
}
return newIdsBuilder.build();
}
}

Expand All @@ -172,7 +171,7 @@ private void addIntermediate(Page page, GroupingAggregatorFunction.AddInput addI
*
* @return a map from the old category id to the new one. The old ids go from 0 to {@code size - 1}.
*/
private Map<Integer, Integer> readIntermediate(BytesRef bytes) {
Map<Integer, Integer> readIntermediate(BytesRef bytes) {
Map<Integer, Integer> idMap = new HashMap<>();
try (StreamInput in = new BytesArray(bytes).streamInput()) {
if (in.readBoolean()) {
Expand All @@ -198,15 +197,19 @@ private Block buildIntermediateBlock() {
if (categorizer.getCategoryCount() == 0) {
return blockFactory.newConstantNullBlock(seenNull ? 1 : 0);
}
int positionCount = categorizer.getCategoryCount() + (seenNull ? 1 : 0);
// We're returning a block with N positions just because the Page must have all blocks with the same position count!
return blockFactory.newConstantBytesRefBlockWith(serializeCategorizer(), positionCount);
}

BytesRef serializeCategorizer() {
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeBoolean(seenNull);
out.writeVInt(categorizer.getCategoryCount());
for (SerializableTokenListCategory category : categorizer.toCategoriesById()) {
category.writeTo(out);
}
// We're returning a block with N positions just because the Page must have all blocks with the same position count!
int positionCount = categorizer.getCategoryCount() + (seenNull ? 1 : 0);
return blockFactory.newConstantBytesRefBlockWith(out.bytes().toBytesRef(), positionCount);
return out.bytes().toBytesRef();
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.aggregation.blockhash;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.analysis.AnalysisRegistry;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* BlockHash implementation for {@code Categorize} grouping function as first
grouping expression, followed by one or more other grouping expressions.
*/
public class CategorizePackedValuesBlockHash extends BlockHash {

private final AggregatorMode aggregatorMode;
private final CategorizeBlockHash categorizeBlockHash;
private final PackedValuesBlockHash packedValuesBlockHash;

CategorizePackedValuesBlockHash(
List<GroupSpec> specs,
BlockFactory blockFactory,
AggregatorMode aggregatorMode,
AnalysisRegistry analysisRegistry,
int emitBatchSize
) {
super(blockFactory);
this.aggregatorMode = aggregatorMode;

List<GroupSpec> packedValuesBlockHashSpecs = new ArrayList<>(specs);
packedValuesBlockHashSpecs.set(0, new GroupSpec(-1, ElementType.INT));
jan-elastic marked this conversation as resolved.
Show resolved Hide resolved

boolean success = false;
try {
categorizeBlockHash = new CategorizeBlockHash(blockFactory, specs.get(0).channel(), aggregatorMode, analysisRegistry);
packedValuesBlockHash = new PackedValuesBlockHash(packedValuesBlockHashSpecs, blockFactory, emitBatchSize);
success = true;
} finally {
if (success == false) {
close();
}
}
}

@Override
public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
try (IntBlock categories = getCategories(page)) {
packedValuesBlockHash.add(page.appendBlock(categories), addInput);
}
}

private IntBlock getCategories(Page page) {
if (aggregatorMode.isInputPartial() == false) {
return categorizeBlockHash.addInitial(page);
} else {
BytesRefBlock stateBlock = page.getBlock(0);
BytesRef stateBytes = stateBlock.getBytesRef(0, new BytesRef());

try (StreamInput in = new BytesArray(stateBytes).streamInput()) {
BytesRef categorizerState = in.readBytesRef();
Map<Integer, Integer> idMap = categorizeBlockHash.readIntermediate(categorizerState);
int[] oldIds = in.readIntArray();
try (IntBlock.Builder newIds = blockFactory.newIntBlockBuilder(page.getPositionCount())) {
for (int oldId : oldIds) {
newIds.appendInt(idMap.get(oldId));
}
return newIds.build();
jan-elastic marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
public Block[] getKeys() {
Block[] keys = packedValuesBlockHash.getKeys();
if (aggregatorMode.isOutputPartial() == false) {
try (
BytesRefBlock regexes = (BytesRefBlock) categorizeBlockHash.getKeys()[0];
BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(keys[0].getPositionCount())
) {
IntVector idsVector = (IntVector) keys[0].asVector();
int idsOffset = categorizeBlockHash.seenNull() ? 0 : -1;
BytesRef scratch = new BytesRef();
for (int i = 0; i < idsVector.getPositionCount(); i++) {
int id = idsVector.getInt(i);
if (id == 0) {
builder.appendNull();
} else {
builder.appendBytesRef(regexes.getBytesRef(id + idsOffset, scratch));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for now: We're repeating, potentially, a lot of bytesref values here. I wonder if there is or it would make sense to have a BytesRefBlock that instead of all the bytesrefs, stores every value just once, and then a reference per position:

AAAAAA
BBBBBBB
AAAAAA
AAAAAA

->

// 1: AAAAAA
// 2: BBBBBBB
1
2
1
1

@nik9000 Something to consider for later? Maybe it's too specific for this. And anyway, the next EVAL or whatever will duplicate the value again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like a nice thing to have, but definitely out of scope for this PR.

However, the next EVAL should not duplicate the value again.

If you have:

// 1: AAAAAA
// 2: BBBBBBB
1
2
1
1

then an efficient EVAL x=SUBSTRING(x, 1, 2) should give

// 1: AA
// 2: BB
1
2
1
1

without ever duplicating.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For that SUBSTRING to not duplicate, we would need to add that "hashtable" strategy in the BytesRefBlockBuilder. It looks good (?), but I wonder if using that by default could perform negatively in some scenarios. Something to try eventually probably

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like worth trying in the future. Are you making a note (issue) of this, so that the idea doesn't get lost?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! I'll comment it with Nik, just in case it was considered and discarded already, and then I'll document it in an issue somewhere

jan-elastic marked this conversation as resolved.
Show resolved Hide resolved
}
}
keys[0].close();
keys[0] = builder.build();
}
} else {
BytesRef state;
try (BytesStreamOutput out = new BytesStreamOutput()) {
jan-elastic marked this conversation as resolved.
Show resolved Hide resolved
out.writeBytesRef(categorizeBlockHash.serializeCategorizer());
IntVector idsVector = (IntVector) keys[0].asVector();
int[] idsArray = new int[idsVector.getPositionCount()];
jan-elastic marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < idsVector.getPositionCount(); i++) {
idsArray[i] = idsVector.getInt(i);
}
out.writeIntArray(idsArray);
jan-elastic marked this conversation as resolved.
Show resolved Hide resolved
state = out.bytes().toBytesRef();
} catch (IOException e) {
throw new RuntimeException(e);
}
keys[0].close();
keys[0] = blockFactory.newConstantBytesRefBlockWith(state, keys[0].getPositionCount());
}
return keys;
}

@Override
public IntVector nonEmpty() {
return packedValuesBlockHash.nonEmpty();
}

@Override
public BitArray seenGroupIds(BigArrays bigArrays) {
return packedValuesBlockHash.seenGroupIds(bigArrays);
}

@Override
public final ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockSize) {
throw new UnsupportedOperationException();
}

@Override
public void close() {
Releasables.close(categorizeBlockHash, packedValuesBlockHash);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ public <B extends Block> B getBlock(int blockIndex) {
if (blocksReleased) {
throw new IllegalStateException("can't read released page");
}
if (blockIndex < 0) {
blockIndex += blocks.length;
}
jan-elastic marked this conversation as resolved.
Show resolved Hide resolved
@SuppressWarnings("unchecked")
B block = (B) blocks[blockIndex];
if (block.isReleased()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ public Operator get(DriverContext driverContext) {
if (groups.stream().anyMatch(BlockHash.GroupSpec::isCategorize)) {
return new HashAggregationOperator(
aggregators,
() -> BlockHash.buildCategorizeBlockHash(groups, aggregatorMode, driverContext.blockFactory(), analysisRegistry),
() -> BlockHash.buildCategorizeBlockHash(
groups,
aggregatorMode,
driverContext.blockFactory(),
analysisRegistry,
maxPageSize
),
driverContext
);
}
Expand Down
Loading
Loading