diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java index 7c8ec7cf28239..bd7f7d699f10d 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java @@ -62,7 +62,6 @@ public final class ManagerMessages { "DataRegionGroupExtensionPolicy %s doesn't exist."; public static final String DECREASE_REFERENCE_COUNT_FOR_SNAPSHOT_ERROR = "Decrease reference count for snapshot {} error."; - public static final String DELETING_REGIONS_COSTS_MS = "Deleting regions costs {}ms"; public static final String DETECTED_HISTORICAL_PIPE_COMPLETION_REPORT_FROM_DATANODE = "Detected historical pipe completion report from DataNode {} for pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}"; public static final String DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT = @@ -424,8 +423,6 @@ public final class ManagerMessages { "Start to create Region: {} on DataNode: {}"; public static final String START_TO_CREATE_UDF_ON_DATA_NODES_NEEDTOSAVEJAR = "Start to create UDF [{}] on Data Nodes, needToSaveJar[{}]"; - public static final String START_TO_DELETE_REGION_ON_DATANODE = - "Start to delete Region: {} on DataNode: {}"; public static final String START_TRANSFER_OF = "Start transfer of {}"; public static final String STOP_SUBMITTING_CQ_BECAUSE = "Stop submitting CQ {} because {}"; public static final String STOP_SUBMITTING_CQ_BECAUSE_CURRENT_NODE_IS_NOT_LEADER_OR = @@ -508,6 +505,8 @@ public final class ManagerMessages { "Unexpected interruption during waiting for configNode leader ready."; public static final String UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_GET_CLUSTER_ID = "Unexpected interruption during waiting for get cluster id."; + public static final String UNEXPECTED_NON_CREATE_REGION_MAINTAIN_TASK_SKIPPED = + "Unexpected non-create task in the RegionMaintainer queue; skipping it (the queue only recreates region replicas now, and region deletion is handled by RemoveRegionGroupProcedure)."; public static final String UNEXPECTED_NULL_PROCEDURE_PARAMETERS_FOR_WAITINGPROCEDUREFINISHED = "Unexpected null procedure parameters for waitingProcedureFinished"; public static final String UNKNOWN_DATAPARTITION_ALLOCATION_STRATEGY_USING_INHERIT_STRATEGY_BY_DEFAULT = diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java index f4ae087b159b7..4d3cea6f8ebfe 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java @@ -616,6 +616,16 @@ public final class ProcedureMessages { public static final String PID_ADDREGION_STATE_FAILED = "[pid{}][AddRegion] state {} failed"; public static final String PID_ADDREGION_SUCCESS_HAS_BEEN_ADDED_TO_DATANODE_PROCEDURE_TOOK = "[pid{}][AddRegion] success, {} has been added to DataNode {}. Procedure took {} (start at {})."; + public static final String PID_REMOVEREGIONGROUP_STARTED_WILL_BE_DELETED = + "[pid{}][RemoveRegionGroup] started, region group {} will be deleted from DataNodes {}."; + public static final String PID_REMOVEREGIONGROUP_STARTED_REPLICA_WILL_BE_DELETED_FROM_DATANODE = + "[pid{}][RemoveRegionGroup] region {} will be deleted from DataNode {}."; + public static final String PID_REMOVEREGIONGROUP_STATE_FAILED = + "[pid{}][RemoveRegionGroup] state {} failed"; + public static final String PID_REMOVEREGIONGROUP_DELETE_REPLICA_FAILED = + "[pid{}][RemoveRegionGroup] failed to delete a replica of region {} (attempt {}/{}), will retry. reason: {}"; + public static final String PID_REMOVEREGIONGROUP_SUCCESS_PROCEDURE_TOOK = + "[pid{}][RemoveRegionGroup] success, region group {} has been deleted. Procedure took {} (started at {})."; public static final String PID_MIGRATEREGION_STARTED_WILL_BE_MIGRATED_FROM_DATANODE_TO = "[pid{}][MigrateRegion] started, {} will be migrated from DataNode {} to {}."; public static final String PID_MIGRATEREGION_STATE_COMPLETE = diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java index 1008481fdfac0..b73d5d2d2c994 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java @@ -62,7 +62,6 @@ public final class ManagerMessages { "DataRegionGroupExtensionPolicy %s doesn't exist."; public static final String DECREASE_REFERENCE_COUNT_FOR_SNAPSHOT_ERROR = "Decrease reference count for snapshot {} error."; - public static final String DELETING_REGIONS_COSTS_MS = "Deleting regions costs {}ms"; public static final String DETECTED_HISTORICAL_PIPE_COMPLETION_REPORT_FROM_DATANODE = "检测到来自 DataNode {} 的历史 pipe 完成上报,pipe {}。remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}"; public static final String DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT = @@ -419,8 +418,6 @@ public final class ManagerMessages { "Start to create Region: {} on DataNode: {}"; public static final String START_TO_CREATE_UDF_ON_DATA_NODES_NEEDTOSAVEJAR = "Start to create UDF [{}] on Data Nodes, needToSaveJar[{}]"; - public static final String START_TO_DELETE_REGION_ON_DATANODE = - "Start to delete Region: {} on DataNode: {}"; public static final String START_TRANSFER_OF = "Start transfer of {}"; public static final String STOP_SUBMITTING_CQ_BECAUSE = "Stop submitting CQ {} because {}"; public static final String STOP_SUBMITTING_CQ_BECAUSE_CURRENT_NODE_IS_NOT_LEADER_OR = @@ -503,6 +500,8 @@ public final class ManagerMessages { "Unexpected interruption during waiting for configNode leader ready."; public static final String UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_GET_CLUSTER_ID = "Unexpected interruption during waiting for get cluster id."; + public static final String UNEXPECTED_NON_CREATE_REGION_MAINTAIN_TASK_SKIPPED = + "Unexpected non-create task in the RegionMaintainer queue; skipping it (the queue only recreates region replicas now, and region deletion is handled by RemoveRegionGroupProcedure)."; public static final String UNEXPECTED_NULL_PROCEDURE_PARAMETERS_FOR_WAITINGPROCEDUREFINISHED = "Unexpected null procedure parameters for waitingProcedureFinished"; public static final String UNKNOWN_DATAPARTITION_ALLOCATION_STRATEGY_USING_INHERIT_STRATEGY_BY_DEFAULT = diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java index 126055a7c57c7..9602839fae94a 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java @@ -614,6 +614,16 @@ public final class ProcedureMessages { public static final String PID_ADDREGION_STATE_FAILED = "[pid{}][AddRegion] state {} failed"; public static final String PID_ADDREGION_SUCCESS_HAS_BEEN_ADDED_TO_DATANODE_PROCEDURE_TOOK = "[pid{}][AddRegion] success, {} has been added to DataNode {}. Procedure took {} (start at {})."; + public static final String PID_REMOVEREGIONGROUP_STARTED_WILL_BE_DELETED = + "[pid{}][RemoveRegionGroup] 开始,region group {} 将从 DataNode {} 上删除。"; + public static final String PID_REMOVEREGIONGROUP_STARTED_REPLICA_WILL_BE_DELETED_FROM_DATANODE = + "[pid{}][RemoveRegionGroup] region {} 将从 DataNode {} 上删除。"; + public static final String PID_REMOVEREGIONGROUP_STATE_FAILED = + "[pid{}][RemoveRegionGroup] 状态 {} 失败"; + public static final String PID_REMOVEREGIONGROUP_DELETE_REPLICA_FAILED = + "[pid{}][RemoveRegionGroup] 删除 region {} 的一个副本失败(第 {}/{} 次尝试),将重试。原因:{}"; + public static final String PID_REMOVEREGIONGROUP_SUCCESS_PROCEDURE_TOOK = + "[pid{}][RemoveRegionGroup] 成功,region group {} 已删除。过程耗时 {}(开始于 {})。"; public static final String PID_MIGRATEREGION_STARTED_WILL_BE_MIGRATED_FROM_DATANODE_TO = "[pid{}][MigrateRegion] started, {} will be migrated from DataNode {} to {}."; public static final String PID_MIGRATEREGION_STATE_COMPLETE = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java index d4db2e5c5a64b..2d44c214967f6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java @@ -39,7 +39,6 @@ public enum CnToDnAsyncRequestType { // Region Maintenance CREATE_DATA_REGION, CREATE_SCHEMA_REGION, - DELETE_REGION, RESET_PEER_LIST, NOTIFY_REGION_MIGRATION, UPDATE_REGION_ROUTE_MAP, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java index 4aec391f7af2c..4048016548a16 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java @@ -19,7 +19,6 @@ package org.apache.iotdb.confignode.client.async; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; @@ -143,10 +142,6 @@ protected void initActionMapBuilder() { (req, client, handler) -> client.createDataRegion( (TCreateDataRegionReq) req, (DataNodeTSStatusRPCHandler) handler)); - actionMapBuilder.put( - CnToDnAsyncRequestType.DELETE_REGION, - (req, client, handler) -> - client.deleteRegion((TConsensusGroupId) req, (DataNodeTSStatusRPCHandler) handler)); actionMapBuilder.put( CnToDnAsyncRequestType.CREATE_SCHEMA_REGION, (req, client, handler) -> diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 2beeea6b4af4f..4884392dd9909 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -56,7 +56,6 @@ import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.i18n.ManagerMessages; -import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.persistence.ProcedureInfo; import org.apache.iotdb.confignode.procedure.PartitionTableAutoCleaner; import org.apache.iotdb.confignode.procedure.Procedure; @@ -317,9 +316,6 @@ public TSStatus deleteDatabases( } List results = new ArrayList<>(procedures.size()); procedures.forEach(procedure -> results.add(waitingProcedureFinished(procedure))); - // Clear the previously deleted regions - final PartitionManager partitionManager = getConfigManager().getPartitionManager(); - partitionManager.getRegionMaintainer().submit(partitionManager::maintainRegionReplicas); if (results.stream() .allMatch(result -> result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())) { return StatusUtils.OK; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 0edb0e389dc3a..2bbd0d0538135 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -81,9 +81,7 @@ import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; -import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask; -import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType; import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure; import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; @@ -106,6 +104,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -1322,218 +1321,139 @@ public String getRegionDatabase(TConsensusGroupId regionId) { /** * Called by {@link PartitionManager#regionMaintainer}. * - *

Periodically maintain the RegionReplicas to be created or deleted + *

Periodically recreate the failed RegionReplicas offered to the RegionMaintainer queue. + * Region deletion is owned by {@code RemoveRegionGroupProcedure} and is no longer handled here. */ public void maintainRegionReplicas() { // The consensusManager of configManager may not be fully initialized at this time - Optional.ofNullable(getConsensusManager()) - .ifPresent( - consensusManager -> { - if (getConsensusManager().isLeader()) { - List regionMaintainTaskList = - partitionInfo.getRegionMaintainEntryList(); - - if (regionMaintainTaskList.isEmpty()) { - return; - } - - // Group tasks by region id - Map> regionMaintainTaskMap = - new HashMap<>(); - for (RegionMaintainTask regionMaintainTask : regionMaintainTaskList) { - regionMaintainTaskMap - .computeIfAbsent(regionMaintainTask.getRegionId(), k -> new LinkedList<>()) - .add(regionMaintainTask); - } - - while (!regionMaintainTaskMap.isEmpty()) { - // Select same type task from each region group - List selectedRegionMaintainTask = new ArrayList<>(); - RegionMaintainType currentType = null; - for (Map.Entry> entry : - regionMaintainTaskMap.entrySet()) { - RegionMaintainTask regionMaintainTask = entry.getValue().peek(); - if (regionMaintainTask == null) { - continue; - } - - if (currentType == null) { - currentType = regionMaintainTask.getType(); - selectedRegionMaintainTask.add(entry.getValue().peek()); - } else { - if (!currentType.equals(regionMaintainTask.getType())) { - continue; - } - - if (currentType.equals(RegionMaintainType.DELETE) - || entry - .getKey() - .getType() - .equals(selectedRegionMaintainTask.get(0).getRegionId().getType())) { - // Delete or same create task - selectedRegionMaintainTask.add(entry.getValue().peek()); - } - } - } - - if (selectedRegionMaintainTask.isEmpty()) { - break; - } + if (getConsensusManager() == null || !getConsensusManager().isLeader()) { + return; + } - Set successfulTask = new HashSet<>(); - switch (currentType) { - case CREATE: - // create region - switch (selectedRegionMaintainTask.get(0).getRegionId().getType()) { - case SchemaRegion: - // create SchemaRegion - DataNodeAsyncRequestContext - createSchemaRegionHandler = - new DataNodeAsyncRequestContext<>( - CnToDnAsyncRequestType.CREATE_SCHEMA_REGION); - for (RegionMaintainTask regionMaintainTask : selectedRegionMaintainTask) { - RegionCreateTask schemaRegionCreateTask = - (RegionCreateTask) regionMaintainTask; - LOGGER.info( - ManagerMessages.START_TO_CREATE_REGION_ON_DATANODE, - schemaRegionCreateTask.getRegionReplicaSet().getRegionId(), - schemaRegionCreateTask.getTargetDataNode()); - createSchemaRegionHandler.putRequest( - schemaRegionCreateTask.getRegionId().getId(), - new TCreateSchemaRegionReq( - schemaRegionCreateTask.getRegionReplicaSet(), - schemaRegionCreateTask.getStorageGroup())); - createSchemaRegionHandler.putNodeLocation( - schemaRegionCreateTask.getRegionId().getId(), - schemaRegionCreateTask.getTargetDataNode()); - } - - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithRetry(createSchemaRegionHandler); - - for (Map.Entry entry : - createSchemaRegionHandler.getResponseMap().entrySet()) { - if (entry.getValue().getCode() - == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successfulTask.add( - new TConsensusGroupId( - TConsensusGroupType.SchemaRegion, entry.getKey())); - } - } - break; - case DataRegion: - // Create DataRegion - DataNodeAsyncRequestContext - createDataRegionHandler = - new DataNodeAsyncRequestContext<>( - CnToDnAsyncRequestType.CREATE_DATA_REGION); - for (RegionMaintainTask regionMaintainTask : selectedRegionMaintainTask) { - RegionCreateTask dataRegionCreateTask = - (RegionCreateTask) regionMaintainTask; - LOGGER.info( - ManagerMessages.START_TO_CREATE_REGION_ON_DATANODE, - dataRegionCreateTask.getRegionReplicaSet().getRegionId(), - dataRegionCreateTask.getTargetDataNode()); - createDataRegionHandler.putRequest( - dataRegionCreateTask.getRegionId().getId(), - new TCreateDataRegionReq( - dataRegionCreateTask.getRegionReplicaSet(), - dataRegionCreateTask.getStorageGroup())); - createDataRegionHandler.putNodeLocation( - dataRegionCreateTask.getRegionId().getId(), - dataRegionCreateTask.getTargetDataNode()); - } - - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithRetry(createDataRegionHandler); - - for (Map.Entry entry : - createDataRegionHandler.getResponseMap().entrySet()) { - if (entry.getValue().getCode() - == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successfulTask.add( - new TConsensusGroupId( - TConsensusGroupType.DataRegion, entry.getKey())); - } - } - break; - } - break; - case DELETE: - // delete region - DataNodeAsyncRequestContext deleteRegionHandler = - new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.DELETE_REGION); - Map regionIdMap = new HashMap<>(); - for (RegionMaintainTask regionMaintainTask : selectedRegionMaintainTask) { - RegionDeleteTask regionDeleteTask = (RegionDeleteTask) regionMaintainTask; - LOGGER.info( - ManagerMessages.START_TO_DELETE_REGION_ON_DATANODE, - regionDeleteTask.getRegionId(), - regionDeleteTask.getTargetDataNode()); - deleteRegionHandler.putRequest( - regionDeleteTask.getRegionId().getId(), regionDeleteTask.getRegionId()); - deleteRegionHandler.putNodeLocation( - regionDeleteTask.getRegionId().getId(), - regionDeleteTask.getTargetDataNode()); - regionIdMap.put( - regionDeleteTask.getRegionId().getId(), regionDeleteTask.getRegionId()); - } - - long startTime = System.currentTimeMillis(); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithRetry(deleteRegionHandler); - - LOGGER.info( - ManagerMessages.DELETING_REGIONS_COSTS_MS, - (System.currentTimeMillis() - startTime)); - - for (Map.Entry entry : - deleteRegionHandler.getResponseMap().entrySet()) { - if (entry.getValue().getCode() - == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successfulTask.add(regionIdMap.get(entry.getKey())); - } - } - break; - } + // Group the queued tasks into one FIFO sub-queue per region. The queue only ever holds + // RegionCreateTasks now (delete tasks are filtered out at the PartitionInfo ingestion points), + // and a region may carry several of them when more than one of its replicas failed to create. + final Map> tasksByRegion = new HashMap<>(); + for (RegionMaintainTask task : partitionInfo.getRegionMaintainEntryList()) { + if (!(task instanceof RegionCreateTask)) { + // Unreachable: the queue only holds create tasks now (legacy delete tasks are dropped at + // the + // PartitionInfo ingestion points). Guard against a regression so an unexpected task type + // cannot silently stall the loop. + LOGGER.warn(ManagerMessages.UNEXPECTED_NON_CREATE_REGION_MAINTAIN_TASK_SKIPPED); + continue; + } + tasksByRegion + .computeIfAbsent(task.getRegionId(), k -> new LinkedList<>()) + .add((RegionCreateTask) task); + } - if (successfulTask.isEmpty()) { - break; - } + // Drain the sub-queues head-by-head. Each round takes the head of every region, batches those + // heads by region type into a single create RPC per type, then durably polls the tasks that + // succeeded. Tasks of the same region are advanced one at a time to preserve their offer order. + while (!tasksByRegion.isEmpty()) { + final Map> headsByType = + new EnumMap<>(TConsensusGroupType.class); + for (Queue queue : tasksByRegion.values()) { + final RegionCreateTask head = queue.peek(); + headsByType.computeIfAbsent(head.getRegionId().getType(), k -> new ArrayList<>()).add(head); + } - for (TConsensusGroupId regionId : successfulTask) { - regionMaintainTaskMap.compute( - regionId, - (k, v) -> { - if (v == null) { - throw new IllegalStateException(); - } - v.poll(); - if (v.isEmpty()) { - return null; - } else { - return v; - } - }); - } + final Set successfulRegions = new HashSet<>(); + int selectedCount = 0; + for (Map.Entry> entry : headsByType.entrySet()) { + selectedCount += entry.getValue().size(); + successfulRegions.addAll(submitRegionCreateTasks(entry.getKey(), entry.getValue())); + } - // Poll the head entry if success - try { - getConsensusManager() - .write(new PollSpecificRegionMaintainTaskPlan(successfulTask)); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); - } + if (successfulRegions.isEmpty()) { + break; + } - if (successfulTask.size() < selectedRegionMaintainTask.size()) { - // Here we just break and wait until next schedule task - // due to all the RegionMaintainEntry should be executed by - // the order of they were offered - break; - } - } - } + // Advance the in-memory sub-queues so the next round picks the following task of each region. + for (TConsensusGroupId regionId : successfulRegions) { + tasksByRegion.computeIfPresent( + regionId, + (k, queue) -> { + queue.poll(); + return queue.isEmpty() ? null : queue; }); + } + + // Durably remove the head of every successfully created region from the persisted queue. + try { + getConsensusManager().write(new PollSpecificRegionMaintainTaskPlan(successfulRegions)); + } catch (ConsensusException e) { + LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + } + + if (successfulRegions.size() < selectedCount) { + // Some tasks failed this round; stop and retry on the next schedule so that the tasks of + // each region keep being executed in the order they were offered. + break; + } + } + } + + /** + * Send a batched create RPC for the given heads, all of which share the given region type, and + * return the ids of the regions whose replica was created successfully. + */ + private Set submitRegionCreateTasks( + TConsensusGroupType regionType, List createTasks) { + final Set successfulRegions = new HashSet<>(); + switch (regionType) { + case SchemaRegion: + final DataNodeAsyncRequestContext schemaHandler = + new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.CREATE_SCHEMA_REGION); + for (RegionCreateTask task : createTasks) { + LOGGER.info( + ManagerMessages.START_TO_CREATE_REGION_ON_DATANODE, + task.getRegionReplicaSet().getRegionId(), + task.getTargetDataNode()); + schemaHandler.putRequest( + task.getRegionId().getId(), + new TCreateSchemaRegionReq(task.getRegionReplicaSet(), task.getStorageGroup())); + schemaHandler.putNodeLocation(task.getRegionId().getId(), task.getTargetDataNode()); + } + CnToDnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithRetry(schemaHandler); + collectSuccessfulRegions( + schemaHandler.getResponseMap(), TConsensusGroupType.SchemaRegion, successfulRegions); + break; + case DataRegion: + final DataNodeAsyncRequestContext dataHandler = + new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.CREATE_DATA_REGION); + for (RegionCreateTask task : createTasks) { + LOGGER.info( + ManagerMessages.START_TO_CREATE_REGION_ON_DATANODE, + task.getRegionReplicaSet().getRegionId(), + task.getTargetDataNode()); + dataHandler.putRequest( + task.getRegionId().getId(), + new TCreateDataRegionReq(task.getRegionReplicaSet(), task.getStorageGroup())); + dataHandler.putNodeLocation(task.getRegionId().getId(), task.getTargetDataNode()); + } + CnToDnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithRetry(dataHandler); + collectSuccessfulRegions( + dataHandler.getResponseMap(), TConsensusGroupType.DataRegion, successfulRegions); + break; + default: + break; + } + return successfulRegions; + } + + private void collectSuccessfulRegions( + Map responseMap, + TConsensusGroupType regionType, + Set successfulRegions) { + for (Map.Entry entry : responseMap.entrySet()) { + if (entry.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + successfulRegions.add(new TConsensusGroupId(regionType, entry.getKey())); + } + } } public void startRegionCleaner() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index af9429e59532c..d6abb66355112 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -64,6 +64,7 @@ import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask; +import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType; import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; @@ -232,7 +233,19 @@ public TSStatus createRegionGroups(CreateRegionGroupsPlan plan) { public TSStatus offerRegionMaintainTasks( OfferRegionMaintainTasksPlan offerRegionMaintainTasksPlan) { synchronized (regionMaintainTaskList) { - regionMaintainTaskList.addAll(offerRegionMaintainTasksPlan.getRegionMaintainTaskList()); + // The RegionMaintainer queue only recreates failed region replicas now; region deletion is + // owned by RemoveRegionGroupProcedure. Drop any legacy DELETE task that an upgraded node may + // replay from an old consensus log, so it cannot get stuck in the queue and block the + // recreation of that region's other replicas. + for (RegionMaintainTask task : offerRegionMaintainTasksPlan.getRegionMaintainTaskList()) { + if (RegionMaintainType.DELETE.equals(task.getType())) { + LOGGER.info( + "Dropping legacy region-delete task for {} while replaying offer plan; region deletion is now handled by RemoveRegionGroupProcedure.", + task.getRegionId()); + continue; + } + regionMaintainTaskList.add(task); + } return RpcUtils.SUCCESS_STATUS; } } @@ -1073,11 +1086,21 @@ public void processLoadSnapshot(final File snapshotDir) throws TException, IOExc databasePartitionTables.put(database, databasePartitionTable); } - // restore deletedRegionSet + // restore the RegionMaintainer queue length = ReadWriteIOUtils.readInt(fileInputStream); for (int i = 0; i < length; i++) { final RegionMaintainTask task = RegionMaintainTask.Factory.create(fileInputStream, protocol); + // The RegionMaintainer queue only recreates failed region replicas now; region deletion is + // owned by RemoveRegionGroupProcedure. Drop any legacy DELETE task carried over from an + // upgraded snapshot so it cannot get stuck at the head of a region's queue and block the + // recreation of that region's other replicas. + if (RegionMaintainType.DELETE.equals(task.getType())) { + LOGGER.info( + "Dropping legacy region-delete task for {} while loading snapshot; region deletion is now handled by RemoveRegionGroupProcedure.", + task.getRegionId()); + continue; + } regionMaintainTaskList.add(task); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java index e9cce807e77fc..cb26afc97dd03 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java @@ -36,7 +36,6 @@ import org.apache.iotdb.confignode.i18n.ProcedureMessages; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; -import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; @@ -51,7 +50,9 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -107,6 +108,9 @@ protected Flow executeFromState( case SHUNT_REGION_REPLICAS: persistPlan = new CreateRegionGroupsPlan(); final OfferRegionMaintainTasksPlan offerPlan = new OfferRegionMaintainTasksPlan(); + // RegionGroups that failed to reach a serving quorum are removed via a child + // RemoveRegionGroupProcedure, which deletes every replica that did get created. + final List removeRegionGroupProcedures = new ArrayList<>(); // Filter those RegionGroups that created successfully createRegionGroupsPlan .getRegionGroupMap() @@ -154,7 +158,11 @@ protected Flow executeFromState( .CREATEREGIONGROUPS_FAILED_TO_CREATE_SOME_REPLICAS_OF_REGIONGROUP_BUT_THIS, regionReplicaSet.getRegionId()); } else { - // The redundant RegionReplicas should be deleted otherwise + // The redundant RegionReplicas (the ones that did get created) should + // be deleted otherwise + final TRegionReplicaSet redundantReplicas = + new TRegionReplicaSet() + .setRegionId(regionReplicaSet.getRegionId()); regionReplicaSet .getDataNodeLocations() .forEach( @@ -162,12 +170,13 @@ protected Flow executeFromState( if (!failedRegionReplicas .getDataNodeLocations() .contains(targetDataNode)) { - RegionDeleteTask deleteTask = - new RegionDeleteTask( - targetDataNode, regionReplicaSet.getRegionId()); - offerPlan.appendRegionMaintainTask(deleteTask); + redundantReplicas.addToDataNodeLocations(targetDataNode); } }); + if (redundantReplicas.getDataNodeLocationsSize() > 0) { + removeRegionGroupProcedures.add( + new RemoveRegionGroupProcedure(redundantReplicas)); + } LOGGER.info( ProcedureMessages @@ -188,6 +197,7 @@ protected Flow executeFromState( LOGGER.warn( ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); } + removeRegionGroupProcedures.forEach(this::addChildProcedure); setNextState(CreateRegionGroupsState.REBALANCE_DATA_PARTITION_POLICY); break; case REBALANCE_DATA_PARTITION_POLICY: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionGroupProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionGroupProcedure.java new file mode 100644 index 0000000000000..2dd2e0d6dd317 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionGroupProcedure.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.region; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; +import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils; +import org.apache.iotdb.commons.utils.CommonDateTimeUtils; +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.i18n.ProcedureMessages; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.state.RemoveRegionGroupState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult; + +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint; +import static org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler.simplifiedLocation; +import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS; + +/** + * Delete a whole region group: every replica's consensus peer and all of its data on every DataNode + * that hosts it. + * + *

Each replica is removed with a local {@code deleteLocalPeer} (the {@code + * submitDeleteOldRegionPeerTask} path), which needs no consensus quorum and tolerates an + * already-absent peer, so it works for a group of any size — including a sub-quorum group that + * never finished forming. The DataNode runs the deletion asynchronously and this procedure polls + * for the result, so a slow deletion is never wrongly reported as finished. + */ +public class RemoveRegionGroupProcedure extends RegionOperationProcedure { + private static final Logger LOGGER = LoggerFactory.getLogger(RemoveRegionGroupProcedure.class); + + private static final int MAX_DELETE_REPLICA_RETRY = 3; + private static final long DELETE_REPLICA_RETRY_INTERVAL_MS = 5_000; + + private TRegionReplicaSet regionReplicaSet; + + // The index of the replica currently being deleted. Persisted and advanced only after that + // replica + // is deleted, so after a ConfigNode leader change the procedure resumes on the first replica it + // has + // not finished deleting. + private int currentReplicaIndex; + + // Number of failed attempts on the replica at currentReplicaIndex. Transient: a leader change + // restarts the retry budget for the current replica. + private transient int attemptedForCurrentReplica; + + public RemoveRegionGroupProcedure() { + super(); + } + + public RemoveRegionGroupProcedure(TRegionReplicaSet regionReplicaSet) { + super(regionReplicaSet.getRegionId()); + this.regionReplicaSet = regionReplicaSet; + } + + @TestOnly + void setCurrentReplicaIndex(int currentReplicaIndex) { + this.currentReplicaIndex = currentReplicaIndex; + } + + @Override + protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveRegionGroupState state) + throws InterruptedException { + final List dataNodeLocations = + regionReplicaSet == null ? null : regionReplicaSet.getDataNodeLocations(); + if (dataNodeLocations == null) { + // A null replica set means deserialization failed; fail loudly instead of silently reporting + // the group as deleted, otherwise the parent would drop the partition table while the region + // data is still on disk. + setFailure( + new ProcedureException(ProcedureMessages.UNSUPPORTED_STATE + "missing regionReplicaSet")); + return Flow.NO_MORE_STATE; + } + final RegionMaintainHandler handler = env.getRegionMaintainHandler(); + switch (state) { + case DELETE_REGION_REPLICAS: + if (currentReplicaIndex == 0 && attemptedForCurrentReplica == 0) { + LOGGER.info( + ProcedureMessages.PID_REMOVEREGIONGROUP_STARTED_WILL_BE_DELETED, + getProcId(), + regionId, + dataNodeLocations.stream() + .map(RegionMaintainHandler::simplifiedLocation) + .collect(Collectors.toList())); + } + if (currentReplicaIndex >= dataNodeLocations.size()) { + // Requirement: every successfully completed maintain task must be logged. + LOGGER.info( + ProcedureMessages.PID_REMOVEREGIONGROUP_SUCCESS_PROCEDURE_TOOK, + getProcId(), + regionId, + CommonDateTimeUtils.convertMillisecondToDurationStr( + System.currentTimeMillis() - getSubmittedTime()), + DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms")); + return Flow.NO_MORE_STATE; + } + + final TDataNodeLocation targetDataNode = dataNodeLocations.get(currentReplicaIndex); + LOGGER.info( + ProcedureMessages.PID_REMOVEREGIONGROUP_STARTED_REPLICA_WILL_BE_DELETED_FROM_DATANODE, + getProcId(), + regionId, + simplifiedLocation(targetDataNode)); + + // deleteLocalPeer is idempotent (it tolerates an already-absent peer) and the DataNode + // dedups by taskId, so re-submitting after a leader change or a retry is safe. + final TSStatus submitStatus; + final TRegionMigrateResult result; + try { + submitStatus = + handler.submitDeleteOldRegionPeerTask(getProcId(), targetDataNode, regionId); + setKillPoint(state); + if (submitStatus.getCode() != SUCCESS_STATUS.getStatusCode()) { + return retryCurrentReplicaOrFail( + String.format( + "submit delete task for region %s to DataNode %s failed: %s", + regionId, simplifiedLocation(targetDataNode), submitStatus)); + } + result = handler.waitTaskFinish(getProcId(), targetDataNode); + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + LOGGER.error(ProcedureMessages.PID_REMOVEREGIONGROUP_STATE_FAILED, getProcId(), state, e); + return retryCurrentReplicaOrFail( + String.format( + "delete region %s from DataNode %s threw %s", + regionId, simplifiedLocation(targetDataNode), e)); + } + + switch (result.getTaskStatus()) { + case SUCCESS: + // Advance to the next replica with a fresh retry budget. + currentReplicaIndex++; + attemptedForCurrentReplica = 0; + setNextState(RemoveRegionGroupState.DELETE_REGION_REPLICAS); + return Flow.HAS_MORE_STATE; + case PROCESSING: + // waitTaskFinish() only returns PROCESSING when its polling loop was interrupted, i.e. + // this ConfigNode is shutting down / losing leadership. The delete task is still + // running + // on the DataNode, so persist and re-poll after recovery: stay on this replica without + // advancing it and without consuming a retry attempt. + setNextState(RemoveRegionGroupState.DELETE_REGION_REPLICAS); + return Flow.HAS_MORE_STATE; + case TASK_NOT_EXIST: + case FAIL: + default: + return retryCurrentReplicaOrFail( + String.format( + "delete region %s from DataNode %s, task status is %s", + regionId, simplifiedLocation(targetDataNode), result.getTaskStatus())); + } + default: + setFailure(new ProcedureException(ProcedureMessages.UNSUPPORTED_STATE + state.name())); + return Flow.NO_MORE_STATE; + } + } + + /** + * Retry the replica at {@link #currentReplicaIndex} after a backoff, or fail the whole procedure + * once the per-replica retry budget is exhausted. Failing (rather than skipping the replica) + * keeps the parent from dropping the partition table while a region's peer/data is still on disk. + */ + private Flow retryCurrentReplicaOrFail(String reason) throws InterruptedException { + attemptedForCurrentReplica++; + if (attemptedForCurrentReplica <= MAX_DELETE_REPLICA_RETRY) { + LOGGER.warn( + ProcedureMessages.PID_REMOVEREGIONGROUP_DELETE_REPLICA_FAILED, + getProcId(), + regionId, + attemptedForCurrentReplica, + MAX_DELETE_REPLICA_RETRY + 1, + reason); + Thread.sleep(DELETE_REPLICA_RETRY_INTERVAL_MS); + setNextState(RemoveRegionGroupState.DELETE_REGION_REPLICAS); + return Flow.HAS_MORE_STATE; + } + setFailure( + new ProcedureException( + String.format( + "[pid%d][RemoveRegionGroup] gave up after %d attempts: %s", + getProcId(), attemptedForCurrentReplica, reason))); + return Flow.NO_MORE_STATE; + } + + @Override + protected void rollbackState(ConfigNodeProcedureEnv env, RemoveRegionGroupState state) + throws IOException, InterruptedException, ProcedureException {} + + @Override + protected RemoveRegionGroupState getState(int stateId) { + return RemoveRegionGroupState.values()[stateId]; + } + + @Override + protected int getStateId(RemoveRegionGroupState removeRegionGroupState) { + return removeRegionGroupState.ordinal(); + } + + @Override + protected RemoveRegionGroupState getInitialState() { + return RemoveRegionGroupState.DELETE_REGION_REPLICAS; + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + stream.writeShort(ProcedureType.REMOVE_REGION_GROUP_PROCEDURE.getTypeCode()); + super.serialize(stream); + ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, stream); + ReadWriteIOUtils.write(currentReplicaIndex, stream); + } + + @Override + public void deserialize(ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + try { + regionReplicaSet = ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer); + regionId = regionReplicaSet.getRegionId(); + currentReplicaIndex = ReadWriteIOUtils.readInt(byteBuffer); + } catch (ThriftSerDeException e) { + LOGGER.error(ProcedureMessages.ERROR_IN_DESERIALIZE, this.getClass(), e); + } + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof RemoveRegionGroupProcedure)) { + return false; + } + RemoveRegionGroupProcedure procedure = (RemoveRegionGroupProcedure) obj; + return this.currentReplicaIndex == procedure.currentReplicaIndex + && Objects.equals(this.regionReplicaSet, procedure.regionReplicaSet); + } + + @Override + public int hashCode() { + return Objects.hash(regionReplicaSet, currentReplicaIndex); + } + + @Override + public String toString() { + return "RemoveRegionGroupProcedure{" + + "regionReplicaSet=" + + regionReplicaSet + + ", currentReplicaIndex=" + + currentReplicaIndex + + '}'; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java index 7b8de7821739a..4b979105c2fb4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java @@ -19,29 +19,21 @@ package org.apache.iotdb.confignode.procedure.impl.schema; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils; -import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; -import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; -import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; -import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; import org.apache.iotdb.confignode.i18n.ProcedureMessages; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; -import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.impl.region.RemoveRegionGroupProcedure; import org.apache.iotdb.confignode.procedure.state.schema.DeleteDatabaseState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; -import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; @@ -51,10 +43,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; public class DeleteDatabaseProcedure @@ -114,86 +103,23 @@ protected Flow executeFromState(final ConfigNodeProcedureEnv env, final DeleteDa "[DeleteDatabaseProcedure] Delete DatabaseSchema: {}", deleteDatabaseSchema.getName()); - // Submit RegionDeleteTasks - final OfferRegionMaintainTasksPlan dataRegionDeleteTaskOfferPlan = - new OfferRegionMaintainTasksPlan(); + // Delete every region group (both schema and data regions) of this database via a + // RemoveRegionGroupProcedure child. The DatabasePartitionTable (handled in the next + // state) is only removed once these children have finished, so a slow region deletion is + // always completed before the coordinator forgets about it. final List regionReplicaSets = env.getAllReplicaSets(deleteDatabaseSchema.getName()); - final List schemaRegionReplicaSets = new ArrayList<>(); regionReplicaSets.forEach( regionReplicaSet -> { // Clear heartbeat cache along the way env.getConfigManager() .getLoadManager() .removeRegionGroupRelatedCache(regionReplicaSet.getRegionId()); - - if (regionReplicaSet - .getRegionId() - .getType() - .equals(TConsensusGroupType.SchemaRegion)) { - schemaRegionReplicaSets.add(regionReplicaSet); - } else { - regionReplicaSet - .getDataNodeLocations() - .forEach( - targetDataNode -> - dataRegionDeleteTaskOfferPlan.appendRegionMaintainTask( - new RegionDeleteTask( - targetDataNode, regionReplicaSet.getRegionId()))); - } + addChildProcedure(new RemoveRegionGroupProcedure(regionReplicaSet)); }); - - if (!dataRegionDeleteTaskOfferPlan.getRegionMaintainTaskList().isEmpty()) { - // submit async data region delete task - env.getConfigManager().getConsensusManager().write(dataRegionDeleteTaskOfferPlan); - } - - // try sync delete schemaengine region - final DataNodeAsyncRequestContext asyncClientHandler = - new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.DELETE_REGION); - final Map schemaRegionDeleteTaskMap = new HashMap<>(); - int requestIndex = 0; - for (final TRegionReplicaSet schemaRegionReplicaSet : schemaRegionReplicaSets) { - for (final TDataNodeLocation dataNodeLocation : - schemaRegionReplicaSet.getDataNodeLocations()) { - asyncClientHandler.putRequest(requestIndex, schemaRegionReplicaSet.getRegionId()); - asyncClientHandler.putNodeLocation(requestIndex, dataNodeLocation); - schemaRegionDeleteTaskMap.put( - requestIndex, - new RegionDeleteTask(dataNodeLocation, schemaRegionReplicaSet.getRegionId())); - requestIndex++; - } - } - if (!schemaRegionDeleteTaskMap.isEmpty()) { - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithRetry(asyncClientHandler); - for (final Map.Entry entry : - asyncClientHandler.getResponseMap().entrySet()) { - if (entry.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOG.info( - "[DeleteDatabaseProcedure] Successfully delete SchemaRegion[{}] on {}", - asyncClientHandler.getRequest(entry.getKey()), - schemaRegionDeleteTaskMap.get(entry.getKey()).getTargetDataNode()); - schemaRegionDeleteTaskMap.remove(entry.getKey()); - } else { - LOG.warn( - "[DeleteDatabaseProcedure] Failed to delete SchemaRegion[{}] on {}. Submit to async deletion.", - asyncClientHandler.getRequest(entry.getKey()), - schemaRegionDeleteTaskMap.get(entry.getKey()).getTargetDataNode()); - } - } - - if (!schemaRegionDeleteTaskMap.isEmpty()) { - // submit async schemaengine region delete task for failed sync execution - final OfferRegionMaintainTasksPlan schemaRegionDeleteTaskOfferPlan = - new OfferRegionMaintainTasksPlan(); - schemaRegionDeleteTaskMap - .values() - .forEach(schemaRegionDeleteTaskOfferPlan::appendRegionMaintainTask); - env.getConfigManager().getConsensusManager().write(schemaRegionDeleteTaskOfferPlan); - } - } - + setNextState(DeleteDatabaseState.DELETE_DATABASE_CONFIG); + break; + case DELETE_DATABASE_CONFIG: env.getConfigManager() .getLoadManager() .clearDataPartitionPolicyTable(deleteDatabaseSchema.getName()); @@ -220,7 +146,7 @@ protected Flow executeFromState(final ConfigNodeProcedureEnv env, final DeleteDa ProcedureMessages.DELETEDATABASEPROCEDURE_DELETE_DATABASESCHEMA_FAILED)); } } - } catch (final ConsensusException | TException | IOException e) { + } catch (final TException | IOException e) { if (isRollbackSupported(state)) { setFailure( new ProcedureException( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionGroupState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionGroupState.java new file mode 100644 index 0000000000000..9b7d99c8a2323 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionGroupState.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.state; + +public enum RemoveRegionGroupState { + DELETE_REGION_REPLICAS, +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java index cde6b2bdd8111..c0ad67fe81190 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java @@ -22,5 +22,8 @@ public enum DeleteDatabaseState { PRE_DELETE_DATABASE, INVALIDATE_CACHE, - DELETE_DATABASE_SCHEMA + DELETE_DATABASE_SCHEMA, + // Delete the DatabasePartitionTable and related config after all region groups have been deleted + // by the RemoveRegionGroupProcedure children spawned in DELETE_DATABASE_SCHEMA. + DELETE_DATABASE_CONFIG } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java index 231b44ad2eef4..34f655dfcffcf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java @@ -43,6 +43,7 @@ import org.apache.iotdb.confignode.procedure.impl.region.NotifyRegionMigrationProcedure; import org.apache.iotdb.confignode.procedure.impl.region.ReconstructRegionProcedure; import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrateProcedure; +import org.apache.iotdb.confignode.procedure.impl.region.RemoveRegionGroupProcedure; import org.apache.iotdb.confignode.procedure.impl.region.RemoveRegionPeerProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.AlterEncodingCompressorProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.AlterLogicalViewProcedure; @@ -145,6 +146,9 @@ public Procedure create(ByteBuffer buffer) throws IOException { case NOTIFY_REGION_MIGRATION_PROCEDURE: procedure = new NotifyRegionMigrationProcedure(); break; + case REMOVE_REGION_GROUP_PROCEDURE: + procedure = new RemoveRegionGroupProcedure(); + break; case ALTER_ENCODING_COMPRESSOR_PROCEDURE: procedure = new AlterEncodingCompressorProcedure(false); break; @@ -463,6 +467,8 @@ public static ProcedureType getProcedureType(final Procedure procedure) { return ProcedureType.RECONSTRUCT_REGION_PROCEDURE; } else if (procedure instanceof NotifyRegionMigrationProcedure) { return ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE; + } else if (procedure instanceof RemoveRegionGroupProcedure) { + return ProcedureType.REMOVE_REGION_GROUP_PROCEDURE; } else if (procedure instanceof CreateTriggerProcedure) { return ProcedureType.CREATE_TRIGGER_PROCEDURE; } else if (procedure instanceof DropTriggerProcedure) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java index 1cd6a46a4dcd1..c6b885bfac2b3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java @@ -41,6 +41,7 @@ public enum ProcedureType { ADD_REGION_PEER_PROCEDURE((short) 204), REMOVE_REGION_PEER_PROCEDURE((short) 205), NOTIFY_REGION_MIGRATION_PROCEDURE((short) 206), + REMOVE_REGION_GROUP_PROCEDURE((short) 207), @TestOnly CREATE_MANY_DATABASES_PROCEDURE((short) 250), diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java index afccb0c0eba12..8c32378cfcb1a 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java @@ -40,6 +40,8 @@ import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; +import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask; +import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; @@ -152,6 +154,31 @@ public void testSnapshot() throws TException, IOException { Assert.assertEquals(partitionInfo, partitionInfo1); } + @Test + public void testLegacyRegionDeleteTasksAreFiltered() throws TException, IOException { + // Region deletion is owned by RemoveRegionGroupProcedure; the RegionMaintainer queue only + // recreates failed replicas. A legacy RegionDeleteTask (offered by an old version and replayed + // from a consensus log, or carried over in a snapshot) must be dropped rather than queued, so + // it cannot block the recreation of that region's other replicas. + + // The offer plan mixes two RegionCreateTasks with one legacy RegionDeleteTask. + partitionInfo.offerRegionMaintainTasks(generateOfferRegionMaintainTasksPlan()); + + // The DELETE task is filtered out at offer time; only the two CREATE tasks remain queued. + List queuedTasks = partitionInfo.getRegionMaintainEntryList(); + Assert.assertEquals(2, queuedTasks.size()); + for (RegionMaintainTask task : queuedTasks) { + Assert.assertEquals(RegionMaintainType.CREATE, task.getType()); + } + + // A snapshot round-trip keeps the CREATE tasks and never resurrects a DELETE task. + Assert.assertTrue(partitionInfo.processTakeSnapshot(snapshotDir)); + PartitionInfo loaded = new PartitionInfo(); + loaded.processLoadSnapshot(snapshotDir); + Assert.assertEquals(partitionInfo, loaded); + Assert.assertEquals(2, loaded.getRegionMaintainEntryList().size()); + } + @Test public void testGetRegionType() { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionGroupProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionGroupProcedureTest.java new file mode 100644 index 0000000000000..3ca6efaf0375a --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionGroupProcedureTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.region; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; + +import org.apache.tsfile.utils.PublicBAOS; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; + +public class RemoveRegionGroupProcedureTest { + @Test + public void serDeTest() throws Exception { + final TRegionReplicaSet regionReplicaSet = + new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.DataRegion, 10), + Arrays.asList( + new TDataNodeLocation( + 1, + new TEndPoint("127.0.0.1", 0), + new TEndPoint("127.0.0.1", 1), + new TEndPoint("127.0.0.1", 2), + new TEndPoint("127.0.0.1", 3), + new TEndPoint("127.0.0.1", 4)), + new TDataNodeLocation( + 2, + new TEndPoint("127.0.0.1", 10), + new TEndPoint("127.0.0.1", 11), + new TEndPoint("127.0.0.1", 12), + new TEndPoint("127.0.0.1", 13), + new TEndPoint("127.0.0.1", 14)))); + final RemoveRegionGroupProcedure procedure = new RemoveRegionGroupProcedure(regionReplicaSet); + // A non-zero cursor so the round-trip actually exercises currentReplicaIndex (de)serialization; + // equals/hashCode include it, so a dropped/garbled cursor would fail the assertion. + procedure.setCurrentReplicaIndex(1); + try (PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + procedure.serialize(outputStream); + ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + // Exercises ProcedureType.REMOVE_REGION_GROUP_PROCEDURE + ProcedureFactory registration as + // well as the procedure's own serialize/deserialize. + Assert.assertEquals(procedure, ProcedureFactory.getInstance().create(buffer)); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java index 39c1a1678cff4..72fc56033b6ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java @@ -500,17 +500,19 @@ public void run() { // deletePeer: remove the peer from the consensus group TSStatus runResult = deletePeer(); if (isFailed(runResult)) { + // A failed delete must NOT fall through to taskSucceed, otherwise the ConfigNode would + // forget the task while the peer/data is still present. taskFail( taskId, tRegionId, originalDataNode, TRegionMigrateFailedType.RemoveConsensusGroupFailed, runResult); + return; } // deleteRegion: delete region data runResult = deleteRegion(); - if (isFailed(runResult)) { taskFail( taskId, @@ -518,6 +520,7 @@ public void run() { originalDataNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult); + return; } taskSucceed(taskId, tRegionId, "DeletePeer"); @@ -537,6 +540,13 @@ private TSStatus deletePeer() { } else { SchemaRegionConsensusImpl.getInstance().deleteLocalPeer(regionId); } + } catch (ConsensusGroupNotExistException e) { + // The peer is already absent (e.g. a retry after a previous attempt removed it, or the + // region group is being deleted outright). Treat it as success and continue to delete data. + taskLogger.info( + "{}, The local peer of region {} does not exist, skip deleting it", + REGION_MIGRATE_PROCESS, + regionId); } catch (ConsensusException e) { String errorMsg = String.format(