From f7594170871bfd05b8d9cae64ccd09bcfe0483a2 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 26 Jun 2026 14:14:29 +0800 Subject: [PATCH] Pipe: Retry history LoadTsFile while waiting for schema (#18031) * Fix pipe history LoadTsFile schema retry * Address load tsfile schema retry review * Fix load tsfile tests with real temp files (cherry picked from commit 1811e13f6405a1ea2a38ad856bfa32c92ffdd78a) --- .../LoadAnalyzeMissingSchemaException.java | 27 +++++++ .../visitor/PipeStatementTSStatusVisitor.java | 4 +- .../plan/analyze/load/LoadTsFileAnalyzer.java | 80 +++++++++++++++++-- .../PipeStatementTsStatusVisitorTest.java | 32 ++++++++ 4 files changed, 133 insertions(+), 10 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadAnalyzeMissingSchemaException.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadAnalyzeMissingSchemaException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadAnalyzeMissingSchemaException.java new file mode 100644 index 0000000000000..42da0e818a9fb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadAnalyzeMissingSchemaException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception; + +public class LoadAnalyzeMissingSchemaException extends LoadAnalyzeException { + + public LoadAnalyzeMissingSchemaException(final String message) { + super(message); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java index d370bff2798bb..509be0301ccb2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java @@ -74,9 +74,7 @@ public TSStatus visitNode(final StatementNode node, final TSStatus status) { public TSStatus visitLoadFile( final LoadTsFileStatement loadTsFileStatement, final TSStatus status) { if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() - || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode() - && status.getMessage() != null - && status.getMessage().contains("memory")) { + || status.getCode() == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) .setMessage(status.getMessage()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 5902c342ec504..d47953abe9bbe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.LoadAnalyzeException; +import org.apache.iotdb.db.exception.LoadAnalyzeMissingSchemaException; import org.apache.iotdb.db.exception.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadEmptyFileException; import org.apache.iotdb.db.exception.load.LoadFileException; @@ -214,6 +215,9 @@ public Analysis analyzeFileByFile(Analysis analysis) { // the real result on the conversion will be set in the analysis. return analysis; } catch (Exception e) { + if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) { + return analysis; + } final String exceptionMessage = String.format( "Auto create or verify schema error when executing statement %s. Detail: %s.", @@ -315,6 +319,9 @@ private boolean doAnalyzeFileByFile(Analysis analysis) { "The file %s is not a valid tsfile. Please check the input file.", tsFile.getPath())); } catch (Exception e) { + if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) { + return false; + } final String exceptionMessage = String.format( "Loading file %s failed. Detail: %s", @@ -484,6 +491,10 @@ private Analysis setFailAnalysisForAuthException(Analysis analysis, AuthExceptio private Analysis executeTabletConversionOnException( final Analysis analysis, final LoadAnalyzeException e) { + if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) { + return analysis; + } + if (shouldSkipConversion(e)) { analysis.setFailStatus( new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage())); @@ -517,6 +528,52 @@ private Analysis executeTabletConversionOnException( return analysis; } + private boolean setTemporaryUnavailableStatusIfNecessary( + final Analysis analysis, final Throwable throwable) { + if (isTemporaryUnavailableDueToPipeSchemaNotReady(throwable)) { + setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, throwable); + return true; + } + if (isGeneratedByPipe && LoadTsFileDataTypeConverter.isMemoryPressureException(throwable)) { + analysis.setFinishQueryAfterAnalyze(true); + analysis.setFailStatus(LoadTsFileDataTypeConverter.getMemoryPressureStatus(throwable)); + analysis.setStatement(loadTsFileStatement); + return true; + } + return false; + } + + private void setFailAnalysisForTemporaryUnavailablePipeSchema( + final Analysis analysis, final Throwable throwable) { + final String exceptionMessage = + String.format( + "Pipe generated LoadTsFile is waiting for schema metadata to be transferred. Detail: %s", + throwable.getMessage() == null + ? throwable.getClass().getName() + : throwable.getMessage()); + analysis.setFinishQueryAfterAnalyze(true); + analysis.setFailStatus( + RpcUtils.getStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION, exceptionMessage)); + analysis.setStatement(loadTsFileStatement); + } + + boolean isTemporaryUnavailableDueToPipeSchemaNotReady(final Throwable throwable) { + if (!isGeneratedByPipe + || !isVerifySchema + || IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) { + return false; + } + + Throwable current = throwable; + while (current != null) { + if (current instanceof LoadAnalyzeMissingSchemaException) { + return true; + } + current = current.getCause(); + } + return false; + } + private boolean shouldSkipConversion(LoadAnalyzeException e) { return (e instanceof LoadAnalyzeTypeMismatchException) && !loadTsFileStatement.isConvertOnTypeMismatch(); @@ -545,7 +602,7 @@ public void setCurrentTimeIndex(final ITimeIndex timeIndex) { public void autoCreateAndVerify( TsFileSequenceReader reader, Map> device2TimeSeriesMetadataList) - throws IOException, AuthException, LoadAnalyzeTypeMismatchException { + throws IOException, AuthException, LoadAnalyzeException { for (final Map.Entry> entry : device2TimeSeriesMetadataList.entrySet()) { final IDeviceID device = entry.getKey(); @@ -647,14 +704,14 @@ public void flushAndClearDeviceIsAlignedCacheIfNecessary() throws SemanticExcept schemaCache.clearDeviceIsAlignedCacheIfNecessary(); } - public void flush() throws AuthException, LoadAnalyzeTypeMismatchException { + public void flush() throws AuthException, LoadAnalyzeException { doAutoCreateAndVerify(); schemaCache.clearTimeSeries(); } private void doAutoCreateAndVerify() - throws SemanticException, AuthException, LoadAnalyzeTypeMismatchException { + throws SemanticException, AuthException, LoadAnalyzeException { if (schemaCache.getDevice2TimeSeries().isEmpty()) { return; } @@ -677,6 +734,15 @@ private void doAutoCreateAndVerify() } } catch (AuthException | LoadAnalyzeTypeMismatchException e) { throw e; + } catch (LoadAnalyzeMissingSchemaException e) { + if (isTemporaryUnavailableDueToPipeSchemaNotReady(e)) { + throw e; + } + LOGGER.warn("Auto create or verify schema error.", e); + throw new SemanticException( + String.format( + "Auto create or verify schema error when executing statement %s. Detail: %s.", + loadTsFileStatement, e.getMessage())); } catch (Exception e) { if (e.getCause() instanceof LoadAnalyzeTypeMismatchException && isConvertOnTypeMismatch) { throw (LoadAnalyzeTypeMismatchException) e.getCause(); @@ -863,10 +929,10 @@ private void verifySchema(ISchemaTree schemaTree) .collect(Collectors.toList())); if (iotdbDeviceSchemaInfo == null) { - throw new LoadAnalyzeException( + throw new LoadAnalyzeMissingSchemaException( String.format( "Device %s does not exist in IoTDB and can not be created. " - + "Please check weather auto-create-schema is enabled.", + + "Please check whether auto-create-schema is enabled.", device)); } @@ -889,10 +955,10 @@ private void verifySchema(ISchemaTree schemaTree) final MeasurementSchema tsFileSchema = tsfileTimeseriesSchemas.get(i); final MeasurementSchema iotdbSchema = iotdbTimeseriesSchemas.get(i); if (iotdbSchema == null) { - throw new LoadAnalyzeException( + throw new LoadAnalyzeMissingSchemaException( String.format( "Measurement %s does not exist in IoTDB and can not be created. " - + "Please check weather auto-create-schema is enabled.", + + "Please check whether auto-create-schema is enabled.", device + TsFileConstant.PATH_SEPARATOR + tsfileTimeseriesSchemas.get(i))); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java index 756d11818251f..f2716d5c1a48a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java @@ -24,12 +24,14 @@ import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement; import org.apache.iotdb.rpc.TSStatusCode; import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.util.Arrays; import java.util.Collections; @@ -64,6 +66,36 @@ StatusUtils.OK, new TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode())))) .getCode()); } + @Test + public void testLoadTemporaryUnavailableClassification() throws Exception { + final File tsFile = File.createTempFile("temporary-unavailable", ".tsfile"); + tsFile.deleteOnExit(); + + Assert.assertEquals( + TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(), + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR + .process( + LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), + new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage("schema is not ready")) + .getCode()); + } + + @Test + public void testLoadFileErrorWithMemoryMessageIsNotClassifiedByMessage() throws Exception { + final File tsFile = File.createTempFile("memory-error", ".tsfile"); + tsFile.deleteOnExit(); + + Assert.assertEquals( + TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR + .process( + LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), + new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()) + .setMessage("memory pressure")) + .getCode()); + } + @Test public void testDatabaseNotExistRuntimeExceptionClassification() { Assert.assertEquals(