Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,16 @@ public final class ProcedureMessages {
public static final String PID_ADDREGION_STATE_FAILED = "[pid{}][AddRegion] state {} failed";
public static final String PID_ADDREGION_SUCCESS_HAS_BEEN_ADDED_TO_DATANODE_PROCEDURE_TOOK =
"[pid{}][AddRegion] success, {} has been added to DataNode {}. Procedure took {} (start at {}).";
public static final String PID_REMOVEREGIONGROUP_STARTED_WILL_BE_DELETED =
"[pid{}][RemoveRegionGroup] started, region group {} will be deleted from DataNodes {}.";
public static final String PID_REMOVEREGIONGROUP_STARTED_REPLICA_WILL_BE_DELETED_FROM_DATANODE =
"[pid{}][RemoveRegionGroup] region {} will be deleted from DataNode {}.";
public static final String PID_REMOVEREGIONGROUP_STATE_FAILED =
"[pid{}][RemoveRegionGroup] state {} failed";
public static final String PID_REMOVEREGIONGROUP_DELETE_REPLICA_FAILED =
"[pid{}][RemoveRegionGroup] failed to delete a replica of region {} (attempt {}/{}), will retry. reason: {}";
public static final String PID_REMOVEREGIONGROUP_SUCCESS_PROCEDURE_TOOK =
"[pid{}][RemoveRegionGroup] success, region group {} has been deleted. Procedure took {} (started at {}).";
public static final String PID_MIGRATEREGION_STARTED_WILL_BE_MIGRATED_FROM_DATANODE_TO =
"[pid{}][MigrateRegion] started, {} will be migrated from DataNode {} to {}.";
public static final String PID_MIGRATEREGION_STATE_COMPLETE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,16 @@ public final class ProcedureMessages {
public static final String PID_ADDREGION_STATE_FAILED = "[pid{}][AddRegion] state {} failed";
public static final String PID_ADDREGION_SUCCESS_HAS_BEEN_ADDED_TO_DATANODE_PROCEDURE_TOOK =
"[pid{}][AddRegion] success, {} has been added to DataNode {}. Procedure took {} (start at {}).";
public static final String PID_REMOVEREGIONGROUP_STARTED_WILL_BE_DELETED =
"[pid{}][RemoveRegionGroup] 开始,region group {} 将从 DataNode {} 上删除。";
public static final String PID_REMOVEREGIONGROUP_STARTED_REPLICA_WILL_BE_DELETED_FROM_DATANODE =
"[pid{}][RemoveRegionGroup] region {} 将从 DataNode {} 上删除。";
public static final String PID_REMOVEREGIONGROUP_STATE_FAILED =
"[pid{}][RemoveRegionGroup] 状态 {} 失败";
public static final String PID_REMOVEREGIONGROUP_DELETE_REPLICA_FAILED =
"[pid{}][RemoveRegionGroup] 删除 region {} 的一个副本失败(第 {}/{} 次尝试),将重试。原因:{}";
public static final String PID_REMOVEREGIONGROUP_SUCCESS_PROCEDURE_TOOK =
"[pid{}][RemoveRegionGroup] 成功,region group {} 已删除。过程耗时 {}(开始于 {})。";
public static final String PID_MIGRATEREGION_STARTED_WILL_BE_MIGRATED_FROM_DATANODE_TO =
"[pid{}][MigrateRegion] started, {} will be migrated from DataNode {} to {}.";
public static final String PID_MIGRATEREGION_STATE_COMPLETE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.procedure.PartitionTableAutoCleaner;
import org.apache.iotdb.confignode.procedure.Procedure;
Expand Down Expand Up @@ -317,9 +316,6 @@ public TSStatus deleteDatabases(
}
List<TSStatus> results = new ArrayList<>(procedures.size());
procedures.forEach(procedure -> results.add(waitingProcedureFinished(procedure)));
// Clear the previously deleted regions
final PartitionManager partitionManager = getConfigManager().getPartitionManager();
partitionManager.getRegionMaintainer().submit(partitionManager::maintainRegionReplicas);
if (results.stream()
.allMatch(result -> result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())) {
return StatusUtils.OK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
Expand All @@ -51,7 +50,9 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -107,6 +108,9 @@ protected Flow executeFromState(
case SHUNT_REGION_REPLICAS:
persistPlan = new CreateRegionGroupsPlan();
final OfferRegionMaintainTasksPlan offerPlan = new OfferRegionMaintainTasksPlan();
// RegionGroups that failed to reach a serving quorum are removed via a child
// RemoveRegionGroupProcedure, which deletes every replica that did get created.
final List<RemoveRegionGroupProcedure> removeRegionGroupProcedures = new ArrayList<>();
// Filter those RegionGroups that created successfully
createRegionGroupsPlan
.getRegionGroupMap()
Expand Down Expand Up @@ -154,20 +158,25 @@ protected Flow executeFromState(
.CREATEREGIONGROUPS_FAILED_TO_CREATE_SOME_REPLICAS_OF_REGIONGROUP_BUT_THIS,
regionReplicaSet.getRegionId());
} else {
// The redundant RegionReplicas should be deleted otherwise
// The redundant RegionReplicas (the ones that did get created) should
// be deleted otherwise
final TRegionReplicaSet redundantReplicas =
new TRegionReplicaSet()
.setRegionId(regionReplicaSet.getRegionId());
regionReplicaSet
.getDataNodeLocations()
.forEach(
targetDataNode -> {
if (!failedRegionReplicas
.getDataNodeLocations()
.contains(targetDataNode)) {
RegionDeleteTask deleteTask =
new RegionDeleteTask(
targetDataNode, regionReplicaSet.getRegionId());
offerPlan.appendRegionMaintainTask(deleteTask);
redundantReplicas.addToDataNodeLocations(targetDataNode);
}
});
if (redundantReplicas.getDataNodeLocationsSize() > 0) {
removeRegionGroupProcedures.add(
new RemoveRegionGroupProcedure(redundantReplicas));
}

LOGGER.info(
ProcedureMessages
Expand All @@ -188,6 +197,7 @@ protected Flow executeFromState(
LOGGER.warn(
ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
}
removeRegionGroupProcedures.forEach(this::addChildProcedure);
setNextState(CreateRegionGroupsState.REBALANCE_DATA_PARTITION_POLICY);
break;
case REBALANCE_DATA_PARTITION_POLICY:
Expand Down
Loading
Loading