diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java index bb160a6f2952..c592aca4e6af 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java @@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.ISession; import org.apache.iotdb.it.env.MultiEnvFactory; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; @@ -33,6 +34,9 @@ import org.apache.iotdb.itbase.env.BaseEnv; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -48,6 +52,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import static org.junit.Assert.fail; @@ -757,6 +762,11 @@ public void testExtractorTimeRangeMatch() throws Exception { final String receiverIp = receiverDataNode.getIp(); final int receiverPort = receiverDataNode.getPort(); + final Consumer handleFailure = + o -> { + TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); + TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); + }; try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { @@ -793,9 +803,10 @@ public void testExtractorTimeRangeMatch() throws Exception { TestUtils.assertDataEventuallyOnEnv( receiverEnv, - "select count(*) from root.**", + "select count(at1) from root.db.d1", "count(root.db.d1.at1),", - Collections.singleton("3,")); + Collections.singleton("3,"), + handleFailure); // Insert realtime data that overlapped with time range TestUtils.executeNonQueries( @@ -808,9 +819,33 @@ public void testExtractorTimeRangeMatch() throws Exception { TestUtils.assertDataEventuallyOnEnv( receiverEnv, - "select count(*) from root.**", + "select count(at1) from root.db.d1, root.db.d3", "count(root.db.d1.at1),count(root.db.d3.at1),", - Collections.singleton("3,3,")); + Collections.singleton("3,3,"), + handleFailure); + + // Session Tablet can have unused timestamp slots when rowSize is smaller than maxRowNumber. + // The pipe source time range filter should ignore the unused zero tail. + final List schemas = + Collections.singletonList(new MeasurementSchema("at1", TSDataType.INT32)); + final Tablet tabletWithUnusedTail = new Tablet("root.db.d5", schemas, 5); + for (int time = 2000; time <= 4000; time += 1000) { + final int rowIndex = tabletWithUnusedTail.rowSize++; + tabletWithUnusedTail.addTimestamp(rowIndex, time); + tabletWithUnusedTail.addValue("at1", rowIndex, time / 1000); + } + Assert.assertEquals(3, tabletWithUnusedTail.rowSize); + Assert.assertEquals(5, tabletWithUnusedTail.timestamps.length); + try (final ISession session = senderEnv.getSessionConnection()) { + session.insertTablet(tabletWithUnusedTail); + } + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select count(at1) from root.db.d1, root.db.d3, root.db.d5", + "count(root.db.d1.at1),count(root.db.d3.at1),count(root.db.d5.at1),", + Collections.singleton("3,3,3,"), + handleFailure); // Insert realtime data that does not overlap with time range TestUtils.executeNonQueries( @@ -823,9 +858,20 @@ public void testExtractorTimeRangeMatch() throws Exception { TestUtils.assertDataAlwaysOnEnv( receiverEnv, - "select count(*) from root.**", - "count(root.db.d1.at1),count(root.db.d3.at1),", - Collections.singleton("3,3,")); + "select count(at1) from root.db.d1, root.db.d3, root.db.d5", + "count(root.db.d1.at1),count(root.db.d3.at1),count(root.db.d5.at1),", + Collections.singleton("3,3,3,"), + 600); + TestUtils.assertDataAlwaysOnEnv( + receiverEnv, + "show timeseries root.db.d2.**", + "Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,", + Collections.emptySet()); + TestUtils.assertDataAlwaysOnEnv( + receiverEnv, + "show timeseries root.db.d4.**", + "Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,", + Collections.emptySet()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java index d184c4647f1d..805421bb5048 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java @@ -51,7 +51,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -70,6 +69,10 @@ public class TagManager { private static final String PREVIOUS_CONDITION = "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b"; + // The tag index memory model adds one int-sized estimated overhead for each indexed key, value, + // and measurement reference. This is an accounting estimate rather than a specific + // ConcurrentHashMap or Set field. + private static final long INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES = Integer.BYTES; private static final Logger logger = LoggerFactory.getLogger(TagManager.class); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); @@ -164,34 +167,31 @@ public void addIndex(String tagKey, String tagValue, IMeasurementMNode measur return; } - int tagIndexOldSize = tagIndex.size(); - Map>> tagValueMap = - tagIndex.computeIfAbsent(tagKey, k -> new ConcurrentHashMap<>()); - int tagIndexNewSize = tagIndex.size(); - - int tagValueMapOldSize = tagValueMap.size(); - Set> measurementsSet = - tagValueMap.computeIfAbsent(tagValue, v -> Collections.synchronizedSet(new HashSet<>())); - int tagValueMapNewSize = tagValueMap.size(); + tagIndex.compute( + tagKey, + (key, tagValueMap) -> { + long memorySize = 0; + if (tagValueMap == null) { + tagValueMap = new ConcurrentHashMap<>(); + memorySize += RamUsageEstimator.sizeOf(tagKey) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; + } - int measurementsSetOldSize = measurementsSet.size(); - measurementsSet.add(measurementMNode); - int measurementsSetNewSize = measurementsSet.size(); + Set> measurementsSet = tagValueMap.get(tagValue); + if (measurementsSet == null) { + measurementsSet = ConcurrentHashMap.newKeySet(); + tagValueMap.put(tagValue, measurementsSet); + memorySize += RamUsageEstimator.sizeOf(tagValue) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; + } - long memorySize = 0; - if (tagIndexNewSize - tagIndexOldSize == 1) { - // the last 4 is the memory occupied by the size of tagvaluemap - memorySize += RamUsageEstimator.sizeOf(tagKey) + 4; - } - if (tagValueMapNewSize - tagValueMapOldSize == 1) { - // the last 4 is the memory occupied by the size of measurementsSet - memorySize += RamUsageEstimator.sizeOf(tagValue) + 4; - } - if (measurementsSetNewSize - measurementsSetOldSize == 1) { - // 8 is the memory occupied by the length of the IMeasurementMNode - memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4; - } - requestMemory(memorySize); + if (measurementsSet.add(measurementMNode)) { + memorySize += + RamUsageEstimator.NUM_BYTES_OBJECT_REF + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; + } + if (memorySize > 0) { + requestMemory(memorySize); + } + return tagValueMap; + }); } public void addIndex(Map tagsMap, IMeasurementMNode measurementMNode) { @@ -206,32 +206,47 @@ public void removeIndex(String tagKey, String tagValue, IMeasurementMNode mea if (tagKey == null || tagValue == null || measurementMNode == null) { return; } - // init memory size - long memorySize = 0; - if (tagIndex.get(tagKey).get(tagValue).remove(measurementMNode)) { - memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4; - } - if (tagIndex.get(tagKey).get(tagValue).isEmpty()) { - if (tagIndex.get(tagKey).remove(tagValue) != null) { - // the last 4 is the memory occupied by the size of IMeasurementMNodeSet - memorySize += RamUsageEstimator.sizeOf(tagValue) + 4; - } - } - if (tagIndex.get(tagKey).isEmpty()) { - if (tagIndex.remove(tagKey) != null) { - // the last 4 is the memory occupied by the size of tagValueMap - memorySize += RamUsageEstimator.sizeOf(tagKey) + 4; - } - } - releaseMemory(memorySize); + tagIndex.computeIfPresent( + tagKey, + (key, tagValueMap) -> { + long memorySize = 0; + Set> measurementsSet = tagValueMap.get(tagValue); + if (measurementsSet == null) { + return tagValueMap; + } + + if (measurementsSet.remove(measurementMNode)) { + memorySize += + RamUsageEstimator.NUM_BYTES_OBJECT_REF + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; + } + if (measurementsSet.isEmpty()) { + if (tagValueMap.remove(tagValue, measurementsSet)) { + memorySize += + RamUsageEstimator.sizeOf(tagValue) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; + } + } + if (tagValueMap.isEmpty()) { + memorySize += RamUsageEstimator.sizeOf(tagKey) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES; + if (memorySize > 0) { + releaseMemory(memorySize); + } + return null; + } + if (memorySize > 0) { + releaseMemory(memorySize); + } + return tagValueMap; + }); + } + + private boolean containsIndex(String tagKey, String tagValue) { + Map>> tagValueMap = tagIndex.get(tagKey); + return tagValueMap != null && tagValueMap.containsKey(tagValue); } private List> getMatchedTimeseriesInIndex(TagFilter tagFilter) { - if (!tagIndex.containsKey(tagFilter.getKey())) { - return Collections.emptyList(); - } Map>> value2Node = tagIndex.get(tagFilter.getKey()); - if (value2Node.isEmpty()) { + if (value2Node == null || value2Node.isEmpty()) { return Collections.emptyList(); } @@ -362,8 +377,7 @@ public void removeFromTagInvertedIndex(IMeasurementMNode node) throws IOExcep Map tagMap = tagLogFile.readTag(node.getOffset()); if (tagMap != null) { for (Map.Entry entry : tagMap.entrySet()) { - if (tagIndex.containsKey(entry.getKey()) - && tagIndex.get(entry.getKey()).containsKey(entry.getValue())) { + if (containsIndex(entry.getKey(), entry.getValue())) { if (logger.isDebugEnabled()) { logger.debug( String.format( @@ -415,7 +429,7 @@ public void updateTagsAndAttributes( // we should remove before key-value from inverted index map if (beforeValue != null && !beforeValue.equals(value)) { - if (tagIndex.containsKey(key) && tagIndex.get(key).containsKey(beforeValue)) { + if (containsIndex(key, beforeValue)) { if (logger.isDebugEnabled()) { logger.debug( String.format( @@ -548,8 +562,7 @@ public void dropTagsOrAttributes( if (!deleteTag.isEmpty()) { for (Map.Entry entry : deleteTag.entrySet()) { - if (tagIndex.containsKey((entry.getKey())) - && tagIndex.get(entry.getKey()).containsKey(entry.getValue())) { + if (containsIndex(entry.getKey(), entry.getValue())) { if (logger.isDebugEnabled()) { logger.debug( String.format( @@ -618,7 +631,7 @@ public void setTagsOrAttributesValue( String beforeValue = entry.getValue(); String currentValue = newTagValue.get(key); // change the tag inverted index map - if (tagIndex.containsKey(key) && tagIndex.get(key).containsKey(beforeValue)) { + if (containsIndex(key, beforeValue)) { if (logger.isDebugEnabled()) { logger.debug( @@ -676,7 +689,7 @@ public void renameTagOrAttributeKey( // persist the change to disk tagLogFile.write(pair.left, pair.right, leafMNode.getOffset()); // change the tag inverted index map - if (tagIndex.containsKey(oldKey) && tagIndex.get(oldKey).containsKey(value)) { + if (containsIndex(oldKey, value)) { if (logger.isDebugEnabled()) { logger.debug( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java new file mode 100644 index 000000000000..1d78b4a54836 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManagerTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.schemaengine.schemaregion.tag; + +import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode; +import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory; +import org.apache.iotdb.db.schemaengine.rescon.MemSchemaEngineStatistics; +import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.IMemMNode; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader; + +import org.apache.commons.io.FileUtils; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class TagManagerTest { + + private File tempDir; + private MemSchemaRegionStatistics regionStatistics; + private TagManager tagManager; + + @After + public void tearDown() throws Exception { + if (tagManager != null) { + tagManager.clear(); + } + if (regionStatistics != null) { + regionStatistics.clear(); + } + if (tempDir != null) { + FileUtils.deleteDirectory(tempDir); + } + } + + @Test + public void removeIndexIgnoresMissingEntriesAndReleasesOnlyExistingMemory() throws Exception { + initTagManager(); + final IMeasurementMNode node = newMeasurementMNode("s0"); + + tagManager.removeIndex("missingKey", "missingValue", node); + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + + tagManager.addIndex("key", "value", node); + final long expectedMemory = indexMemory("key", "value", 1); + Assert.assertEquals(expectedMemory, regionStatistics.getRegionMemoryUsage()); + + tagManager.removeIndex("key", "missingValue", node); + Assert.assertEquals(expectedMemory, regionStatistics.getRegionMemoryUsage()); + + tagManager.removeIndex("key", "value", newMeasurementMNode("other")); + Assert.assertEquals(expectedMemory, regionStatistics.getRegionMemoryUsage()); + + tagManager.removeIndex("key", "value", node); + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + + tagManager.removeIndex("key", "value", node); + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + } + + @Test + public void concurrentAddIndexRequestsMemoryForActualInsertionsOnly() throws Exception { + initTagManager(); + final String tagKey = "key"; + final String tagValue = "value"; + final int measurementCount = 128; + final List> nodes = new ArrayList<>(); + for (int i = 0; i < measurementCount; i++) { + nodes.add(newMeasurementMNode("s" + i)); + } + + final int workerCount = 16; + final ExecutorService executorService = Executors.newFixedThreadPool(workerCount); + final CountDownLatch readyLatch = new CountDownLatch(workerCount); + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + for (final IMeasurementMNode node : nodes) { + futures.add( + executorService.submit( + () -> { + readyLatch.countDown(); + startLatch.await(); + tagManager.addIndex(tagKey, tagValue, node); + return null; + })); + } + + try { + Assert.assertTrue(readyLatch.await(10, TimeUnit.SECONDS)); + startLatch.countDown(); + for (final Future future : futures) { + future.get(10, TimeUnit.SECONDS); + } + } finally { + executorService.shutdownNow(); + } + Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS)); + + final long expectedMemory = indexMemory(tagKey, tagValue, measurementCount); + Assert.assertEquals(expectedMemory, regionStatistics.getRegionMemoryUsage()); + + for (final IMeasurementMNode node : nodes) { + tagManager.addIndex(tagKey, tagValue, node); + } + Assert.assertEquals(expectedMemory, regionStatistics.getRegionMemoryUsage()); + + for (final IMeasurementMNode node : nodes) { + tagManager.removeIndex(tagKey, tagValue, node); + } + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + } + + @Test + public void concurrentAddAndRemoveIndexEventuallyReleasesAllMemory() throws Exception { + initTagManager(); + final String tagKey = "key"; + final String tagValue = "value"; + final IMeasurementMNode node = newMeasurementMNode("s0"); + + final int workerCount = 16; + final int roundCount = 1000; + final ExecutorService executorService = Executors.newFixedThreadPool(workerCount); + final CountDownLatch readyLatch = new CountDownLatch(workerCount); + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + for (int i = 0; i < workerCount; i++) { + futures.add( + executorService.submit( + () -> { + readyLatch.countDown(); + startLatch.await(); + for (int round = 0; round < roundCount; round++) { + tagManager.addIndex(tagKey, tagValue, node); + tagManager.removeIndex(tagKey, tagValue, node); + } + return null; + })); + } + + try { + Assert.assertTrue(readyLatch.await(10, TimeUnit.SECONDS)); + startLatch.countDown(); + for (final Future future : futures) { + future.get(10, TimeUnit.SECONDS); + } + } finally { + executorService.shutdownNow(); + } + Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS)); + + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + + tagManager.addIndex(tagKey, tagValue, node); + Assert.assertEquals(indexMemory(tagKey, tagValue, 1), regionStatistics.getRegionMemoryUsage()); + + tagManager.removeIndex(tagKey, tagValue, node); + Assert.assertEquals(0, regionStatistics.getRegionMemoryUsage()); + } + + private void initTagManager() throws Exception { + tempDir = Files.createTempDirectory("tag-manager").toFile(); + regionStatistics = new MemSchemaRegionStatistics(0, new MemSchemaEngineStatistics()); + tagManager = new TagManager(tempDir.getAbsolutePath(), regionStatistics); + } + + private static IMeasurementMNode newMeasurementMNode(final String measurement) { + final IMNodeFactory nodeFactory = + MNodeFactoryLoader.getInstance().getMemMNodeIMNodeFactory(); + return nodeFactory.createMeasurementMNode( + null, + measurement, + new MeasurementSchema( + measurement, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY), + null); + } + + private static long indexMemory( + final String tagKey, final String tagValue, final int measurementCount) { + return RamUsageEstimator.sizeOf(tagKey) + + 4 + + RamUsageEstimator.sizeOf(tagValue) + + 4 + + (RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4) * measurementCount; + } +}