diff --git a/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java b/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java index 0f05afd5a..f0dd15513 100644 --- a/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java +++ b/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlockUtil.java @@ -21,6 +21,8 @@ import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.common.block.column.TimeColumn; +import org.apache.tsfile.read.filter.basic.Filter; +import org.apache.tsfile.read.reader.series.PaginationController; public class TsBlockUtil { @@ -65,4 +67,57 @@ public static int getFirstConditionIndex( } return left; } + + public static TsBlock applyFilterAndLimitOffsetToTsBlock( + TsBlock unFilteredBlock, + TsBlockBuilder builder, + Filter pushDownFilter, + PaginationController paginationController) { + boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock); + + // construct time column + int readEndIndex = + buildTimeColumnWithPagination( + unFilteredBlock, builder, keepCurrentRow, paginationController); + + // construct value columns + for (int i = 0; i < builder.getValueColumnBuilders().length; i++) { + for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) { + if (keepCurrentRow[rowIndex]) { + if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) { + builder.getColumnBuilder(i).appendNull(); + } else { + builder + .getColumnBuilder(i) + .writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex)); + } + } + } + } + return builder.build(); + } + + private static int buildTimeColumnWithPagination( + TsBlock unFilteredBlock, + TsBlockBuilder builder, + boolean[] keepCurrentRow, + PaginationController paginationController) { + int readEndIndex = unFilteredBlock.getPositionCount(); + for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) { + if (keepCurrentRow[rowIndex]) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + keepCurrentRow[rowIndex] = false; + } else if (paginationController.hasCurLimit()) { + builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex)); + builder.declarePosition(); + paginationController.consumeLimit(); + } else { + readEndIndex = rowIndex; + break; + } + } + } + return readEndIndex; + } } diff --git a/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java index b60a0b8d3..383f41954 100644 --- a/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java +++ b/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java @@ -28,6 +28,7 @@ import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.TsBlockUtil; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.read.reader.IPageReader; import org.apache.tsfile.read.reader.IPointReader; @@ -205,11 +206,14 @@ public TsBlock getAllSatisfiedData() throws IOException { // construct value columns buildValueColumns(readEndIndex, keepCurrentRow, isDeleted); + TsBlock unFilteredBlock = builder.build(); if (pushDownFilterAllSatisfy) { // OFFSET & LIMIT has been consumed in buildTimeColumn - return builder.build(); + return unFilteredBlock; } - return applyPushDownFilter(); + builder.reset(); + return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock( + unFilteredBlock, builder, pushDownFilter, paginationController); } private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) { @@ -279,26 +283,6 @@ private int buildTimeColumnWithPagination(long[] timeBatch, boolean[] keepCurren return readEndIndex; } - private int buildTimeColumnWithPagination(TsBlock unFilteredBlock, boolean[] keepCurrentRow) { - int readEndIndex = unFilteredBlock.getPositionCount(); - for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) { - if (keepCurrentRow[rowIndex]) { - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - keepCurrentRow[rowIndex] = false; - } else if (paginationController.hasCurLimit()) { - builder.getTimeColumnBuilder().writeLong(unFilteredBlock.getTimeByIndex(rowIndex)); - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - readEndIndex = rowIndex; - break; - } - } - } - return readEndIndex; - } - private int buildTimeColumnWithoutPagination(long[] timeBatch, boolean[] keepCurrentRow) { int readEndIndex = 0; for (int i = 0; i < timeBatch.length; i++) { @@ -386,32 +370,6 @@ private void updateKeepCurrentRowThroughBitmask(boolean[] keepCurrentRow, byte[] } } - private TsBlock applyPushDownFilter() { - TsBlock unFilteredBlock = builder.build(); - builder.reset(); - - boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(unFilteredBlock); - - // construct time column - int readEndIndex = buildTimeColumnWithPagination(unFilteredBlock, keepCurrentRow); - - // construct value columns - for (int i = 0; i < valueCount; i++) { - for (int rowIndex = 0; rowIndex < readEndIndex; rowIndex++) { - if (keepCurrentRow[rowIndex]) { - if (unFilteredBlock.getValueColumns()[i].isNull(rowIndex)) { - builder.getColumnBuilder(i).appendNull(); - } else { - builder - .getColumnBuilder(i) - .writeObject(unFilteredBlock.getValueColumns()[i].getObject(rowIndex)); - } - } - } - } - return builder.build(); - } - public void setDeleteIntervalList(List> list) { for (int i = 0; i < valueCount; i++) { if (valuePageReaderList.get(i) != null) {