From 7b09a5adcaeb5ae4b96a9a9cbf3508829997b6af Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Fri, 12 Jan 2024 22:51:31 +0800 Subject: [PATCH] Pipe: fix NPE when parsing TabletInsertionData & add IT for null values insertion (#11877) Co-authored-by: Steve Yurong Su --- .../iotdb/pipe/it/IoTDBPipeDataSinkIT.java | 46 --- .../iotdb/pipe/it/IoTDBPipeNullValueIT.java | 320 ++++++++++++++++++ .../tablet/TabletInsertionDataContainer.java | 152 ++++++--- .../event/PipeTabletInsertionEventTest.java | 93 +++-- 4 files changed, 490 insertions(+), 121 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java index 65eb2a20f28d..043f24cf3efd 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java @@ -33,7 +33,6 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -135,49 +134,4 @@ public void testLegacyConnector() throws Exception { Collections.singleton("0,1.0,")); } } - - @Test - public void testInsertNull() throws Exception { - DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); - - String receiverIp = receiverDataNode.getIp(); - int receiverPort = receiverDataNode.getPort(); - - try (SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - Map extractorAttributes = new HashMap<>(); - Map processorAttributes = new HashMap<>(); - Map connectorAttributes = new HashMap<>(); - - connectorAttributes.put("connector", "iotdb-thrift-connector"); - connectorAttributes.put("connector.ip", receiverIp); - connectorAttributes.put("connector.port", Integer.toString(receiverPort)); - - TSStatus status = - client.createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) - .setExtractorAttributes(extractorAttributes) - .setProcessorAttributes(processorAttributes)); - - Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); - - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); - - if (!TestUtils.tryExecuteNonQueriesWithRetry( - receiverEnv, - Arrays.asList( - "create aligned timeseries root.sg.d1(s0 float, s1 float)", - "create aligned timeseries root.sg.d1(s0 float, s1 float)", - "insert into root.sg.d1(time, s0, s1) values (3, null, 25.34)"))) { - return; - } - - TestUtils.assertDataOnEnv( - receiverEnv, - "select * from root.**", - "Time,root.sg.d1.s0,root.sg.d1.s1,", - Collections.singleton("3,null,25.34,")); - } - } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java new file mode 100644 index 000000000000..46d649690ac8 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeNullValueIT.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2; +import org.apache.iotdb.pipe.PipeEnvironmentException; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.BitMap; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2.class}) +public class IoTDBPipeNullValueIT extends AbstractPipeDualIT { + + // Test dimensions: + // 1. is or not aligned + // 2. is or not parsed + // 3. session insertRecord, session insertTablet, SQL insert + // 4. partial null, all null + // 5. one row or more (TODO) + // 6. more data types (TODO) + + private enum InsertType { + SESSION_INSERT_RECORD, + SESSION_INSERT_TABLET, + SQL_INSERT, + } + + private static final Map> INSERT_NULL_VALUE_MAP = new HashMap<>(); + + private static final List CREATE_TIMESERIES_SQL = + Arrays.asList( + "create timeseries root.sg.d1.s0 with datatype=float", + "create timeseries root.sg.d1.s1 with datatype=float"); + + private static final List CREATE_ALIGNED_TIMESERIES_SQL = + Collections.singletonList("create aligned timeseries root.sg.d1(s0 float, s1 float)"); + + private final String deviceId = "root.sg.d1"; + private final List measurements = Arrays.asList("s0", "s1"); + private final List types = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT); + + private final List partialNullValues = Arrays.asList(null, 25.34F); + private final List allNullValues = Arrays.asList(null, null); + + private Tablet partialNullTablet; + private Tablet allNullTablet; + + private void constructTablet() { + final MeasurementSchema[] schemas = new MeasurementSchema[2]; + for (int i = 0; i < schemas.length; i++) { + schemas[i] = new MeasurementSchema(measurements.get(i), types.get(i)); + } + + final BitMap[] bitMapsForPartialNull = new BitMap[2]; + bitMapsForPartialNull[0] = new BitMap(1); + bitMapsForPartialNull[0].markAll(); + bitMapsForPartialNull[1] = new BitMap(1); + + final BitMap[] bitMapsForAllNull = new BitMap[2]; + bitMapsForAllNull[0] = new BitMap(1); + bitMapsForAllNull[0].markAll(); + bitMapsForAllNull[1] = new BitMap(1); + bitMapsForAllNull[1].markAll(); + + final Object[] valuesForPartialNull = new Object[2]; + valuesForPartialNull[0] = new float[] {0F}; + valuesForPartialNull[1] = new float[] {25.34F}; + + final Object[] valuesForAllNull = new Object[2]; + valuesForAllNull[0] = new float[] {0F}; + valuesForAllNull[1] = new float[] {0F}; + + partialNullTablet = new Tablet(deviceId, Arrays.asList(schemas), 1); + partialNullTablet.values = valuesForPartialNull; + partialNullTablet.timestamps = new long[] {3}; + partialNullTablet.rowSize = 1; + partialNullTablet.bitMaps = bitMapsForPartialNull; + + allNullTablet = new Tablet(deviceId, Arrays.asList(schemas), 1); + allNullTablet.values = valuesForAllNull; + allNullTablet.timestamps = new long[] {4}; + allNullTablet.rowSize = 1; + allNullTablet.bitMaps = bitMapsForAllNull; + } + + @Override + @Before + public void setUp() throws PipeEnvironmentException { + super.setUp(); + + constructTablet(); + + // init INSERT_NULL_VALUE_MAP + INSERT_NULL_VALUE_MAP.put( + InsertType.SESSION_INSERT_RECORD, + (isAligned) -> { + try { + try (ISession session = senderEnv.getSessionConnection()) { + if (isAligned) { + session.insertAlignedRecord(deviceId, 3, measurements, types, partialNullValues); + session.insertAlignedRecord(deviceId, 4, measurements, types, allNullValues); + } else { + session.insertRecord(deviceId, 3, measurements, types, partialNullValues); + session.insertRecord(deviceId, 4, measurements, types, allNullValues); + } + } catch (StatementExecutionException e) { + fail(e.getMessage()); + } + } catch (IoTDBConnectionException e) { + fail(e.getMessage()); + } + }); + + INSERT_NULL_VALUE_MAP.put( + InsertType.SESSION_INSERT_TABLET, + (isAligned) -> { + try { + try (ISession session = senderEnv.getSessionConnection()) { + if (isAligned) { + session.insertAlignedTablet(partialNullTablet); + session.insertAlignedTablet(allNullTablet); + } else { + session.insertTablet(partialNullTablet); + session.insertTablet(allNullTablet); + } + } catch (StatementExecutionException e) { + fail(e.getMessage()); + } + } catch (IoTDBConnectionException e) { + fail(e.getMessage()); + } + }); + + INSERT_NULL_VALUE_MAP.put( + InsertType.SQL_INSERT, + (isAligned) -> { + // partial null + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + isAligned + ? Collections.singletonList( + "insert into root.sg.d1(time, s0, s1) aligned values (3, null, 25.34)") + : Collections.singletonList( + "insert into root.sg.d1(time, s0, s1) values (3, null, 25.34)"))) { + fail(); + } + // all null + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + isAligned + ? Collections.singletonList( + "insert into root.sg.d1(time, s0, s1) aligned values (4, null, null)") + : Collections.singletonList( + "insert into root.sg.d1(time, s0, s1) values (4, null, null)"))) { + fail(); + } + }); + } + + private void testInsertNullValueTemplate( + InsertType insertType, boolean isAligned, boolean withParsing) throws Exception { + DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + String receiverIp = receiverDataNode.getIp(); + int receiverPort = receiverDataNode.getPort(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + Map extractorAttributes = new HashMap<>(); + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + if (withParsing) { + extractorAttributes.put("extractor.pattern", "root.sg.d1"); + } + + TSStatus status = + client.createPipe( + new TCreatePipeReq("test", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + } + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + receiverEnv, isAligned ? CREATE_ALIGNED_TIMESERIES_SQL : CREATE_TIMESERIES_SQL)) { + fail(); + } + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, isAligned ? CREATE_ALIGNED_TIMESERIES_SQL : CREATE_TIMESERIES_SQL)) { + fail(); + } + + INSERT_NULL_VALUE_MAP.get(insertType).accept(isAligned); + + TestUtils.assertDataOnEnv( + receiverEnv, + "select count(*) from root.**", + "count(root.sg.d1.s0),count(root.sg.d1.s1),", + Collections.singleton("0,1,")); + } + + // ---------------------- // + // Scenario 1: SQL Insert // + // ---------------------- // + @Test + public void testSQLInsertWithParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SQL_INSERT, false, true); + } + + @Test + public void testSQLInsertWithoutParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SQL_INSERT, false, false); + } + + @Test + public void testSQLInsertAlignedWithParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SQL_INSERT, true, true); + } + + @Test + public void testSQLInsertAlignedWithoutParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SQL_INSERT, true, false); + } + + // --------------------------------- // + // Scenario 2: Session Insert Record // + // --------------------------------- // + @Test + public void testSessionInsertRecordWithParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, false, true); + } + + @Test + public void testSessionInsertRecordWithoutParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, false, false); + } + + @Test + public void testSessionInsertRecordAlignedWithParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, true, true); + } + + @Test + public void testSessionInsertRecordAlignedWithoutParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SESSION_INSERT_RECORD, true, false); + } + + // --------------------------------- // + // Scenario 3: Session Insert Tablet // + // --------------------------------- // + @Test + public void testSessionInsertTabletWithParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, false, true); + } + + @Test + public void testSessionInsertTabletWithoutParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, false, false); + } + + @Test + public void testSessionInsertTabletAlignedWithParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, true, true); + } + + @Test + public void testSessionInsertTabletAlignedWithoutParsing() throws Exception { + testInsertNullValueTemplate(InsertType.SESSION_INSERT_TABLET, true, false); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index 222e135335aa..6fb80a887886 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -38,6 +38,7 @@ import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,22 +160,33 @@ private void parse(InsertRowNode insertRowNode, String pattern) { this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; - this.valueColumns[filteredColumnIndex] = - filterValueColumnsByRowIndexList( - originValueColumnTypes[i], originValueColumns[i], rowIndexList, true); - this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1); + BitMap bitMap = new BitMap(this.timestampColumn.length); + if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) { + this.valueColumns[filteredColumnIndex] = null; + bitMap.markAll(); + } else { + this.valueColumns[filteredColumnIndex] = + filterValueColumnsByRowIndexList( + originValueColumnTypes[i], + originValueColumns[i], + rowIndexList, + true, + bitMap, // use the output bitmap since there is no bitmap in InsertRowNode + bitMap); + } + this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap; } } - rowCount = rowIndexList.size(); - if (rowCount == 0 && LOGGER.isDebugEnabled()) { + this.rowCount = this.timestampColumn.length; + if (this.rowCount == 0 && LOGGER.isDebugEnabled()) { LOGGER.debug( "InsertRowNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.", insertRowNode, pattern, - sourceEvent.getStartTime(), - sourceEvent.getEndTime(), - sourceEvent); + this.sourceEvent.getStartTime(), + this.sourceEvent.getEndTime(), + this.sourceEvent); } } @@ -186,6 +198,7 @@ private void parse(InsertTabletNode insertTabletNode, String pattern) { this.isAligned = insertTabletNode.isAligned(); final long[] originTimestampColumn = insertTabletNode.getTimes(); + final int originRowSize = originTimestampColumn.length; List rowIndexList = generateRowIndexList(originTimestampColumn); this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray(); @@ -215,12 +228,12 @@ private void parse(InsertTabletNode insertTabletNode, String pattern) { (insertTabletNode.getBitMaps() == null ? IntStream.range(0, originColumnSize) .boxed() - .map(o -> new BitMap(timestampColumn.length)) + .map(o -> new BitMap(originRowSize)) .toArray(BitMap[]::new) : insertTabletNode.getBitMaps()); for (int i = 0; i < originBitMapList.length; i++) { if (originBitMapList[i] == null) { - originBitMapList[i] = new BitMap(timestampColumn.length); + originBitMapList[i] = new BitMap(originRowSize); } } @@ -230,14 +243,25 @@ private void parse(InsertTabletNode insertTabletNode, String pattern) { this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; - this.valueColumns[filteredColumnIndex] = - filterValueColumnsByRowIndexList( - originValueColumnTypes[i], originValueColumns[i], rowIndexList, false); - this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i]; + BitMap bitMap = new BitMap(this.timestampColumn.length); + if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) { + this.valueColumns[filteredColumnIndex] = null; + bitMap.markAll(); + } else { + this.valueColumns[filteredColumnIndex] = + filterValueColumnsByRowIndexList( + originValueColumnTypes[i], + originValueColumns[i], + rowIndexList, + false, + originBitMapList[i], + bitMap); + } + this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap; } } - rowCount = timestampColumn.length; + this.rowCount = this.timestampColumn.length; if (rowCount == 0 && LOGGER.isDebugEnabled()) { LOGGER.debug( "InsertTabletNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.", @@ -256,7 +280,9 @@ private void parse(Tablet tablet, boolean isAligned, String pattern) { this.deviceId = tablet.deviceId; this.isAligned = isAligned; - final long[] originTimestampColumn = tablet.timestamps; + final long[] originTimestampColumn = + Arrays.copyOf( + tablet.timestamps, tablet.rowSize); // tablet.timestamps.length == tablet.maxRowNumber List rowIndexList = generateRowIndexList(originTimestampColumn); this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray(); @@ -287,17 +313,18 @@ private void parse(Tablet tablet, boolean isAligned, String pattern) { originColumnNameStringList[i] = originMeasurementSchemaList.get(i).getMeasurementId(); originValueColumnTypes[i] = originMeasurementSchemaList.get(i).getType(); } - final Object[] originValueColumns = tablet.values; + final Object[] originValueColumns = + tablet.values; // we do not reduce value columns here by origin row size final BitMap[] originBitMapList = tablet.bitMaps == null ? IntStream.range(0, originColumnSize) .boxed() - .map(o -> new BitMap(timestampColumn.length)) + .map(o -> new BitMap(tablet.getMaxRowNumber())) .toArray(BitMap[]::new) - : tablet.bitMaps; + : tablet.bitMaps; // we do not reduce bitmaps here by origin row size for (int i = 0; i < originBitMapList.length; i++) { if (originBitMapList[i] == null) { - originBitMapList[i] = new BitMap(timestampColumn.length); + originBitMapList[i] = new BitMap(tablet.getMaxRowNumber()); } } @@ -307,22 +334,33 @@ private void parse(Tablet tablet, boolean isAligned, String pattern) { this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList.get(i); this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; - this.valueColumns[filteredColumnIndex] = - filterValueColumnsByRowIndexList( - originValueColumnTypes[i], originValueColumns[i], rowIndexList, false); - this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i]; + BitMap bitMap = new BitMap(this.timestampColumn.length); + if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnTypes[i])) { + this.valueColumns[filteredColumnIndex] = null; + bitMap.markAll(); + } else { + this.valueColumns[filteredColumnIndex] = + filterValueColumnsByRowIndexList( + originValueColumnTypes[i], + originValueColumns[i], + rowIndexList, + false, + originBitMapList[i], + bitMap); + } + this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap; } } - rowCount = tablet.rowSize; - if (rowCount == 0 && LOGGER.isDebugEnabled()) { + this.rowCount = this.timestampColumn.length; + if (this.rowCount == 0 && LOGGER.isDebugEnabled()) { LOGGER.debug( "Tablet({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.", tablet, pattern, - sourceEvent.getStartTime(), - sourceEvent.getEndTime(), - sourceEvent); + this.sourceEvent.getStartTime(), + this.sourceEvent.getEndTime(), + this.sourceEvent); } } @@ -394,10 +432,12 @@ private static List generateFullRowIndexList(int rowCount) { } private static Object filterValueColumnsByRowIndexList( - TSDataType type, - Object originValueColumn, - List rowIndexList, - boolean isSingleOriginValueColumn) { + @NonNull TSDataType type, + @NonNull Object originValueColumn, + @NonNull List rowIndexList, + boolean isSingleOriginValueColumn, + @NonNull BitMap originNullValueColumnBitmap, + @NonNull BitMap nullValueColumnBitmap /* output parameters */) { switch (type) { case INT32: { @@ -407,7 +447,12 @@ private static Object filterValueColumnsByRowIndexList( : (int[]) originValueColumn; int[] valueColumns = new int[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - valueColumns[i] = intValueColumns[rowIndexList.get(i)]; + if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + valueColumns[i] = 0; + nullValueColumnBitmap.mark(i); + } else { + valueColumns[i] = intValueColumns[rowIndexList.get(i)]; + } } return valueColumns; } @@ -419,7 +464,12 @@ private static Object filterValueColumnsByRowIndexList( : (long[]) originValueColumn; long[] valueColumns = new long[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - valueColumns[i] = longValueColumns[rowIndexList.get(i)]; + if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + valueColumns[i] = 0L; + nullValueColumnBitmap.mark(i); + } else { + valueColumns[i] = longValueColumns[rowIndexList.get(i)]; + } } return valueColumns; } @@ -431,7 +481,12 @@ private static Object filterValueColumnsByRowIndexList( : (float[]) originValueColumn; float[] valueColumns = new float[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - valueColumns[i] = floatValueColumns[rowIndexList.get(i)]; + if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + valueColumns[i] = 0F; + nullValueColumnBitmap.mark(i); + } else { + valueColumns[i] = floatValueColumns[rowIndexList.get(i)]; + } } return valueColumns; } @@ -443,7 +498,12 @@ private static Object filterValueColumnsByRowIndexList( : (double[]) originValueColumn; double[] valueColumns = new double[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - valueColumns[i] = doubleValueColumns[rowIndexList.get(i)]; + if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + valueColumns[i] = 0D; + nullValueColumnBitmap.mark(i); + } else { + valueColumns[i] = doubleValueColumns[rowIndexList.get(i)]; + } } return valueColumns; } @@ -455,7 +515,12 @@ private static Object filterValueColumnsByRowIndexList( : (boolean[]) originValueColumn; boolean[] valueColumns = new boolean[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - valueColumns[i] = booleanValueColumns[rowIndexList.get(i)]; + if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + valueColumns[i] = false; + nullValueColumnBitmap.mark(i); + } else { + valueColumns[i] = booleanValueColumns[rowIndexList.get(i)]; + } } return valueColumns; } @@ -467,7 +532,14 @@ private static Object filterValueColumnsByRowIndexList( : (Binary[]) originValueColumn; Binary[] valueColumns = new Binary[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - valueColumns[i] = new Binary(binaryValueColumns[rowIndexList.get(i)].getValues()); + if (Objects.isNull(binaryValueColumns[rowIndexList.get(i)]) + || Objects.isNull(binaryValueColumns[rowIndexList.get(i)].getValues()) + || originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + valueColumns[i] = Binary.EMPTY_VALUE; + nullValueColumnBitmap.mark(i); + } else { + valueColumns[i] = new Binary(binaryValueColumns[rowIndexList.get(i)].getValues()); + } } return valueColumns; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index 46543ceabf07..011d26f2ff08 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -75,18 +75,18 @@ public void setUp() throws Exception { } private void createMeasurementSchema() { - for (int i = 0; i < 6; i++) { + for (int i = 0; i < schemas.length; i++) { schemas[i] = new MeasurementSchema(measurementIds[i], dataTypes[i]); } } private void createInsertRowNode() throws IllegalPathException { - final Object[] values = new Object[6]; + final Object[] values = new Object[schemas.length]; values[0] = 100; values[1] = 10000L; values[2] = 2F; - values[3] = 1.0; + values[3] = 1D; values[4] = false; values[5] = BytesUtils.valueOf("text"); @@ -116,20 +116,20 @@ private void createInsertRowNode() throws IllegalPathException { } private void createInsertTabletNode() throws IllegalPathException { - final Object[] values = new Object[6]; + final Object[] values = new Object[schemas.length]; - values[0] = new int[5]; - values[1] = new long[5]; - values[2] = new float[5]; - values[3] = new double[5]; - values[4] = new boolean[5]; - values[5] = new Binary[5]; + values[0] = new int[times.length]; + values[1] = new long[times.length]; + values[2] = new float[times.length]; + values[3] = new double[times.length]; + values[4] = new boolean[times.length]; + values[5] = new Binary[times.length]; - for (int r = 0; r < 5; r++) { + for (int r = 0; r < times.length; r++) { ((int[]) values[0])[r] = 100; - ((long[]) values[1])[r] = 10000; - ((float[]) values[2])[r] = 2; - ((double[]) values[3])[r] = 1.0; + ((long[]) values[1])[r] = 10000L; + ((float[]) values[2])[r] = 2F; + ((double[]) values[3])[r] = 1D; ((boolean[]) values[4])[r] = false; ((Binary[]) values[5])[r] = BytesUtils.valueOf("text"); } @@ -162,11 +162,11 @@ private void createInsertTabletNode() throws IllegalPathException { } private void createTablet() { - final Object[] values = new Object[6]; + final Object[] values = new Object[schemas.length]; // create tablet for insertRowNode - BitMap[] bitMapsForInsertRowNode = new BitMap[6]; - for (int i = 0; i < 6; i++) { + BitMap[] bitMapsForInsertRowNode = new BitMap[schemas.length]; + for (int i = 0; i < schemas.length; i++) { bitMapsForInsertRowNode[i] = new BitMap(1); } @@ -179,9 +179,9 @@ private void createTablet() { for (int r = 0; r < 1; r++) { ((int[]) values[0])[r] = 100; - ((long[]) values[1])[r] = 10000; - ((float[]) values[2])[r] = 2; - ((double[]) values[3])[r] = 1.0; + ((long[]) values[1])[r] = 10000L; + ((float[]) values[2])[r] = 2F; + ((double[]) values[3])[r] = 1D; ((boolean[]) values[4])[r] = false; ((Binary[]) values[5])[r] = BytesUtils.valueOf("text"); } @@ -193,26 +193,27 @@ private void createTablet() { tabletForInsertRowNode.bitMaps = bitMapsForInsertRowNode; // create tablet for insertTabletNode - BitMap[] bitMapsForInsertTabletNode = new BitMap[6]; - for (int i = 0; i < 6; i++) { + BitMap[] bitMapsForInsertTabletNode = new BitMap[schemas.length]; + for (int i = 0; i < schemas.length; i++) { bitMapsForInsertTabletNode[i] = new BitMap(times.length); } - values[0] = new int[5]; - values[1] = new long[5]; - values[2] = new float[5]; - values[3] = new double[5]; - values[4] = new boolean[5]; - values[5] = new Binary[5]; + values[0] = new int[times.length]; + values[1] = new long[times.length]; + values[2] = new float[times.length]; + values[3] = new double[times.length]; + values[4] = new boolean[times.length]; + values[5] = new Binary[times.length]; - for (int r = 0; r < 5; r++) { + for (int r = 0; r < times.length; r++) { ((int[]) values[0])[r] = 100; - ((long[]) values[1])[r] = 10000; - ((float[]) values[2])[r] = 2; - ((double[]) values[3])[r] = 1.0; + ((long[]) values[1])[r] = 10000L; + ((float[]) values[2])[r] = 2F; + ((double[]) values[3])[r] = 1D; ((boolean[]) values[4])[r] = false; ((Binary[]) values[5])[r] = BytesUtils.valueOf("text"); } + tabletForInsertTabletNode = new Tablet(deviceId, Arrays.asList(schemas), times.length); tabletForInsertTabletNode.values = values; tabletForInsertTabletNode.timestamps = times; @@ -294,13 +295,35 @@ public void convertToTabletWithFilteredRowsForTest() { TabletInsertionDataContainer container2 = new TabletInsertionDataContainer( null, - new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L, 113L), - insertTabletNode, + new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 110L, 110L), + insertRowNode, pattern); Tablet tablet2 = container2.convertToTablet(); - Assert.assertEquals(3, tablet2.rowSize); + Assert.assertEquals(1, tablet2.rowSize); boolean isAligned2 = container2.isAligned(); Assert.assertFalse(isAligned2); + + TabletInsertionDataContainer container3 = + new TabletInsertionDataContainer( + null, + new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L, 113L), + insertTabletNode, + pattern); + Tablet tablet3 = container3.convertToTablet(); + Assert.assertEquals(3, tablet3.rowSize); + boolean isAligned3 = container3.isAligned(); + Assert.assertFalse(isAligned3); + + TabletInsertionDataContainer container4 = + new TabletInsertionDataContainer( + null, + new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, Long.MIN_VALUE, 109L), + insertTabletNode, + pattern); + Tablet tablet4 = container4.convertToTablet(); + Assert.assertEquals(0, tablet4.rowSize); + boolean isAligned4 = container4.isAligned(); + Assert.assertFalse(isAligned4); } @Test