diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java index bb160a6f2952..2bd290bfc815 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java @@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.ISession; import org.apache.iotdb.it.env.MultiEnvFactory; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; @@ -33,6 +34,9 @@ import org.apache.iotdb.itbase.env.BaseEnv; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -791,11 +795,10 @@ public void testExtractorTimeRangeMatch() throws Exception { .setProcessorAttributes(processorAttributes)); Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + final Map expectedCountResult = new HashMap<>(); + expectedCountResult.put("count(root.db.d1.at1)", "3"); TestUtils.assertDataEventuallyOnEnv( - receiverEnv, - "select count(*) from root.**", - "count(root.db.d1.at1),", - Collections.singleton("3,")); + receiverEnv, "select count(*) from root.db.**", expectedCountResult); // Insert realtime data that overlapped with time range TestUtils.executeNonQueries( @@ -806,11 +809,29 @@ public void testExtractorTimeRangeMatch() throws Exception { "flush"), null); + expectedCountResult.put("count(root.db.d3.at1)", "3"); TestUtils.assertDataEventuallyOnEnv( - receiverEnv, - "select count(*) from root.**", - "count(root.db.d1.at1),count(root.db.d3.at1),", - Collections.singleton("3,3,")); + receiverEnv, "select count(*) from root.db.**", expectedCountResult); + + // Session Tablet can have unused timestamp slots when rowSize is smaller than maxRowNumber. + // The pipe source time range filter should ignore the unused zero tail. + final List schemas = + Collections.singletonList(new MeasurementSchema("at1", TSDataType.INT32)); + final Tablet tabletWithUnusedTail = new Tablet("root.db.d5", schemas, 5); + for (int time = 2000; time <= 4000; time += 1000) { + final int rowIndex = tabletWithUnusedTail.rowSize++; + tabletWithUnusedTail.addTimestamp(rowIndex, time); + tabletWithUnusedTail.addValue("at1", rowIndex, time / 1000); + } + Assert.assertEquals(3, tabletWithUnusedTail.rowSize); + Assert.assertEquals(5, tabletWithUnusedTail.timestamps.length); + try (final ISession session = senderEnv.getSessionConnection()) { + session.insertTablet(tabletWithUnusedTail); + } + + expectedCountResult.put("count(root.db.d5.at1)", "3"); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "select count(*) from root.db.**", expectedCountResult); // Insert realtime data that does not overlap with time range TestUtils.executeNonQueries( @@ -823,9 +844,19 @@ public void testExtractorTimeRangeMatch() throws Exception { TestUtils.assertDataAlwaysOnEnv( receiverEnv, - "select count(*) from root.**", - "count(root.db.d1.at1),count(root.db.d3.at1),", - Collections.singleton("3,3,")); + "select count(at1) from root.db.d1, root.db.d3, root.db.d5", + "count(root.db.d1.at1),count(root.db.d3.at1),count(root.db.d5.at1),", + Collections.singleton("3,3,3,")); + TestUtils.assertDataAlwaysOnEnv( + receiverEnv, + "show timeseries root.db.d2.**", + "Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,", + Collections.emptySet()); + TestUtils.assertDataAlwaysOnEnv( + receiverEnv, + "show timeseries root.db.d4.**", + "Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,", + Collections.emptySet()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index bc7040a05985..84a0f533e234 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -214,12 +214,14 @@ public boolean mayEventTimeOverlappedWithTimeRange() { } if (insertNode instanceof InsertTabletNode) { - final long[] timestamps = ((InsertTabletNode) insertNode).getTimes(); - if (Objects.isNull(timestamps) || timestamps.length == 0) { + final InsertTabletNode insertTabletNode = (InsertTabletNode) insertNode; + final long[] timestamps = insertTabletNode.getTimes(); + final int rowCount = insertTabletNode.getRowCount(); + if (Objects.isNull(timestamps) || rowCount <= 0) { return false; } // We assume that `timestamps` is ordered. - return startTime <= timestamps[timestamps.length - 1] && timestamps[0] <= endTime; + return startTime <= timestamps[rowCount - 1] && timestamps[0] <= endTime; } if (insertNode instanceof InsertRowsNode) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 6d6995f0231e..f47544ab64f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -318,11 +318,12 @@ public boolean isGeneratedByPipe() { @Override public boolean mayEventTimeOverlappedWithTimeRange() { final long[] timestamps = tablet.timestamps; - if (Objects.isNull(timestamps) || timestamps.length == 0) { + final int rowSize = tablet.rowSize; + if (Objects.isNull(timestamps) || rowSize <= 0) { return false; } // We assume that `timestamps` is ordered. - return startTime <= timestamps[timestamps.length - 1] && timestamps[0] <= endTime; + return startTime <= timestamps[rowSize - 1] && timestamps[0] <= endTime; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java index b9da59f31111..a6a69144028f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java @@ -31,6 +31,12 @@ public PipeTabletCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) super(pipeTaskMeta, sourceEvent); } + public PipeTabletCollector( + PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent, boolean isAligned) { + this(pipeTaskMeta, sourceEvent); + this.isAligned = isAligned; + } + public PipeTabletCollector( PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index d8c2bccaa97c..a2a8c27ef264 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -218,8 +218,13 @@ private void parse(final InsertTabletNode insertTabletNode, final PipePattern pa this.isAligned = insertTabletNode.isAligned(); final long[] originTimestampColumn = insertTabletNode.getTimes(); - final List rowIndexList = generateRowIndexList(originTimestampColumn); - this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray(); + final int originRowCount = insertTabletNode.getRowCount(); + final long[] actualTimestampColumn = + originTimestampColumn.length == originRowCount + ? originTimestampColumn + : Arrays.copyOf(originTimestampColumn, originRowCount); + final List rowIndexList = generateRowIndexList(actualTimestampColumn); + this.timestampColumn = rowIndexList.stream().mapToLong(i -> actualTimestampColumn[i]).toArray(); generateColumnIndexMapper( insertTabletNode.getMeasurements(), @@ -407,6 +412,9 @@ else if (pattern.mayOverlapWithDevice(deviceId)) { private List generateRowIndexList(final long[] originTimestampColumn) { final int rowCount = originTimestampColumn.length; + if (rowCount == 0) { + return generateFullRowIndexList(rowCount); + } if (Objects.isNull(sourceEvent) || !sourceEvent.shouldParseTime()) { return generateFullRowIndexList(rowCount); } @@ -680,7 +688,8 @@ public List processTablet(final BiConsumer processTabletWithCollect( BiConsumer consumer) { - final PipeTabletCollector tabletCollector = new PipeTabletCollector(pipeTaskMeta, sourceEvent); + final PipeTabletCollector tabletCollector = + new PipeTabletCollector(pipeTaskMeta, sourceEvent, isAligned); consumer.accept(convertToTablet(), tabletCollector); return tabletCollector.convertToTabletInsertionEvents(shouldReport); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index 8a290bd18031..b46a86e9944a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -24,12 +24,14 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern; import org.apache.iotdb.db.pipe.event.common.row.PipeResetTabletRow; import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.pipe.event.common.tablet.TabletInsertionDataContainer; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; @@ -42,6 +44,7 @@ import org.junit.Test; import java.time.LocalDate; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -323,6 +326,30 @@ public void convertToAlignedTabletForTest() { Assert.assertTrue(isAligned4); } + @Test + public void processAlignedTabletWithCollectPreservesAlignmentForTest() { + final PipeRawTabletInsertionEvent event = + new PipeRawTabletInsertionEvent( + tabletForInsertTabletNode, true, new PrefixPipePattern(pattern)); + + final List events = new ArrayList<>(); + event + .processTabletWithCollect( + (tablet, collector) -> { + try { + collector.collectTablet(tablet); + } catch (final Exception e) { + throw new RuntimeException(e); + } + }) + .forEach(events::add); + + Assert.assertEquals(1, events.size()); + final PipeRawTabletInsertionEvent collectedEvent = (PipeRawTabletInsertionEvent) events.get(0); + Assert.assertEquals(tabletForInsertTabletNode, collectedEvent.convertToTablet()); + Assert.assertTrue(collectedEvent.isAligned()); + } + @Test public void collectRowWithOverriddenTreeDatabaseForTest() { final PipeRowCollector rowCollector = new PipeRowCollector(null, null, "root.test.sg_0", false); @@ -449,4 +476,50 @@ public void isEventTimeOverlappedWithTimeRangeTest() { event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 115L, Long.MAX_VALUE); Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange()); } + + @Test + public void isEventTimeOverlappedWithTimeRangeUsesActualRowSizeForTest() throws Exception { + final long[] timestamps = new long[] {110L, 111L, 112L, 0L, 0L}; + + final Tablet partialTablet = new Tablet(deviceId, Arrays.asList(schemas), times.length); + partialTablet.timestamps = timestamps; + partialTablet.rowSize = 3; + + PipeRawTabletInsertionEvent rawEvent = + new PipeRawTabletInsertionEvent(partialTablet, 111L, 112L); + Assert.assertTrue(rawEvent.mayEventTimeOverlappedWithTimeRange()); + rawEvent = new PipeRawTabletInsertionEvent(partialTablet, 113L, Long.MAX_VALUE); + Assert.assertFalse(rawEvent.mayEventTimeOverlappedWithTimeRange()); + + final InsertTabletNode partialInsertTabletNode = + new InsertTabletNode( + new PlanNodeId("partial tablet node"), + new PartialPath(deviceId), + false, + measurementIds, + dataTypes, + schemas, + timestamps, + null, + insertTabletNode.getColumns(), + 3); + + final Tablet convertedTablet = + new TabletInsertionDataContainer(partialInsertTabletNode, new PrefixPipePattern(pattern)) + .convertToTablet(); + Assert.assertEquals(3, convertedTablet.rowSize); + Assert.assertArrayEquals( + new long[] {110L, 111L, 112L}, + Arrays.copyOf(convertedTablet.timestamps, convertedTablet.rowSize)); + + PipeInsertNodeTabletInsertionEvent insertNodeEvent = + new PipeInsertNodeTabletInsertionEvent(partialInsertTabletNode) + .shallowCopySelfAndBindPipeTaskMetaForProgressReport(null, 0, null, null, 111L, 112L); + Assert.assertTrue(insertNodeEvent.mayEventTimeOverlappedWithTimeRange()); + insertNodeEvent = + new PipeInsertNodeTabletInsertionEvent(partialInsertTabletNode) + .shallowCopySelfAndBindPipeTaskMetaForProgressReport( + null, 0, null, null, 113L, Long.MAX_VALUE); + Assert.assertFalse(insertNodeEvent.mayEventTimeOverlappedWithTimeRange()); + } }