Skip to content

Commit

Permalink
add a new util method: applyFilterAndLimitOffsetToTsBlock
Browse files Browse the repository at this point in the history
  • Loading branch information
liuminghui233 authored Feb 23, 2024
1 parent 8fb5dae commit 250e936
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.common.block.column.TimeColumn;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.reader.series.PaginationController;

public class TsBlockUtil {

Expand Down Expand Up @@ -65,4 +67,57 @@ public static int getFirstConditionIndex(
}
return left;
}

public static TsBlock applyFilterAndLimitOffsetToTsBlock(
TsBlock unFilteredBlock,
TsBlockBuilder builder,
Filter pushDownFilter,
PaginationController paginationController) {
boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock);

// construct time column
int readEndIndex =
buildTimeColumnWithPagination(
unFilteredBlock, builder, keepCurrentRow, paginationController);

// construct value columns
for (int i = 0; i < builder.getValueColumnBuilders().length; i++) {
for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
if (keepCurrentRow[rowIndex]) {
if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) {
builder.getColumnBuilder(i).appendNull();
} else {
builder
.getColumnBuilder(i)
.writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex));
}
}
}
}
return builder.build();
}

private static int buildTimeColumnWithPagination(
TsBlock unFilteredBlock,
TsBlockBuilder builder,
boolean[] keepCurrentRow,
PaginationController paginationController) {
int readEndIndex = unFilteredBlock.getPositionCount();
for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
if (keepCurrentRow[rowIndex]) {
if (paginationController.hasCurOffset()) {
paginationController.consumeOffset();
keepCurrentRow[rowIndex] = false;
} else if (paginationController.hasCurLimit()) {
builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex));
builder.declarePosition();
paginationController.consumeLimit();
} else {
readEndIndex = rowIndex;
break;
}
}
}
return readEndIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.TsBlockUtil;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.reader.IPageReader;
import org.apache.tsfile.read.reader.IPointReader;
Expand Down Expand Up @@ -205,11 +206,14 @@ public TsBlock getAllSatisfiedData() throws IOException {
// construct value columns
buildValueColumns(readEndIndex, keepCurrentRow, isDeleted);

TsBlock unFilteredBlock = builder.build();
if (pushDownFilterAllSatisfy) {
// OFFSET & LIMIT has been consumed in buildTimeColumn
return builder.build();
return unFilteredBlock;
}
return applyPushDownFilter();
builder.reset();
return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
unFilteredBlock, builder, pushDownFilter, paginationController);
}

private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) {
Expand Down Expand Up @@ -279,26 +283,6 @@ private int buildTimeColumnWithPagination(long[] timeBatch, boolean[] keepCurren
return readEndIndex;
}

private int buildTimeColumnWithPagination(TsBlock unFilteredBlock, boolean[] keepCurrentRow) {
int readEndIndex = unFilteredBlock.getPositionCount();
for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
if (keepCurrentRow[rowIndex]) {
if (paginationController.hasCurOffset()) {
paginationController.consumeOffset();
keepCurrentRow[rowIndex] = false;
} else if (paginationController.hasCurLimit()) {
builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex));
builder.declarePosition();
paginationController.consumeLimit();
} else {
readEndIndex = rowIndex;
break;
}
}
}
return readEndIndex;
}

private int buildTimeColumnWithoutPagination(long[] timeBatch, boolean[] keepCurrentRow) {
int readEndIndex = 0;
for (int i = 0; i < timeBatch.length; i++) {
Expand Down Expand Up @@ -386,32 +370,6 @@ private void updateKeepCurrentRowThroughBitmask(boolean[] keepCurrentRow, byte[]
}
}

private TsBlock applyPushDownFilter() {
TsBlock unFilteredBlock = builder.build();
builder.reset();

boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock);

// construct time column
int readEndIndex = buildTimeColumnWithPagination(unFilteredBlock, keepCurrentRow);

// construct value columns
for (int i = 0; i < valueCount; i++) {
for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) {
if (keepCurrentRow[rowIndex]) {
if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) {
builder.getColumnBuilder(i).appendNull();
} else {
builder
.getColumnBuilder(i)
.writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex));
}
}
}
}
return builder.build();
}

public void setDeleteIntervalList(List<List<TimeRange>> list) {
for (int i = 0; i < valueCount; i++) {
if (valuePageReaderList.get(i) != null) {
Expand Down

0 comments on commit 250e936

Please sign in to comment.