diff --git a/java/common/src/main/resources/org/apache/tsfile/i18n/messages.properties b/java/common/src/main/resources/org/apache/tsfile/i18n/messages.properties index a4c34dde1..de4557a89 100644 --- a/java/common/src/main/resources/org/apache/tsfile/i18n/messages.properties +++ b/java/common/src/main/resources/org/apache/tsfile/i18n/messages.properties @@ -1548,3 +1548,72 @@ log.tools.csv_read_error = Error reading CSV file: %1$s # CsvSourceReader — error closing CSV reader log.tools.csv_close_reader_error = Error closing CSV reader + +# HybridCsvTsFileAssembler — could not delete existing output +log.tools.hybrid_delete_output_failed = Could not delete existing output file: {} + +# HybridCsvTsFileAssembler — writing main CSV +log.tools.hybrid_writing_main_csv = Writing main CSV: {} + +# HybridCsvTsFileAssembler — writing supplement CSV +log.tools.hybrid_writing_supplement_csv = Writing supplement CSV: {} (batch_id={}, starting id={}) + +# HybridCsvTsFileAssembler — too many supplement rows +error.tools.hybrid_supplement_too_many_rows = Too many rows in one supplement CSV: %1$s + +# HybridImportConfigParser — supplement_batch_id order +error.tools.hybrid_supplement_batch_id_order = supplement_batch_id must follow supplement_csv in config file + +# HybridImportConfigParser — unknown config line +error.tools.hybrid_unknown_config_line = Unknown config line: %1$s + +# HybridImportConfigParser — supplement_csv without batch id +error.tools.hybrid_supplement_csv_without_batch_id = supplement_csv without matching supplement_batch_id: %1$s + +# HybridImportConfigParser — required output_tsfile +error.tools.hybrid_output_tsfile_required = output_tsfile is required + +# HybridImportConfigParser — required shared_schema +error.tools.hybrid_shared_schema_required = shared_schema is required + +# HybridImportConfigParser — required main_csv +error.tools.hybrid_main_csv_required = main_csv is required + +# HybridImportConfigParser — main_csv not found +error.tools.hybrid_main_csv_not_found = main_csv file not found: %1$s + +# HybridImportConfigParser — shared_schema not found +error.tools.hybrid_shared_schema_not_found = shared_schema file not found: %1$s + +# HybridImportConfigParser — supplement_csv not found +error.tools.hybrid_supplement_csv_not_found = supplement_csv file not found: %1$s + +# HybridImportConfigParser — invalid config line +error.tools.hybrid_invalid_config_line = Invalid config line: %1$s + +# SyntheticTabletBuilder — timestamps length mismatch +error.tools.hybrid_timestamps_length_mismatch = timestamps length %1$s != row count %2$s + +# SyntheticTabletBuilder — uniform TAG violation +error.tools.hybrid_uniform_tags_violation = Supplement CSV must have a single business TAG combination per file when validate_uniform_tags is enabled. Rows %1$s and %2$s map to different devices: %3$s vs %4$s + +# SupplementCsvSourceReader — inferSchema unsupported +error.tools.hybrid_infer_schema_unsupported = inferSchema() is not supported for supplement CSV reader + +# SupplementCsvSourceReader — header required +error.tools.hybrid_supplement_header_required = Supplement CSV requires has_header=true in schema: %1$s + +# SupplementCsvSourceReader — unexpected header column +error.tools.hybrid_supplement_unexpected_column = Unexpected column in supplement CSV header: %1$s + +# SupplementCsvSourceReader — missing header column +error.tools.hybrid_supplement_missing_column = Missing column in supplement CSV header: %1$s + +# SupplementVarianceSorter — FIELD column not in batch +error.tools.hybrid_field_column_not_in_batch = FIELD column '%1$s' not found in supplement batch columns + +# SupplementVarianceSorter — variance sort priority log +log.tools.hybrid_variance_sort_priority = Supplement FIELD sort priority (variance desc): {} + +# TsFileTool — hybrid import failed +log.tools.hybrid_import_failed = Hybrid import failed for config: %1$s diff --git a/java/common/src/main/resources/org/apache/tsfile/i18n/messages_zh.properties b/java/common/src/main/resources/org/apache/tsfile/i18n/messages_zh.properties index a1a437cfb..da9c3bcdf 100644 --- a/java/common/src/main/resources/org/apache/tsfile/i18n/messages_zh.properties +++ b/java/common/src/main/resources/org/apache/tsfile/i18n/messages_zh.properties @@ -1548,3 +1548,72 @@ log.tools.csv_read_error = 读取 CSV 文件出错: %1$s # CsvSourceReader — error closing CSV reader log.tools.csv_close_reader_error = 关闭 CSV reader 出错 + +# HybridCsvTsFileAssembler — could not delete existing output +log.tools.hybrid_delete_output_failed = 无法删除已存在的输出文件: {} + +# HybridCsvTsFileAssembler — writing main CSV +log.tools.hybrid_writing_main_csv = 正在写入主 CSV: {} + +# HybridCsvTsFileAssembler — writing supplement CSV +log.tools.hybrid_writing_supplement_csv = 正在写入附属 CSV: {} (batch_id={}, starting id={}) + +# HybridCsvTsFileAssembler — too many supplement rows +error.tools.hybrid_supplement_too_many_rows = 单个附属 CSV 行数过多: %1$s + +# HybridImportConfigParser — supplement_batch_id order +error.tools.hybrid_supplement_batch_id_order = supplement_batch_id 必须紧跟在 supplement_csv 之后 + +# HybridImportConfigParser — unknown config line +error.tools.hybrid_unknown_config_line = 未知配置行: %1$s + +# HybridImportConfigParser — supplement_csv without batch id +error.tools.hybrid_supplement_csv_without_batch_id = supplement_csv 缺少对应的 supplement_batch_id: %1$s + +# HybridImportConfigParser — required output_tsfile +error.tools.hybrid_output_tsfile_required = 必须配置 output_tsfile + +# HybridImportConfigParser — required shared_schema +error.tools.hybrid_shared_schema_required = 必须配置 shared_schema + +# HybridImportConfigParser — required main_csv +error.tools.hybrid_main_csv_required = 必须配置 main_csv + +# HybridImportConfigParser — main_csv not found +error.tools.hybrid_main_csv_not_found = 找不到 main_csv 文件: %1$s + +# HybridImportConfigParser — shared_schema not found +error.tools.hybrid_shared_schema_not_found = 找不到 shared_schema 文件: %1$s + +# HybridImportConfigParser — supplement_csv not found +error.tools.hybrid_supplement_csv_not_found = 找不到 supplement_csv 文件: %1$s + +# HybridImportConfigParser — invalid config line +error.tools.hybrid_invalid_config_line = 无效配置行: %1$s + +# SyntheticTabletBuilder — timestamps length mismatch +error.tools.hybrid_timestamps_length_mismatch = timestamps 长度 %1$s 与行数 %2$s 不一致 + +# SyntheticTabletBuilder — uniform TAG violation +error.tools.hybrid_uniform_tags_violation = 启用 validate_uniform_tags 时,每个附属 CSV 只能有一种业务 TAG 组合。第 %1$s 行与第 %2$s 行对应不同 device: %3$s vs %4$s + +# SupplementCsvSourceReader — inferSchema unsupported +error.tools.hybrid_infer_schema_unsupported = supplement CSV reader 不支持 inferSchema() + +# SupplementCsvSourceReader — header required +error.tools.hybrid_supplement_header_required = 附属 CSV 的 schema 必须设置 has_header=true: %1$s + +# SupplementCsvSourceReader — unexpected header column +error.tools.hybrid_supplement_unexpected_column = 附属 CSV 表头出现未预期的列: %1$s + +# SupplementCsvSourceReader — missing header column +error.tools.hybrid_supplement_missing_column = 附属 CSV 表头缺少列: %1$s + +# SupplementVarianceSorter — FIELD column not in batch +error.tools.hybrid_field_column_not_in_batch = 附属 batch 中找不到 FIELD 列 '%1$s' + +# SupplementVarianceSorter — variance sort priority log +log.tools.hybrid_variance_sort_priority = 附属 FIELD 排序优先级(方差降序): {} + +# TsFileTool — hybrid import failed +log.tools.hybrid_import_failed = 混合导入失败,配置文件: %1$s diff --git a/java/tools/README-zh.md b/java/tools/README-zh.md index 0efeba2eb..bd746dd49 100644 --- a/java/tools/README-zh.md +++ b/java/tools/README-zh.md @@ -45,6 +45,44 @@ mvn clean package -P with-java -DskipTests mvn install -P with-java -DskipTests ``` +## 混合 CSV 导入 + +将一条带真实时间列的主时序 CSV,与多条不含时间列的附属 CSV(列与主表 TAG/FIELD 相同)合并写入**单个** TsFile。附属行使用合成时间戳 `1, 2, …, N`(按文件内连续);通过虚拟 TAG `batch_id` 隔离,每个附属文件对应一个 ChunkGroup(在同一业务 TAG 组合下)。 + +配置示例(`hybrid.conf`): + +``` +output_tsfile=combined.tsfile +shared_schema=main.schema +main_csv=timeseries.csv +main_batch_id=main +batch_id_tag=batch_id +validate_uniform_tags=true +supplement_sort_by_variance=true +supplement_csv=experiment_1.csv +supplement_batch_id=experiment_1 +supplement_csv=experiment_2.csv +supplement_batch_id=experiment_2 +``` + +运行: + +```sh +java -jar tsfile-tools.jar --hybrid_config hybrid.conf +``` + +附属 CSV 表头须包含 `shared_schema` 中除时间列外的全部业务 TAG 与 FIELD 列(例如 `Region,DeviceId,Temperature,Pressure`)。 + +对每个附属 CSV 单独处理(默认 `supplement_sort_by_variance=true`): + +1. 仅在该 CSV 内计算各 **FIELD** 列方差。 +2. 按方差降序确定列排序优先级。 +3. 对该 CSV 行做升序多键排序。 +4. 写入一个 ChunkGroup;组内时间戳连续(`startId`, `startId+1`, …)。 +5. 下一个附属文件从 `maxId + 1` 继续编号(file1: `1..n1`,file2: `n1+1..n1+n2`,…)。 + +编程接口:`HybridCsvTsFileAssembler.execute(HybridImportConfig)`。 + ## schema 定义 ### 参数 diff --git a/java/tools/README.md b/java/tools/README.md index 8da7af0d9..fd5156483 100644 --- a/java/tools/README.md +++ b/java/tools/README.md @@ -44,6 +44,44 @@ mvn clean package -P with-java -DskipTests mvn install -P with-java -DskipTests ``` +## Hybrid CSV Import + +Combine one main time-series CSV (with a real time column) and multiple supplement CSVs (same TAG/FIELD columns, **no** time column) into a **single** TsFile. Supplement rows receive synthetic timestamps `1, 2, …, N` per file; each file is isolated with a virtual TAG `batch_id` (one ChunkGroup per file per business TAG combination). + +Example config (`hybrid.conf`): + +``` +output_tsfile=combined.tsfile +shared_schema=main.schema +main_csv=timeseries.csv +main_batch_id=main +batch_id_tag=batch_id +validate_uniform_tags=true +supplement_sort_by_variance=true +supplement_csv=experiment_1.csv +supplement_batch_id=experiment_1 +supplement_csv=experiment_2.csv +supplement_batch_id=experiment_2 +``` + +Run: + +```sh +java -jar tsfile-tools.jar --hybrid_config hybrid.conf +``` + +Supplement CSV headers must list all business TAG and FIELD columns from `shared_schema`, excluding the time column (e.g. `Region,DeviceId,Temperature,Pressure`). + +For each supplement CSV separately (`supplement_sort_by_variance=true` by default): + +1. Compute variance of each **FIELD** column **within that CSV only**. +2. Order columns by variance descending (higher variance = higher sort priority). +3. Sort rows in that CSV ascending (multi-key comparator). +4. Write one ChunkGroup per CSV; timestamps are **consecutive** inside the group (`startId`, `startId+1`, …). +5. The next supplement CSV continues ids from `maxId + 1` (file1: `1..n1`, file2: `n1+1..n1+n2`, …). + +Programmatic API: `HybridCsvTsFileAssembler.execute(HybridImportConfig)`. + ## Schema Definition ### Parameters diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/HybridCsvTsFileAssembler.java b/java/tools/src/main/java/org/apache/tsfile/tools/HybridCsvTsFileAssembler.java new file mode 100644 index 000000000..327d7608e --- /dev/null +++ b/java/tools/src/main/java/org/apache/tsfile/tools/HybridCsvTsFileAssembler.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tsfile.tools; + +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.i18n.Messages; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; + +/** + * Writes one TsFile from a main time-series CSV and zero or more supplement CSVs (no time column). + * Each supplement CSV is sorted internally; ids are consecutive within its ChunkGroup and chained + * across supplement files (file2 starts at maxId(file1)+1). + */ +public class HybridCsvTsFileAssembler { + + private static final Logger LOGGER = LoggerFactory.getLogger(HybridCsvTsFileAssembler.class); + + private HybridCsvTsFileAssembler() {} + + /** + * Executes hybrid import according to {@code config}. + * + * @return true if all data was written successfully + */ + public static boolean execute(HybridImportConfig config) throws IOException, WriteProcessException { + ImportSchema baseSchema = ImportSchemaParser.parse(config.getSharedSchemaPath()); + ImportSchema mainSchema = + ImportSchemaUtils.withBatchIdTag( + baseSchema, config.getBatchIdTag(), config.getMainBatchId()); + + File output = config.getOutputFile(); + if (output.getParentFile() != null) { + output.getParentFile().mkdirs(); + } + if (output.exists() && !output.delete()) { + LOGGER.warn( + Messages.get("log.tools.hybrid_delete_output_failed"), output.getAbsolutePath()); + } + + TsFileWriter writer = null; + try { + writer = new TsFileWriter(output); + writer.setGenerateTableSchema(true); + + TabletBuilder mainBuilder = new TabletBuilder(mainSchema, new TimeConverter(mainSchema.getTimePrecision())); + writer.registerTableSchema(mainBuilder.getTableSchema()); + + writeMainCsv(config, mainSchema, mainBuilder, writer); + writeSupplementCsvs(config, baseSchema, writer); + + return true; + } finally { + if (writer != null) { + writer.close(); + } + } + } + + private static void writeMainCsv( + HybridImportConfig config, + ImportSchema mainSchema, + TabletBuilder mainBuilder, + TsFileWriter writer) + throws IOException, WriteProcessException { + LOGGER.info(Messages.get("log.tools.hybrid_writing_main_csv"), config.getMainCsvPath()); + try (CsvSourceReader reader = + new CsvSourceReader( + config.getMainCsvFile(), mainSchema, config.getReadChunkSizeBytes())) { + SourceBatch batch; + while ((batch = reader.readBatch()) != null) { + if (batch.isEmpty()) { + continue; + } + writer.writeTable(mainBuilder.build(batch)); + } + } + } + + private static void writeSupplementCsvs( + HybridImportConfig config, ImportSchema baseSchema, TsFileWriter writer) + throws IOException, WriteProcessException { + long nextSupplementId = 1; + + for (HybridImportConfig.SupplementEntry entry : config.getSupplements()) { + ImportSchema supplementSchema = + ImportSchemaUtils.withBatchIdTag( + baseSchema, config.getBatchIdTag(), entry.getBatchId()); + LOGGER.info( + Messages.get("log.tools.hybrid_writing_supplement_csv"), + entry.getCsvPath(), + entry.getBatchId(), + nextSupplementId); + + try (SupplementCsvSourceReader reader = + new SupplementCsvSourceReader( + entry.getCsvFile(), supplementSchema, config.getReadChunkSizeBytes())) { + SourceBatch batch = SupplementVarianceSorter.readAll(reader); + if (batch.isEmpty()) { + continue; + } + if (config.isSupplementSortByVariance()) { + batch = SupplementVarianceSorter.sortByVariancePriority(batch, baseSchema); + } + + SyntheticTabletBuilder tabletBuilder = + new SyntheticTabletBuilder(supplementSchema, config.isValidateUniformTags()); + long fileRowCount = batch.getRowCount(); + long[] timestamps = buildConsecutiveTimestamps(nextSupplementId, fileRowCount); + writeSupplementBatchInChunks( + batch, timestamps, tabletBuilder, writer, config.getReadChunkSizeBytes()); + nextSupplementId += fileRowCount; + } + } + } + + /** Builds {@code [startId, startId+1, …, startId+rowCount-1]}. */ + static long[] buildConsecutiveTimestamps(long startId, long rowCount) { + if (rowCount > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + Messages.format("error.tools.hybrid_supplement_too_many_rows", rowCount)); + } + int n = (int) rowCount; + long[] timestamps = new long[n]; + for (int i = 0; i < n; i++) { + timestamps[i] = startId + i; + } + return timestamps; + } + + private static void writeSupplementBatchInChunks( + SourceBatch batch, + long[] timestamps, + SyntheticTabletBuilder tabletBuilder, + TsFileWriter writer, + long chunkSizeBytes) + throws IOException, WriteProcessException { + int maxRowsPerChunk = estimateMaxRowsPerChunk(batch, chunkSizeBytes); + if (maxRowsPerChunk <= 0 || batch.getRowCount() <= maxRowsPerChunk) { + writer.writeTable(tabletBuilder.build(batch, timestamps)); + return; + } + for (int start = 0; start < batch.getRowCount(); start += maxRowsPerChunk) { + int end = Math.min(start + maxRowsPerChunk, batch.getRowCount()); + SourceBatch rowSlice = sliceRows(batch, start, end); + long[] timeSlice = Arrays.copyOfRange(timestamps, start, end); + writer.writeTable(tabletBuilder.build(rowSlice, timeSlice)); + } + } + + private static int estimateMaxRowsPerChunk(SourceBatch batch, long chunkSizeBytes) { + if (batch.getRowCount() == 0) { + return 0; + } + long estimatedBytesPerRow = Math.max(32L, (long) batch.getColumnCount() * 32L); + int maxRows = (int) (chunkSizeBytes / estimatedBytesPerRow); + return Math.max(1, Math.min(batch.getRowCount(), maxRows)); + } + + private static SourceBatch sliceRows(SourceBatch batch, int startRowInclusive, int endRowExclusive) { + int rowCount = endRowExclusive - startRowInclusive; + int colCount = batch.getColumnCount(); + Object[][] colData = new Object[colCount][rowCount]; + for (int c = 0; c < colCount; c++) { + Object[] column = batch.getColumn(c); + System.arraycopy(column, startRowInclusive, colData[c], 0, rowCount); + } + return new SourceBatch(batch.getColumnNames(), colData, rowCount); + } +} diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/HybridImportConfig.java b/java/tools/src/main/java/org/apache/tsfile/tools/HybridImportConfig.java new file mode 100644 index 000000000..a0a36f008 --- /dev/null +++ b/java/tools/src/main/java/org/apache/tsfile/tools/HybridImportConfig.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tsfile.tools; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** Configuration for hybrid CSV import (main time-series CSV + supplement CSVs). */ +public class HybridImportConfig { + + private String outputTsfile; + private String sharedSchemaPath; + private String mainCsvPath; + private String mainBatchId = "main"; + private String batchIdTag = ImportSchemaUtils.DEFAULT_BATCH_ID_TAG; + private boolean validateUniformTags = true; + /** Sort supplement rows by FIELD variance priority (ascending multi-key) before write. */ + private boolean supplementSortByVariance = true; + private long readChunkSizeBytes = 256L * 1024 * 1024; + + private final List supplements = new ArrayList<>(); + + public static class SupplementEntry { + private final String csvPath; + private final String batchId; + + public SupplementEntry(String csvPath, String batchId) { + this.csvPath = csvPath; + this.batchId = batchId; + } + + public String getCsvPath() { + return csvPath; + } + + public String getBatchId() { + return batchId; + } + + public File getCsvFile() { + return new File(csvPath); + } + } + + public String getOutputTsfile() { + return outputTsfile; + } + + public void setOutputTsfile(String outputTsfile) { + this.outputTsfile = outputTsfile; + } + + public String getSharedSchemaPath() { + return sharedSchemaPath; + } + + public void setSharedSchemaPath(String sharedSchemaPath) { + this.sharedSchemaPath = sharedSchemaPath; + } + + public String getMainCsvPath() { + return mainCsvPath; + } + + public void setMainCsvPath(String mainCsvPath) { + this.mainCsvPath = mainCsvPath; + } + + public String getMainBatchId() { + return mainBatchId; + } + + public void setMainBatchId(String mainBatchId) { + this.mainBatchId = mainBatchId; + } + + public String getBatchIdTag() { + return batchIdTag; + } + + public void setBatchIdTag(String batchIdTag) { + this.batchIdTag = batchIdTag; + } + + public boolean isValidateUniformTags() { + return validateUniformTags; + } + + public void setValidateUniformTags(boolean validateUniformTags) { + this.validateUniformTags = validateUniformTags; + } + + public boolean isSupplementSortByVariance() { + return supplementSortByVariance; + } + + public void setSupplementSortByVariance(boolean supplementSortByVariance) { + this.supplementSortByVariance = supplementSortByVariance; + } + + public long getReadChunkSizeBytes() { + return readChunkSizeBytes; + } + + public void setReadChunkSizeBytes(long readChunkSizeBytes) { + this.readChunkSizeBytes = readChunkSizeBytes; + } + + public List getSupplements() { + return Collections.unmodifiableList(supplements); + } + + public void addSupplement(String csvPath, String batchId) { + supplements.add(new SupplementEntry(csvPath, batchId)); + } + + public File getOutputFile() { + return new File(outputTsfile); + } + + public File getMainCsvFile() { + return new File(mainCsvPath); + } +} diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/HybridImportConfigParser.java b/java/tools/src/main/java/org/apache/tsfile/tools/HybridImportConfigParser.java new file mode 100644 index 000000000..1f0cd739f --- /dev/null +++ b/java/tools/src/main/java/org/apache/tsfile/tools/HybridImportConfigParser.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tsfile.tools; + +import org.apache.tsfile.i18n.Messages; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; + +/** + * Parses a hybrid import config file (key=value lines). Example: + * + *
+ * output_tsfile=combined.tsfile
+ * shared_schema=main.schema
+ * main_csv=timeseries.csv
+ * main_batch_id=main
+ * batch_id_tag=batch_id
+ * validate_uniform_tags=true
+ * supplement_csv=exp1.csv
+ * supplement_batch_id=exp1
+ * supplement_csv=exp2.csv
+ * supplement_batch_id=exp2
+ * 
+ */ +public class HybridImportConfigParser { + + private HybridImportConfigParser() {} + + public static HybridImportConfig parse(String filePath) throws IOException { + HybridImportConfig config = new HybridImportConfig(); + String pendingSupplementCsv = null; + + try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (line.isEmpty() || line.startsWith("#") || line.startsWith("//")) { + continue; + } + if (line.startsWith("output_tsfile=")) { + config.setOutputTsfile(extractValue(line)); + } else if (line.startsWith("shared_schema=")) { + config.setSharedSchemaPath(extractValue(line)); + } else if (line.startsWith("main_csv=")) { + config.setMainCsvPath(extractValue(line)); + } else if (line.startsWith("main_batch_id=")) { + config.setMainBatchId(extractValue(line)); + } else if (line.startsWith("batch_id_tag=")) { + config.setBatchIdTag(extractValue(line)); + } else if (line.startsWith("validate_uniform_tags=")) { + config.setValidateUniformTags(Boolean.parseBoolean(extractValue(line))); + } else if (line.startsWith("supplement_sort_by_variance=")) { + config.setSupplementSortByVariance(Boolean.parseBoolean(extractValue(line))); + } else if (line.startsWith("read_chunk_size=")) { + config.setReadChunkSizeBytes(TsFileTool.parseBlockSize(extractValue(line))); + } else if (line.startsWith("supplement_csv=")) { + pendingSupplementCsv = extractValue(line); + } else if (line.startsWith("supplement_batch_id=")) { + if (pendingSupplementCsv == null) { + throw new IllegalArgumentException( + Messages.get("error.tools.hybrid_supplement_batch_id_order")); + } + config.addSupplement(pendingSupplementCsv, extractValue(line)); + pendingSupplementCsv = null; + } else { + throw new IllegalArgumentException( + Messages.format("error.tools.hybrid_unknown_config_line", line)); + } + } + } + + if (pendingSupplementCsv != null) { + throw new IllegalArgumentException( + Messages.format("error.tools.hybrid_supplement_csv_without_batch_id", pendingSupplementCsv)); + } + validate(config); + return config; + } + + private static void validate(HybridImportConfig config) { + if (config.getOutputTsfile() == null || config.getOutputTsfile().isEmpty()) { + throw new IllegalArgumentException(Messages.get("error.tools.hybrid_output_tsfile_required")); + } + if (config.getSharedSchemaPath() == null || config.getSharedSchemaPath().isEmpty()) { + throw new IllegalArgumentException(Messages.get("error.tools.hybrid_shared_schema_required")); + } + if (config.getMainCsvPath() == null || config.getMainCsvPath().isEmpty()) { + throw new IllegalArgumentException(Messages.get("error.tools.hybrid_main_csv_required")); + } + if (!config.getMainCsvFile().exists()) { + throw new IllegalArgumentException( + Messages.format("error.tools.hybrid_main_csv_not_found", config.getMainCsvPath())); + } + if (!new java.io.File(config.getSharedSchemaPath()).exists()) { + throw new IllegalArgumentException( + Messages.format( + "error.tools.hybrid_shared_schema_not_found", config.getSharedSchemaPath())); + } + for (HybridImportConfig.SupplementEntry entry : config.getSupplements()) { + if (!entry.getCsvFile().exists()) { + throw new IllegalArgumentException( + Messages.format("error.tools.hybrid_supplement_csv_not_found", entry.getCsvPath())); + } + } + } + + private static String extractValue(String line) { + int idx = line.indexOf('='); + if (idx < 0 || idx == line.length() - 1) { + throw new IllegalArgumentException( + Messages.format("error.tools.hybrid_invalid_config_line", line)); + } + return line.substring(idx + 1).trim(); + } +} diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/ImportSchemaUtils.java b/java/tools/src/main/java/org/apache/tsfile/tools/ImportSchemaUtils.java new file mode 100644 index 000000000..32ef47607 --- /dev/null +++ b/java/tools/src/main/java/org/apache/tsfile/tools/ImportSchemaUtils.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tsfile.tools; + +import java.util.ArrayList; +import java.util.List; + +/** Utilities for preparing {@link ImportSchema} instances used in hybrid CSV import. */ +public final class ImportSchemaUtils { + + public static final String DEFAULT_BATCH_ID_TAG = "batch_id"; + + private ImportSchemaUtils() {} + + /** + * Returns a copy of {@code base} with a virtual tag column for batch identification. The tag is + * filled on every row via the default value (see {@link TabletBuilder}). + */ + public static ImportSchema withBatchIdTag( + ImportSchema base, String batchIdTagName, String batchIdValue) { + ImportSchema copy = copyOf(base); + List tags = new ArrayList<>(copy.getTagColumns()); + boolean replaced = false; + for (int i = 0; i < tags.size(); i++) { + if (tags.get(i).getName().equals(batchIdTagName)) { + tags.set(i, new ImportSchema.TagColumn(batchIdTagName, batchIdValue)); + replaced = true; + break; + } + } + if (!replaced) { + tags.add(new ImportSchema.TagColumn(batchIdTagName, batchIdValue)); + } + copy.setTagColumns(tags); + return copy; + } + + /** Source columns present in supplement CSV files (time column excluded). */ + public static List supplementSourceColumns(ImportSchema schema) { + List result = new ArrayList<>(); + String timeName = schema.getTimeColumnName(); + for (ImportSchema.SourceColumn col : schema.getSourceColumns()) { + if (col.isSkip()) { + continue; + } + if (col.getName().equals(timeName)) { + continue; + } + result.add(col); + } + return result; + } + + public static ImportSchema copyOf(ImportSchema base) { + ImportSchema copy = new ImportSchema(); + copy.setTableName(base.getTableName()); + copy.setTimePrecision(base.getTimePrecision()); + copy.setHasHeader(base.isHasHeader()); + copy.setSeparator(base.getSeparator()); + copy.setNullFormat(base.getNullFormat()); + copy.setTimeColumnName(base.getTimeColumnName()); + copy.setTagColumns(new ArrayList<>(base.getTagColumns())); + copy.setSourceColumns(new ArrayList<>(base.getSourceColumns())); + return copy; + } +} diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/SourceBatch.java b/java/tools/src/main/java/org/apache/tsfile/tools/SourceBatch.java index 18d01b2bb..bfa5fb170 100644 --- a/java/tools/src/main/java/org/apache/tsfile/tools/SourceBatch.java +++ b/java/tools/src/main/java/org/apache/tsfile/tools/SourceBatch.java @@ -75,6 +75,42 @@ public boolean isEmpty() { return rowCount == 0; } + /** Concatenates multiple batches with identical column names into one batch. */ + public static SourceBatch concat(List batches) { + if (batches == null || batches.isEmpty()) { + return new SourceBatch(new String[0], new Object[0][0], 0); + } + SourceBatch first = batches.get(0); + if (batches.size() == 1) { + return first; + } + int totalRows = 0; + for (SourceBatch batch : batches) { + totalRows += batch.getRowCount(); + } + int colCount = first.getColumnCount(); + String[] names = first.getColumnNames(); + Object[][] colData = new Object[colCount][totalRows]; + int offset = 0; + for (SourceBatch batch : batches) { + if (batch.getColumnCount() != colCount) { + throw new IllegalArgumentException("Cannot concat batches with different column counts"); + } + for (int c = 0; c < colCount; c++) { + if (!names[c].equals(batch.getColumnName(c))) { + throw new IllegalArgumentException( + "Cannot concat batches with different column names: " + + names[c] + + " vs " + + batch.getColumnName(c)); + } + System.arraycopy(batch.getColumn(c), 0, colData[c], offset, batch.getRowCount()); + } + offset += batch.getRowCount(); + } + return new SourceBatch(names, colData, totalRows); + } + @Override public String toString() { return "SourceBatch{columns=" + Arrays.toString(columnNames) + ", rows=" + rowCount + '}'; diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/SupplementCsvSourceReader.java b/java/tools/src/main/java/org/apache/tsfile/tools/SupplementCsvSourceReader.java new file mode 100644 index 000000000..e4ebd951b --- /dev/null +++ b/java/tools/src/main/java/org/apache/tsfile/tools/SupplementCsvSourceReader.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tsfile.tools; + +import org.apache.tsfile.i18n.Messages; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Reads supplement CSV files whose columns match {@link ImportSchemaUtils#supplementSourceColumns} + * (no time column). + */ +public class SupplementCsvSourceReader implements SourceReader { + + private static final Logger LOGGER = LoggerFactory.getLogger(SupplementCsvSourceReader.class); + private static final long DEFAULT_CHUNK_SIZE = 256L * 1024 * 1024; + + private final File sourceFile; + private final ImportSchema schema; + private final long chunkSizeBytes; + private final String separator; + private final List supplementColumns; + + /** Maps CSV file column index to index in {@link #supplementColumns}. */ + private final int[] fileColumnToSupplementIndex; + + private BufferedReader reader; + private boolean headerConsumed; + private boolean exhausted; + + public SupplementCsvSourceReader(File sourceFile, ImportSchema schema) { + this(sourceFile, schema, DEFAULT_CHUNK_SIZE); + } + + public SupplementCsvSourceReader(File sourceFile, ImportSchema schema, long chunkSizeBytes) { + this.sourceFile = sourceFile; + this.schema = schema; + this.chunkSizeBytes = chunkSizeBytes; + this.separator = schema.getSeparator(); + this.supplementColumns = ImportSchemaUtils.supplementSourceColumns(schema); + this.fileColumnToSupplementIndex = new int[supplementColumns.size()]; + for (int i = 0; i < fileColumnToSupplementIndex.length; i++) { + fileColumnToSupplementIndex[i] = -1; + } + this.headerConsumed = false; + this.exhausted = false; + } + + @Override + public ImportSchema inferSchema() { + throw new UnsupportedOperationException( + Messages.get("error.tools.hybrid_infer_schema_unsupported")); + } + + @Override + public SourceBatch readBatch() { + if (exhausted) { + return null; + } + + try { + ensureReaderOpen(); + + if (schema.isHasHeader() && !headerConsumed) { + String headerLine = reader.readLine(); + if (headerLine == null) { + exhausted = true; + return null; + } + parseHeader(splitLine(headerLine)); + headerConsumed = true; + } else if (!headerConsumed) { + throw new IllegalArgumentException( + Messages.format( + "error.tools.hybrid_supplement_header_required", sourceFile.getAbsolutePath())); + } + + List rows = new ArrayList<>(); + long currentSize = 0; + String line; + while ((line = reader.readLine()) != null) { + byte[] lineBytes = line.getBytes(StandardCharsets.UTF_8); + long lineSize = lineBytes.length; + + if (currentSize > 0 && currentSize + lineSize > chunkSizeBytes) { + rows.add(parseLine(splitLine(line))); + return buildBatch(rows); + } + + rows.add(parseLine(splitLine(line))); + currentSize += lineSize; + } + exhausted = true; + + if (rows.isEmpty()) { + return null; + } + return buildBatch(rows); + + } catch (IOException e) { + LOGGER.error(Messages.format("log.tools.csv_read_error", sourceFile.getAbsolutePath()), e); + exhausted = true; + return null; + } + } + + @Override + public void close() { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + LOGGER.error(Messages.get("log.tools.csv_close_reader_error"), e); + } + reader = null; + } + } + + private void parseHeader(String[] headerNames) { + if (headerNames.length != supplementColumns.size()) { + throw new IllegalArgumentException( + Messages.format( + "error.tools.csv_column_count_mismatch", + supplementColumns.size(), + headerNames.length, + sourceFile.getAbsolutePath())); + } + for (int fileCol = 0; fileCol < headerNames.length; fileCol++) { + String name = headerNames[fileCol].trim(); + int supplementIdx = -1; + for (int j = 0; j < supplementColumns.size(); j++) { + if (supplementColumns.get(j).getName().equals(name)) { + supplementIdx = j; + break; + } + } + if (supplementIdx < 0) { + throw new IllegalArgumentException( + Messages.format("error.tools.hybrid_supplement_unexpected_column", name)); + } + fileColumnToSupplementIndex[fileCol] = supplementIdx; + } + boolean[] seen = new boolean[supplementColumns.size()]; + for (int mapped : fileColumnToSupplementIndex) { + if (mapped >= 0) { + seen[mapped] = true; + } + } + for (int j = 0; j < supplementColumns.size(); j++) { + if (!seen[j]) { + throw new IllegalArgumentException( + Messages.format( + "error.tools.hybrid_supplement_missing_column", + supplementColumns.get(j).getName())); + } + } + } + + private void ensureReaderOpen() throws IOException { + if (reader == null) { + reader = + new BufferedReader( + new InputStreamReader( + Files.newInputStream(sourceFile.toPath()), StandardCharsets.UTF_8)); + } + } + + private String[] splitLine(String line) { + return line.split(separator, -1); + } + + private Object[] parseLine(String[] parts) { + if (parts.length != supplementColumns.size()) { + throw new IllegalArgumentException( + Messages.format( + "error.tools.csv_column_count_mismatch", + supplementColumns.size(), + parts.length, + sourceFile.getAbsolutePath())); + } + Object[] row = new Object[supplementColumns.size()]; + for (int fileCol = 0; fileCol < parts.length; fileCol++) { + int supplementIdx = fileColumnToSupplementIndex[fileCol]; + String val = parts[fileCol]; + String nullFormat = schema.getNullFormat(); + if (val.isEmpty() || (nullFormat != null && nullFormat.equals(val))) { + row[supplementIdx] = null; + } else { + row[supplementIdx] = val; + } + } + return row; + } + + private SourceBatch buildBatch(List rows) { + String[] names = new String[supplementColumns.size()]; + for (int i = 0; i < supplementColumns.size(); i++) { + names[i] = supplementColumns.get(i).getName(); + } + Object[][] colData = new Object[names.length][rows.size()]; + for (int r = 0; r < rows.size(); r++) { + Object[] row = rows.get(r); + for (int c = 0; c < names.length; c++) { + colData[c][r] = row[c]; + } + } + return new SourceBatch(names, colData, rows.size()); + } +} diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/SupplementVarianceSorter.java b/java/tools/src/main/java/org/apache/tsfile/tools/SupplementVarianceSorter.java new file mode 100644 index 000000000..bd9db70b5 --- /dev/null +++ b/java/tools/src/main/java/org/apache/tsfile/tools/SupplementVarianceSorter.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tsfile.tools; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.i18n.Messages; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +/** + * Pre-processes a single supplement CSV: compute per-FIELD-column variance within that file, + * order columns by variance descending (higher variance = higher priority), then sort its rows + * ascending with a multi-key comparator ({@link Arrays#sort}). + */ +public final class SupplementVarianceSorter { + + private static final Logger LOGGER = LoggerFactory.getLogger(SupplementVarianceSorter.class); + + private SupplementVarianceSorter() {} + + /** + * Returns a new {@link SourceBatch} with rows sorted according to FIELD column variance priority. + * Variance is computed only from rows in {@code batch} (single supplement CSV). + */ + public static SourceBatch sortByVariancePriority(SourceBatch batch, ImportSchema schema) { + if (batch == null || batch.isEmpty()) { + return batch; + } + + List fieldColumns = schema.fieldColumns(); + if (fieldColumns.isEmpty()) { + return batch; + } + + int[] batchColumnIndexByField = resolveBatchColumnIndices(batch, fieldColumns); + double[] variances = computeVariances(batch, fieldColumns, batchColumnIndexByField); + int[] priorityFieldOrder = sortFieldIndicesByVarianceDesc(variances); + + logPriority(fieldColumns, variances, priorityFieldOrder); + + Integer[] rowOrder = new Integer[batch.getRowCount()]; + for (int i = 0; i < rowOrder.length; i++) { + rowOrder[i] = i; + } + + Arrays.sort( + rowOrder, + new RowComparator(batch, fieldColumns, batchColumnIndexByField, priorityFieldOrder)); + + return reorderRows(batch, rowOrder); + } + + private static int[] resolveBatchColumnIndices( + SourceBatch batch, List fieldColumns) { + int[] indices = new int[fieldColumns.size()]; + for (int f = 0; f < fieldColumns.size(); f++) { + String name = fieldColumns.get(f).getName(); + int batchIdx = -1; + for (int c = 0; c < batch.getColumnCount(); c++) { + if (name.equals(batch.getColumnName(c))) { + batchIdx = c; + break; + } + } + if (batchIdx < 0) { + throw new IllegalArgumentException( + Messages.format("error.tools.hybrid_field_column_not_in_batch", name)); + } + indices[f] = batchIdx; + } + return indices; + } + + private static double[] computeVariances( + SourceBatch batch, + List fieldColumns, + int[] batchColumnIndexByField) { + int fieldCount = fieldColumns.size(); + double[] variances = new double[fieldCount]; + int rowCount = batch.getRowCount(); + + for (int f = 0; f < fieldCount; f++) { + int batchCol = batchColumnIndexByField[f]; + TSDataType type = fieldColumns.get(f).getDataType(); + if (!isNumericType(type)) { + variances[f] = 0.0; + continue; + } + + double sum = 0.0; + int count = 0; + for (int r = 0; r < rowCount; r++) { + Double v = toDouble(batch.getValue(r, batchCol), type); + if (v != null) { + sum += v; + count++; + } + } + if (count < 2) { + variances[f] = 0.0; + continue; + } + double mean = sum / count; + double sumSq = 0.0; + for (int r = 0; r < rowCount; r++) { + Double v = toDouble(batch.getValue(r, batchCol), type); + if (v != null) { + double d = v - mean; + sumSq += d * d; + } + } + variances[f] = sumSq / count; + } + return variances; + } + + private static int[] sortFieldIndicesByVarianceDesc(double[] variances) { + Integer[] order = new Integer[variances.length]; + for (int i = 0; i < order.length; i++) { + order[i] = i; + } + Arrays.sort( + order, + (a, b) -> { + int cmp = Double.compare(variances[b], variances[a]); + if (cmp != 0) { + return cmp; + } + return Integer.compare(a, b); + }); + int[] result = new int[order.length]; + for (int i = 0; i < order.length; i++) { + result[i] = order[i]; + } + return result; + } + + private static SourceBatch reorderRows(SourceBatch batch, Integer[] rowOrder) { + int rowCount = rowOrder.length; + int colCount = batch.getColumnCount(); + Object[][] newColData = new Object[colCount][rowCount]; + for (int newRow = 0; newRow < rowCount; newRow++) { + int oldRow = rowOrder[newRow]; + for (int c = 0; c < colCount; c++) { + newColData[c][newRow] = batch.getValue(oldRow, c); + } + } + return new SourceBatch(batch.getColumnNames(), newColData, rowCount); + } + + private static void logPriority( + List fieldColumns, + double[] variances, + int[] priorityFieldOrder) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < priorityFieldOrder.length; i++) { + int fieldIdx = priorityFieldOrder[i]; + if (i > 0) { + sb.append(", "); + } + sb.append(fieldColumns.get(fieldIdx).getName()) + .append("(variance=") + .append(variances[fieldIdx]) + .append(")"); + } + LOGGER.info(Messages.get("log.tools.hybrid_variance_sort_priority"), sb.toString()); + } + + private static boolean isNumericType(TSDataType type) { + switch (type) { + case BOOLEAN: + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + case TIMESTAMP: + case DATE: + return true; + default: + return false; + } + } + + private static Double toDouble(Object value, TSDataType type) { + if (isNullValue(value)) { + return null; + } + try { + switch (type) { + case BOOLEAN: + if (value instanceof Boolean) { + return ((Boolean) value) ? 1.0 : 0.0; + } + return Boolean.parseBoolean(value.toString()) ? 1.0 : 0.0; + case INT32: + case DATE: + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + return Double.parseDouble(value.toString()); + case INT64: + case TIMESTAMP: + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + return Double.parseDouble(value.toString()); + case FLOAT: + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + return Double.parseDouble(value.toString()); + case DOUBLE: + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + return Double.parseDouble(value.toString()); + default: + return null; + } + } catch (NumberFormatException e) { + return null; + } + } + + private static boolean isNullValue(Object value) { + if (value == null) { + return true; + } + if (value instanceof String) { + return ((String) value).isEmpty(); + } + return false; + } + + private static final class RowComparator implements Comparator { + + private final SourceBatch batch; + private final List fieldColumns; + private final int[] batchColumnIndexByField; + private final int[] priorityFieldOrder; + + private RowComparator( + SourceBatch batch, + List fieldColumns, + int[] batchColumnIndexByField, + int[] priorityFieldOrder) { + this.batch = batch; + this.fieldColumns = fieldColumns; + this.batchColumnIndexByField = batchColumnIndexByField; + this.priorityFieldOrder = priorityFieldOrder; + } + + @Override + public int compare(Integer rowA, Integer rowB) { + for (int fieldIdx : priorityFieldOrder) { + int batchCol = batchColumnIndexByField[fieldIdx]; + TSDataType type = fieldColumns.get(fieldIdx).getDataType(); + int cmp = compareCell(batch, rowA, rowB, batchCol, type); + if (cmp != 0) { + return cmp; + } + } + return Integer.compare(rowA, rowB); + } + + private int compareCell( + SourceBatch batch, int rowA, int rowB, int batchCol, TSDataType type) { + Object va = batch.getValue(rowA, batchCol); + Object vb = batch.getValue(rowB, batchCol); + boolean na = isNullValue(va); + boolean nb = isNullValue(vb); + if (na && nb) { + return 0; + } + if (na) { + return 1; + } + if (nb) { + return -1; + } + if (isNumericType(type)) { + Double da = toDouble(va, type); + Double db = toDouble(vb, type); + if (da == null && db == null) { + return 0; + } + if (da == null) { + return 1; + } + if (db == null) { + return -1; + } + return Double.compare(da, db); + } + return va.toString().compareTo(vb.toString()); + } + } + + /** Reads all batches from {@code reader} into one {@link SourceBatch}. */ + public static SourceBatch readAll(SupplementCsvSourceReader reader) { + List parts = new ArrayList<>(); + SourceBatch batch; + while ((batch = reader.readBatch()) != null) { + if (!batch.isEmpty()) { + parts.add(batch); + } + } + return SourceBatch.concat(parts); + } +} diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/SyntheticTabletBuilder.java b/java/tools/src/main/java/org/apache/tsfile/tools/SyntheticTabletBuilder.java new file mode 100644 index 000000000..f16c144c7 --- /dev/null +++ b/java/tools/src/main/java/org/apache/tsfile/tools/SyntheticTabletBuilder.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tsfile.tools; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.i18n.Messages; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Builds {@link Tablet} instances from supplement CSV batches that have no time column. Timestamps + * are assigned as {@code timeOffset + rowIndex + 1}. + */ +public class SyntheticTabletBuilder { + + private final ImportSchema importSchema; + private final TableSchema tableSchema; + private final Map tagDefaults; + private final Map sourceColumnIndex; + private final boolean validateUniformTags; + private long timeOffset; + + public SyntheticTabletBuilder(ImportSchema importSchema, boolean validateUniformTags) { + this.importSchema = importSchema; + this.validateUniformTags = validateUniformTags; + this.tagDefaults = new HashMap<>(); + this.sourceColumnIndex = new HashMap<>(); + this.tableSchema = buildTableSchema(); + buildSourceColumnIndex(); + } + + public TableSchema getTableSchema() { + return tableSchema; + } + + public void setTimeOffset(long timeOffset) { + this.timeOffset = timeOffset; + } + + public long getTimeOffset() { + return timeOffset; + } + + public Tablet build(SourceBatch batch) { + int rowCount = batch.getRowCount(); + long[] timestamps = new long[rowCount]; + for (int i = 0; i < rowCount; i++) { + timestamps[i] = timeOffset + i + 1; + } + return build(batch, timestamps); + } + + /** + * Builds a tablet using explicit per-row timestamps (e.g. global supplement ids). + * + * @param timestamps length must equal {@code batch.getRowCount()} + */ + public Tablet build(SourceBatch batch, long[] timestamps) { + int rowCount = batch.getRowCount(); + if (timestamps.length != rowCount) { + throw new IllegalArgumentException( + Messages.format( + "error.tools.hybrid_timestamps_length_mismatch", + timestamps.length, + rowCount)); + } + Tablet tablet = + new Tablet( + tableSchema.getTableName(), + IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()), + IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()), + tableSchema.getColumnTypes(), + rowCount); + + for (int i = 0; i < rowCount; i++) { + tablet.addTimestamp(i, timestamps[i]); + fillRow(tablet, batch, i); + } + tablet.setRowSize(rowCount); + + if (validateUniformTags && rowCount > 0) { + validateSingleDevice(tablet); + } + return tablet; + } + + private void fillRow(Tablet tablet, SourceBatch batch, int outputRow) { + for (int col = 0; col < tableSchema.getColumnSchemas().size(); col++) { + IMeasurementSchema colSchema = tableSchema.getColumnSchemas().get(col); + String colName = colSchema.getMeasurementName(); + + if (tagDefaults.containsKey(colName)) { + tablet.addValue(colName, outputRow, tagDefaults.get(colName)); + continue; + } + + Integer srcIdx = sourceColumnIndex.get(colName); + if (srcIdx == null) { + continue; + } + + Object rawValue = batch.getValue(outputRow, srcIdx); + if (isNull(rawValue)) { + continue; + } + + boolean isMeasurement = tableSchema.getColumnTypes().get(col) == ColumnCategory.FIELD; + Object converted = ValueConverter.convert(rawValue, colSchema.getType(), isMeasurement); + tablet.addValue(colName, outputRow, converted); + } + } + + private void validateSingleDevice(Tablet tablet) { + IDeviceID expected = tablet.getDeviceID(0); + for (int i = 1; i < tablet.getRowSize(); i++) { + if (!tablet.getDeviceID(i).equals(expected)) { + throw new IllegalArgumentException( + Messages.format( + "error.tools.hybrid_uniform_tags_violation", + 0, + i, + expected, + tablet.getDeviceID(i))); + } + } + } + + private TableSchema buildTableSchema() { + List schemas = new ArrayList<>(); + List categories = new ArrayList<>(); + + for (ImportSchema.TagColumn tag : importSchema.getTagColumns()) { + if (tag.hasDefault()) { + tagDefaults.put(tag.getName(), tag.getDefaultValue()); + } + schemas.add( + new MeasurementSchema( + tag.getName(), + TSDataType.TEXT, + org.apache.tsfile.file.metadata.enums.TSEncoding.PLAIN, + org.apache.tsfile.file.metadata.enums.CompressionType.UNCOMPRESSED)); + categories.add(ColumnCategory.TAG); + } + + for (ImportSchema.SourceColumn field : importSchema.fieldColumns()) { + schemas.add( + new MeasurementSchema( + field.getName(), + field.getDataType(), + org.apache.tsfile.file.metadata.enums.TSEncoding.PLAIN, + org.apache.tsfile.file.metadata.enums.CompressionType.UNCOMPRESSED)); + categories.add(ColumnCategory.FIELD); + } + + return new TableSchema(importSchema.getTableName(), schemas, categories); + } + + private void buildSourceColumnIndex() { + List cols = ImportSchemaUtils.supplementSourceColumns(importSchema); + for (int i = 0; i < cols.size(); i++) { + sourceColumnIndex.put(cols.get(i).getName(), i); + } + } + + private boolean isNull(Object value) { + if (value == null) { + return true; + } + if (value instanceof String) { + String s = (String) value; + String nullFormat = importSchema.getNullFormat(); + if (nullFormat != null && nullFormat.equals(s)) { + return true; + } + return s.isEmpty(); + } + return false; + } +} diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/TsFileTool.java b/java/tools/src/main/java/org/apache/tsfile/tools/TsFileTool.java index 98a28f293..2048d8554 100644 --- a/java/tools/src/main/java/org/apache/tsfile/tools/TsFileTool.java +++ b/java/tools/src/main/java/org/apache/tsfile/tools/TsFileTool.java @@ -58,11 +58,17 @@ public class TsFileTool { private static ImportSchema importSchema = null; + private static String hybridConfigPathStr = null; + public static void main(String[] args) { if (System.getenv("TSFILE_HOME") != null) { System.setProperty("TSFILE_HOME", System.getenv("TSFILE_HOME")); } parseCommandLineParams(args); + if (hybridConfigPathStr != null && !hybridConfigPathStr.isEmpty()) { + runHybridImport(hybridConfigPathStr); + return; + } if (!validateParams()) { return; } @@ -274,6 +280,7 @@ private static void parseCommandLineParams(String[] args) { separatorStr = null; formatStr = null; importSchema = null; + hybridConfigPathStr = null; Options options = new Options(); options.addOption("s", "source", true, "Input directory or file"); @@ -291,6 +298,7 @@ private static void parseCommandLineParams(String[] args) { true, "Source format: csv / parquet / arrow (default: auto-detect by extension)"); options.addOption("h", "help", false, "Show help"); + options.addOption(null, "hybrid_config", true, "Hybrid import config file path"); try { CommandLineParser parser = new DefaultParser(); @@ -335,6 +343,9 @@ private static void parseCommandLineParams(String[] args) { if (cmd.hasOption("format")) { formatStr = cmd.getOptionValue("format").toLowerCase(); } + if (cmd.hasOption("hybrid_config")) { + hybridConfigPathStr = cmd.getOptionValue("hybrid_config"); + } if (failedDirectoryStr == null || failedDirectoryStr.isEmpty()) { failedDirectoryStr = "failed"; } @@ -343,6 +354,18 @@ private static void parseCommandLineParams(String[] args) { } } + private static void runHybridImport(String configPath) { + try { + HybridImportConfig config = HybridImportConfigParser.parse(configPath); + HybridCsvTsFileAssembler.execute(config); + LOGGER.info( + Messages.format("log.tools.tool_execution_completed", config.getOutputTsfile())); + } catch (Exception e) { + LOGGER.error(Messages.format("log.tools.hybrid_import_failed", configPath), e); + System.exit(1); + } + } + static long parseBlockSize(String blockSizeValue) { blockSizeValue = blockSizeValue.toUpperCase(); if (blockSizeValue.endsWith("K")) { diff --git a/java/tools/src/test/java/org/apache/tsfile/tools/HybridCsvTsFileAssemblerTest.java b/java/tools/src/test/java/org/apache/tsfile/tools/HybridCsvTsFileAssemblerTest.java new file mode 100644 index 000000000..37db74044 --- /dev/null +++ b/java/tools/src/test/java/org/apache/tsfile/tools/HybridCsvTsFileAssemblerTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.tools; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.TsFileSequenceReader; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class HybridCsvTsFileAssemblerTest { + + private static final String TEST_DIR = "target" + File.separator + "hybridImportTest"; + + @Before + public void setUp() { + new File(TEST_DIR).mkdirs(); + } + + @After + public void tearDown() { + deleteRecursive(new File(TEST_DIR)); + } + + @Test + public void testHybridImport() throws Exception { + String schemaPath = TEST_DIR + File.separator + "shared.schema"; + writeSchema(schemaPath); + + String mainCsv = TEST_DIR + File.separator + "main.csv"; + writeFile( + mainCsv, + "Region,DeviceId,Time,Temperature,Pressure\n" + + "hebei,1,1000,80.0,1000.0\n" + + "hebei,1,2000,81.0,1001.0\n"); + + String sup1 = TEST_DIR + File.separator + "exp1.csv"; + writeFile( + sup1, + "Region,DeviceId,Temperature,Pressure\n" + "hebei,1,82.0,1002.0\n" + "hebei,1,83.0,1003.0\n"); + + String sup2 = TEST_DIR + File.separator + "exp2.csv"; + writeFile(sup2, "Region,DeviceId,Temperature,Pressure\n" + "hebei,2,90.0,1100.0\n"); + + String configPath = TEST_DIR + File.separator + "hybrid.conf"; + writeFile( + configPath, + "output_tsfile=" + TEST_DIR + File.separator + "combined.tsfile\n" + + "shared_schema=" + schemaPath + "\n" + + "main_csv=" + mainCsv + "\n" + + "main_batch_id=main\n" + + "supplement_csv=" + sup1 + "\n" + + "supplement_batch_id=exp1\n" + + "supplement_csv=" + sup2 + "\n" + + "supplement_batch_id=exp2\n"); + + HybridImportConfig config = HybridImportConfigParser.parse(configPath); + HybridCsvTsFileAssembler.execute(config); + + File tsfile = new File(TEST_DIR, "combined.tsfile"); + assertTrue(tsfile.exists()); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(tsfile.getAbsolutePath())) { + Map tableSchemas = reader.getTableSchemaMap(); + assertEquals(1, tableSchemas.size()); + assertTrue(tableSchemas.containsKey("lab")); + + List devices = reader.getAllDevices(); + // main: 1 device (hebei,1,main), exp1: 1 device, exp2: 1 device + assertEquals(3, devices.size()); + + int mainGroups = 0; + int exp1Groups = 0; + int exp2Groups = 0; + for (IDeviceID device : devices) { + String deviceStr = device.toString(); + if (deviceStr.contains("main")) { + mainGroups++; + } else if (deviceStr.contains("exp1")) { + exp1Groups++; + } else if (deviceStr.contains("exp2")) { + exp2Groups++; + } + } + assertEquals(1, mainGroups); + assertEquals(1, exp1Groups); + assertEquals(1, exp2Groups); + } + } + + @Test + public void testSyntheticTabletBuilderTimestamps() throws Exception { + ImportSchema schema = buildBaseSchema(); + schema = ImportSchemaUtils.withBatchIdTag(schema, "batch_id", "exp1"); + + SyntheticTabletBuilder builder = new SyntheticTabletBuilder(schema, true); + SourceBatch batch = + SourceBatch.fromRows( + java.util.Arrays.asList("Region", "DeviceId", "Temperature", "Pressure"), + java.util.Arrays.asList( + new Object[] {"hebei", "1", "80.0", "1000.0"}, + new Object[] {"hebei", "1", "81.0", "1001.0"})); + + org.apache.tsfile.write.record.Tablet tablet = builder.build(batch); + assertEquals(2, tablet.getRowSize()); + assertEquals(1L, tablet.getTimestamps()[0]); + assertEquals(2L, tablet.getTimestamps()[1]); + } + + private static ImportSchema buildBaseSchema() { + ImportSchema schema = new ImportSchema(); + schema.setTableName("lab"); + schema.setTimeColumnName("Time"); + schema.setTimePrecision("ms"); + schema.setHasHeader(true); + java.util.List tags = new java.util.ArrayList<>(); + tags.add(new ImportSchema.TagColumn("Region")); + tags.add(new ImportSchema.TagColumn("DeviceId")); + schema.setTagColumns(tags); + schema.setSourceColumns( + java.util.Arrays.asList( + new ImportSchema.SourceColumn("Region", TSDataType.TEXT), + new ImportSchema.SourceColumn("DeviceId", TSDataType.TEXT), + new ImportSchema.SourceColumn("Time", TSDataType.INT64), + new ImportSchema.SourceColumn("Temperature", TSDataType.FLOAT), + new ImportSchema.SourceColumn("Pressure", TSDataType.DOUBLE))); + return schema; + } + + private static void writeSchema(String path) throws IOException { + writeFile( + path, + "table_name=lab\n" + + "time_column=Time\n" + + "has_header=true\n" + + "separator=,\n" + + "tag_columns\n" + + "Region\n" + + "DeviceId\n" + + "source_columns\n" + + "Region TEXT,\n" + + "DeviceId TEXT,\n" + + "Time INT64,\n" + + "Temperature FLOAT,\n" + + "Pressure DOUBLE,\n"); + } + + private static void writeFile(String path, String content) throws IOException { + try (BufferedWriter w = new BufferedWriter(new FileWriter(path))) { + w.write(content); + } + } + + private static void deleteRecursive(File dir) { + File[] files = dir.listFiles(); + if (files != null) { + for (File f : files) { + if (f.isDirectory()) { + deleteRecursive(f); + } + f.delete(); + } + } + dir.delete(); + } +} diff --git a/java/tools/src/test/java/org/apache/tsfile/tools/SupplementSequentialIdTest.java b/java/tools/src/test/java/org/apache/tsfile/tools/SupplementSequentialIdTest.java new file mode 100644 index 000000000..8f3e4a933 --- /dev/null +++ b/java/tools/src/test/java/org/apache/tsfile/tools/SupplementSequentialIdTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.tools; + +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class SupplementSequentialIdTest { + + @Test + public void testConsecutiveTimestamps() { + assertArrayEquals(new long[] {1, 2, 3}, HybridCsvTsFileAssembler.buildConsecutiveTimestamps(1, 3)); + assertArrayEquals( + new long[] {4, 5}, HybridCsvTsFileAssembler.buildConsecutiveTimestamps(4, 2)); + } + + @Test + public void testChainedIdsAcrossTwoFiles() { + long nextId = 1; + int file1Rows = 2; + long[] t1 = HybridCsvTsFileAssembler.buildConsecutiveTimestamps(nextId, file1Rows); + nextId += file1Rows; + assertEquals(3, nextId); + + int file2Rows = 1; + long[] t2 = HybridCsvTsFileAssembler.buildConsecutiveTimestamps(nextId, file2Rows); + assertArrayEquals(new long[] {1, 2}, t1); + assertArrayEquals(new long[] {3}, t2); + } +} diff --git a/java/tools/src/test/java/org/apache/tsfile/tools/SupplementVarianceSorterTest.java b/java/tools/src/test/java/org/apache/tsfile/tools/SupplementVarianceSorterTest.java new file mode 100644 index 000000000..155a6f963 --- /dev/null +++ b/java/tools/src/test/java/org/apache/tsfile/tools/SupplementVarianceSorterTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.tools; + +import org.apache.tsfile.enums.TSDataType; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class SupplementVarianceSorterTest { + + @Test + public void testSortByPressureThenTemperature() { + ImportSchema schema = buildSchema(); + SourceBatch batch = + SourceBatch.fromRows( + Arrays.asList("Region", "DeviceId", "Temperature", "Pressure"), + Arrays.asList( + new Object[] {"hebei", "1", "1", "100"}, + new Object[] {"hebei", "1", "3", "50"}, + new Object[] {"hebei", "1", "2", "200"})); + + SourceBatch sorted = SupplementVarianceSorter.sortByVariancePriority(batch, schema); + + assertEquals("50", sorted.getValue(0, 3).toString()); + assertEquals("3", sorted.getValue(0, 2).toString()); + assertEquals("100", sorted.getValue(1, 3).toString()); + assertEquals("1", sorted.getValue(1, 2).toString()); + assertEquals("200", sorted.getValue(2, 3).toString()); + assertEquals("2", sorted.getValue(2, 2).toString()); + } + + private static ImportSchema buildSchema() { + ImportSchema schema = new ImportSchema(); + schema.setTableName("lab"); + schema.setTimeColumnName("Time"); + List tags = new ArrayList<>(); + tags.add(new ImportSchema.TagColumn("Region")); + tags.add(new ImportSchema.TagColumn("DeviceId")); + schema.setTagColumns(tags); + schema.setSourceColumns( + Arrays.asList( + new ImportSchema.SourceColumn("Region", TSDataType.TEXT), + new ImportSchema.SourceColumn("DeviceId", TSDataType.TEXT), + new ImportSchema.SourceColumn("Time", TSDataType.INT64), + new ImportSchema.SourceColumn("Temperature", TSDataType.FLOAT), + new ImportSchema.SourceColumn("Pressure", TSDataType.DOUBLE))); + return schema; + } +}