diff --git a/docs/en/connector-v2/source/Iceberg.md b/docs/en/connector-v2/source/Iceberg.md index 8bb21eb7b63..877be6f4d48 100644 --- a/docs/en/connector-v2/source/Iceberg.md +++ b/docs/en/connector-v2/source/Iceberg.md @@ -71,11 +71,12 @@ libfb303-xxx.jar ## Source Options -| Name | Type | Required | Default | Description | +| Name | Type | Required | Default | Description | |--------------------------|---------|----------|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | catalog_name | string | yes | - | User-specified catalog name. | | namespace | string | yes | - | The iceberg database name in the backend catalog. | -| table | string | yes | - | The iceberg table name in the backend catalog. | +| table | string | no | - | The iceberg table name in the backend catalog. | +| table_list | string | no | - | The iceberg table list in the backend catalog. | | iceberg.catalog.config | map | yes | - | Specify the properties for initializing the Iceberg catalog, which can be referenced in this file:"https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java" | | hadoop.config | map | no | - | Properties passed through to the Hadoop configuration | | iceberg.hadoop-conf-path | string | no | - | The specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files. 
| @@ -87,6 +88,7 @@ libfb303-xxx.jar | use_snapshot_id | long | no | - | Instructs this scan to look for use the given snapshot ID. | | use_snapshot_timestamp | long | no | - | Instructs this scan to look for use the most recent snapshot as of the given time in milliseconds. timestamp – the timestamp in millis since the Unix epoch | | stream_scan_strategy | enum | no | FROM_LATEST_SNAPSHOT | Starting strategy for stream mode execution, Default to use `FROM_LATEST_SNAPSHOT` if don’t specify any value,The optional values are:
TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode.
FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive.
FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive.
FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive.
FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. | +| increment.scan-interval | long | no | 2000 | The interval of increment scan(mills) | | common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. | ## Task Example @@ -101,25 +103,6 @@ env { source { Iceberg { - schema { - fields { - f2 = "boolean" - f1 = "bigint" - f3 = "int" - f4 = "bigint" - f5 = "float" - f6 = "double" - f7 = "date" - f9 = "timestamp" - f10 = "timestamp" - f11 = "string" - f12 = "bytes" - f13 = "bytes" - f14 = "decimal(19,9)" - f15 = "array" - f16 = "map" - } - } catalog_name = "seatunnel" iceberg.catalog.config={ type = "hadoop" @@ -141,6 +124,31 @@ sink { } ``` +### Multi-Table Read: + +```hocon +source { + Iceberg { + catalog_name = "seatunnel" + iceberg.catalog.config = { + type = "hadoop" + warehouse = "file:///tmp/seatunnel/iceberg/hadoop/" + } + namespace = "database1" + table_list = [ + { + table = "table_1" + }, + { + table = "table_2" + } + ] + + plugin_output = "iceberg" + } +} +``` + ### Hadoop S3 Catalog: ```hocon diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceConfig.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceConfig.java index 56d0a4509df..7e906b3e4d6 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceConfig.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceConfig.java @@ -19,18 +19,19 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.config; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; 
import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergStreamScanStrategy; -import org.apache.iceberg.expressions.Expression; - import lombok.Getter; import lombok.ToString; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + import static org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergStreamScanStrategy.FROM_LATEST_SNAPSHOT; @Getter @@ -74,45 +75,53 @@ public class SourceConfig extends CommonConfig { .defaultValue(FROM_LATEST_SNAPSHOT) .withDescription(" the iceberg strategy of stream scanning"); - private Long startSnapshotTimestamp; - private Long startSnapshotId; - private Long endSnapshotId; + public static final Option> KEY_TABLE_LIST = + Options.key("table_list") + .listType(SourceTableConfig.class) + .noDefaultValue() + .withDescription(" the iceberg tables"); - private Long useSnapshotId; - private Long useSnapshotTimestamp; + public static final Option KEY_INCREMENT_SCAN_INTERVAL = + Options.key("increment.scan-interval") + .longType() + .defaultValue(2000L) + .withDescription(" the interval of increment scan(mills)"); - private IcebergStreamScanStrategy streamScanStrategy = KEY_STREAM_SCAN_STRATEGY.defaultValue(); - private Expression filter; - private Long splitSize; - private Integer splitLookback; - private Long splitOpenFileCost; + private long incrementScanInterval; + private List tableList; public SourceConfig(ReadonlyConfig readonlyConfig) { super(readonlyConfig); - Config pluginConfig = readonlyConfig.toConfig(); - if (pluginConfig.hasPath(KEY_START_SNAPSHOT_TIMESTAMP.key())) { - this.startSnapshotTimestamp = pluginConfig.getLong(KEY_START_SNAPSHOT_TIMESTAMP.key()); - } - if (pluginConfig.hasPath(KEY_START_SNAPSHOT_ID.key())) { - this.startSnapshotId = pluginConfig.getLong(KEY_START_SNAPSHOT_ID.key()); - } - if 
(pluginConfig.hasPath(KEY_END_SNAPSHOT_ID.key())) { - this.endSnapshotId = pluginConfig.getLong(KEY_END_SNAPSHOT_ID.key()); - } - if (pluginConfig.hasPath(KEY_USE_SNAPSHOT_ID.key())) { - this.useSnapshotId = pluginConfig.getLong(KEY_USE_SNAPSHOT_ID.key()); - } - if (pluginConfig.hasPath(KEY_USE_SNAPSHOT_TIMESTAMP.key())) { - this.useSnapshotTimestamp = pluginConfig.getLong(KEY_USE_SNAPSHOT_TIMESTAMP.key()); - } - if (pluginConfig.hasPath(KEY_STREAM_SCAN_STRATEGY.key())) { - this.streamScanStrategy = - pluginConfig.getEnum( - IcebergStreamScanStrategy.class, KEY_STREAM_SCAN_STRATEGY.key()); + this.incrementScanInterval = readonlyConfig.get(KEY_INCREMENT_SCAN_INTERVAL); + if (this.getTable() != null) { + SourceTableConfig tableConfig = + SourceTableConfig.builder() + .namespace(this.getNamespace()) + .table(this.getTable()) + .startSnapshotTimestamp( + readonlyConfig.get(KEY_START_SNAPSHOT_TIMESTAMP)) + .startSnapshotId(readonlyConfig.get(KEY_START_SNAPSHOT_ID)) + .endSnapshotId(readonlyConfig.get(KEY_END_SNAPSHOT_ID)) + .useSnapshotId(readonlyConfig.get(KEY_USE_SNAPSHOT_ID)) + .useSnapshotTimestamp(readonlyConfig.get(KEY_USE_SNAPSHOT_TIMESTAMP)) + .streamScanStrategy(readonlyConfig.get(KEY_STREAM_SCAN_STRATEGY)) + .build(); + this.tableList = Collections.singletonList(tableConfig); + } else { + this.tableList = + readonlyConfig.get(KEY_TABLE_LIST).stream() + .map( + tableConfig -> + tableConfig.setNamespace( + SourceConfig.this.getNamespace())) + .collect(Collectors.toList()); } } - public static SourceConfig loadConfig(ReadonlyConfig pluginConfig) { - return new SourceConfig(pluginConfig); + public SourceTableConfig getTableConfig(TablePath tablePath) { + return tableList.stream() + .filter(tableConfig -> tableConfig.getTablePath().equals(tablePath)) + .findFirst() + .get(); } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceTableConfig.java 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceTableConfig.java new file mode 100644 index 00000000000..99524f8373a --- /dev/null +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceTableConfig.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.iceberg.config; + +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergStreamScanStrategy; +import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Expression; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.experimental.Tolerate; + +import java.io.Serializable; + +import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_STREAM_SCAN_STRATEGY; + +@AllArgsConstructor +@Data +@Builder +public class SourceTableConfig implements Serializable { + private String namespace; + private String table; + + private Long startSnapshotTimestamp; + private Long startSnapshotId; + private Long endSnapshotId; + + private Long useSnapshotId; + private Long useSnapshotTimestamp; + + private IcebergStreamScanStrategy streamScanStrategy = KEY_STREAM_SCAN_STRATEGY.defaultValue(); + private Expression filter; + private Long splitSize; + private Integer splitLookback; + private Long splitOpenFileCost; + + @Tolerate + public SourceTableConfig() {} + + public TablePath getTablePath() { + String[] paths = table.split("\\."); + if (paths.length == 1) { + return TablePath.of(namespace, table); + } + if (paths.length == 2) { + return TablePath.of(paths[0], paths[1]); + } + String namespace = table.substring(0, table.lastIndexOf(".")); + return TablePath.of(namespace, table); + } + + public TableIdentifier getTableIdentifier() { + return SchemaUtils.toIcebergTableIdentifier(getTablePath()); + } + + public SourceTableConfig setNamespace(String namespace) { + this.namespace = namespace; + return this; + } +} diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java index 1028ae21b4a..22f42a53bff 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java @@ -158,6 +158,7 @@ public void close() throws IOException { if (writer != null) { writer.close(); } + icebergTableLoader.close(); } finally { results.clear(); } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java index 008289690a9..7205e1108b3 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java @@ -33,10 +33,11 @@ public class IcebergAggregatedCommitter implements SinkAggregatedCommitter { + private final IcebergTableLoader tableLoader; private final IcebergFilesCommitter filesCommitter; public IcebergAggregatedCommitter(SinkConfig config, CatalogTable catalogTable) { - IcebergTableLoader tableLoader = IcebergTableLoader.create(config, catalogTable); + this.tableLoader = IcebergTableLoader.create(config, catalogTable); this.filesCommitter = IcebergFilesCommitter.of(config, tableLoader); } @@ -68,5 +69,7 @@ public IcebergAggregatedCommitInfo combine(List commitInfos) public void abort(List aggregatedCommitInfo) throws Exception {} @Override - public void close() throws IOException {} + public void 
close() throws IOException { + this.tableLoader.close(); + } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java index c56f3f2f00e..4ed750e0c83 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java @@ -17,10 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.source; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceReader; @@ -28,34 +25,30 @@ import org.apache.seatunnel.api.source.SupportColumnProjection; import org.apache.seatunnel.api.source.SupportParallelism; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; -import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; -import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader; +import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader; import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergBatchSplitEnumerator; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergSplitEnumeratorState; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergStreamSplitEnumerator; -import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergScanContext; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader.IcebergSourceReader; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit; -import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.iceberg.Schema; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; import lombok.SneakyThrows; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; - -import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; +import java.util.Map; +import java.util.stream.Collectors; public class IcebergSource implements SeaTunnelSource< @@ -66,23 +59,21 @@ public class IcebergSource private static final long serialVersionUID = 4343414808223919870L; private final SourceConfig sourceConfig; - private final Schema tableSchema; - private final Schema projectedSchema; - private final SeaTunnelRowType seaTunnelRowType; + private final Map catalogTables; + private final Map> tableSchemaProjections; private JobContext jobContext; - private final CatalogTable catalogTable; - - public IcebergSource(ReadonlyConfig config, CatalogTable catalogTable) { - this.sourceConfig = SourceConfig.loadConfig(config); - this.tableSchema = loadIcebergSchema(sourceConfig); - this.seaTunnelRowType = 
loadSeaTunnelRowType(tableSchema, config.toConfig()); - this.projectedSchema = tableSchema.select(seaTunnelRowType.getFieldNames()); - this.catalogTable = catalogTable; + + public IcebergSource(SourceConfig config, List catalogTables) { + this.sourceConfig = config; + this.catalogTables = + catalogTables.stream() + .collect(Collectors.toMap(CatalogTable::getTablePath, table -> table)); + this.tableSchemaProjections = loadIcebergSchemaProjections(config, this.catalogTables); } @Override public List getProducedCatalogTables() { - return Collections.singletonList(catalogTable); + return new ArrayList<>(catalogTables.values()); } @Override @@ -91,46 +82,30 @@ public String getPluginName() { } @SneakyThrows - private Schema loadIcebergSchema(SourceConfig sourceConfig) { - try (IcebergTableLoader icebergTableLoader = - IcebergTableLoader.create(sourceConfig, catalogTable)) { - icebergTableLoader.open(); - return icebergTableLoader.loadTable().schema(); - } - } - - private SeaTunnelRowType loadSeaTunnelRowType(Schema tableSchema, Config pluginConfig) { - List columnNames = new ArrayList<>(tableSchema.columns().size()); - List> columnDataTypes = new ArrayList<>(tableSchema.columns().size()); - for (Types.NestedField column : tableSchema.columns()) { - columnNames.add(column.name()); - columnDataTypes.add(SchemaUtils.toSeaTunnelType(column.name(), column.type())); - } - SeaTunnelRowType originalRowType = - new SeaTunnelRowType( - columnNames.toArray(new String[0]), - columnDataTypes.toArray(new SeaTunnelDataType[0])); - - CheckResult checkResult = - CheckConfigUtil.checkAllExists(pluginConfig, TableSchemaOptions.SCHEMA.key()); - if (checkResult.isSuccess()) { - SeaTunnelRowType projectedRowType = - CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); - for (int i = 0; i < projectedRowType.getFieldNames().length; i++) { - String fieldName = projectedRowType.getFieldName(i); - SeaTunnelDataType projectedFieldType = projectedRowType.getFieldType(i); - int 
originalFieldIndex = originalRowType.indexOf(fieldName); - SeaTunnelDataType originalFieldType = - originalRowType.getFieldType(originalFieldIndex); - checkArgument( - projectedFieldType.equals(originalFieldType), - String.format( - "Illegal field: %s, original: %s <-> projected: %s", - fieldName, originalFieldType, projectedFieldType)); + private Map> loadIcebergSchemaProjections( + SourceConfig config, Map tables) { + IcebergCatalogLoader catalogFactory = new IcebergCatalogLoader(config); + Catalog catalog = catalogFactory.loadCatalog(); + + Map> icebergTables = new HashMap<>(); + try { + for (TablePath tablePath : tables.keySet()) { + CatalogTable catalogTable = tables.get(tablePath); + Table icebergTable = + catalog.loadTable( + TableIdentifier.of( + tablePath.getDatabaseName(), tablePath.getTableName())); + Schema icebergSchema = icebergTable.schema(); + Schema projectedSchema = + icebergSchema.select(catalogTable.getTableSchema().getFieldNames()); + icebergTables.put(tablePath, Pair.of(icebergSchema, projectedSchema)); + } + } finally { + if (catalog instanceof AutoCloseable) { + ((AutoCloseable) catalog).close(); } - return projectedRowType; } - return originalRowType; + return icebergTables; } @Override @@ -149,12 +124,7 @@ public void setJobContext(JobContext jobContext) { public SourceReader createReader( SourceReader.Context readerContext) { return new IcebergSourceReader( - readerContext, - seaTunnelRowType, - tableSchema, - projectedSchema, - sourceConfig, - catalogTable); + readerContext, sourceConfig, catalogTables, tableSchemaProjections); } @Override @@ -163,18 +133,10 @@ public SourceReader createReader( SourceSplitEnumerator.Context enumeratorContext) { if (Boundedness.BOUNDED.equals(getBoundedness())) { return new IcebergBatchSplitEnumerator( - enumeratorContext, - IcebergScanContext.scanContext(sourceConfig, projectedSchema), - sourceConfig, - null, - catalogTable); + enumeratorContext, sourceConfig, catalogTables, tableSchemaProjections); } 
return new IcebergStreamSplitEnumerator( - enumeratorContext, - IcebergScanContext.streamScanContext(sourceConfig, projectedSchema), - sourceConfig, - null, - catalogTable); + enumeratorContext, sourceConfig, catalogTables, tableSchemaProjections); } @Override @@ -185,16 +147,16 @@ public SourceReader createReader( if (Boundedness.BOUNDED.equals(getBoundedness())) { return new IcebergBatchSplitEnumerator( enumeratorContext, - IcebergScanContext.scanContext(sourceConfig, projectedSchema), sourceConfig, - checkpointState, - catalogTable); + catalogTables, + tableSchemaProjections, + checkpointState); } return new IcebergStreamSplitEnumerator( enumeratorContext, - IcebergScanContext.streamScanContext(sourceConfig, projectedSchema), sourceConfig, - checkpointState, - catalogTable); + catalogTables, + tableSchemaProjections, + checkpointState); } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java index 6e7c05c9ab1..e1c7424cc01 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java @@ -33,16 +33,19 @@ import org.apache.seatunnel.connectors.seatunnel.iceberg.catalog.IcebergCatalog; import org.apache.seatunnel.connectors.seatunnel.iceberg.catalog.IcebergCatalogFactory; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig; -import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig; import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; import 
java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig.KEY_CASE_SENSITIVE; import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_END_SNAPSHOT_ID; +import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_INCREMENT_SCAN_INTERVAL; import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_START_SNAPSHOT_ID; import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_START_SNAPSHOT_TIMESTAMP; import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_STREAM_SCAN_STRATEGY; @@ -63,9 +66,9 @@ public OptionRule optionRule() { return OptionRule.builder() .required( CommonConfig.KEY_CATALOG_NAME, - SinkConfig.KEY_NAMESPACE, - SinkConfig.KEY_TABLE, - SinkConfig.CATALOG_PROPS) + CommonConfig.KEY_NAMESPACE, + CommonConfig.CATALOG_PROPS) + .exclusive(CommonConfig.KEY_TABLE, SourceConfig.KEY_TABLE_LIST) .optional( TableSchemaOptions.SCHEMA, KEY_CASE_SENSITIVE, @@ -74,7 +77,8 @@ public OptionRule optionRule() { KEY_END_SNAPSHOT_ID, KEY_USE_SNAPSHOT_ID, KEY_USE_SNAPSHOT_TIMESTAMP, - KEY_STREAM_SCAN_STRATEGY) + KEY_STREAM_SCAN_STRATEGY, + KEY_INCREMENT_SCAN_INTERVAL) .build(); } @@ -83,24 +87,37 @@ public OptionRule optionRule() { TableSource createSource(TableSourceFactoryContext context) { ReadonlyConfig options = context.getOptions(); SourceConfig config = new SourceConfig(options); - TablePath tablePath = TablePath.of(config.getNamespace(), config.getTable()); CatalogTable catalogTable; if (options.get(TableSchemaOptions.SCHEMA) != null) { + TablePath tablePath = config.getTableList().get(0).getTablePath(); catalogTable = CatalogTableUtil.buildWithConfig(factoryIdentifier(), options); TableIdentifier tableIdentifier = TableIdentifier.of(catalogTable.getCatalogName(), tablePath); 
CatalogTable table = CatalogTable.of(tableIdentifier, catalogTable); - return () -> (SeaTunnelSource) new IcebergSource(options, table); - } else { - // build iceberg catalog - IcebergCatalogFactory icebergCatalogFactory = new IcebergCatalogFactory(); - IcebergCatalog catalog = - (IcebergCatalog) - icebergCatalogFactory.createCatalog(factoryIdentifier(), options); + return () -> + (SeaTunnelSource) + new IcebergSource(config, Collections.singletonList(table)); + } + + try (IcebergCatalog catalog = + (IcebergCatalog) + new IcebergCatalogFactory().createCatalog(factoryIdentifier(), options)) { catalog.open(); - catalogTable = catalog.getTable(tablePath); + + if (config.getTable() != null) { + TablePath tablePath = config.getTableList().get(0).getTablePath(); + catalogTable = catalog.getTable(tablePath); + return () -> + (SeaTunnelSource) + new IcebergSource(config, Collections.singletonList(catalogTable)); + } + + List catalogTables = + config.getTableList().stream() + .map(tableConfig -> catalog.getTable(tableConfig.getTablePath())) + .collect(Collectors.toList()); return () -> - (SeaTunnelSource) new IcebergSource(options, catalogTable); + (SeaTunnelSource) new IcebergSource(config, catalogTables); } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java index 73cb71f45fe..8ccdfd6e0f2 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java @@ -19,14 +19,18 @@ import org.apache.seatunnel.api.source.SourceSplitEnumerator; import 
org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; -import lombok.Getter; -import lombok.NonNull; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -37,6 +41,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; @Slf4j public abstract class AbstractSplitEnumerator @@ -44,75 +53,153 @@ public abstract class AbstractSplitEnumerator protected final Context context; protected final SourceConfig sourceConfig; + protected final Map tables; + protected final Map> tableSchemaProjections; + protected final Catalog icebergCatalog; + protected final Object stateLock = new Object(); + + protected final BlockingQueue pendingTables; protected final Map> pendingSplits; - protected IcebergTableLoader icebergTableLoader; - @Getter private volatile boolean isOpen = false; - private CatalogTable catalogTable; + public AbstractSplitEnumerator( + Context context, + SourceConfig sourceConfig, + Map catalogTables, + Map> tableSchemaProjections) { + this(context, sourceConfig, catalogTables, tableSchemaProjections, null); + } public AbstractSplitEnumerator( - @NonNull SourceSplitEnumerator.Context context, - @NonNull SourceConfig 
sourceConfig, - @NonNull Map> pendingSplits, - CatalogTable catalogTable) { + Context context, + SourceConfig sourceConfig, + Map catalogTables, + Map> tableSchemaProjections, + IcebergSplitEnumeratorState state) { this.context = context; this.sourceConfig = sourceConfig; - this.pendingSplits = new HashMap<>(pendingSplits); - this.catalogTable = catalogTable; + this.tables = catalogTables; + this.tableSchemaProjections = tableSchemaProjections; + this.icebergCatalog = new IcebergCatalogLoader(sourceConfig).loadCatalog(); + this.pendingTables = new ArrayBlockingQueue<>(catalogTables.size()); + this.pendingSplits = new HashMap<>(); + if (state == null) { + this.pendingTables.addAll( + catalogTables.values().stream() + .map(CatalogTable::getTablePath) + .collect(Collectors.toList())); + } else { + this.pendingTables.addAll(state.getPendingTables()); + state.getPendingSplits().values().stream() + .flatMap( + (Function< + List, + Stream>) + splits -> splits.stream()) + .map( + (Function) + split -> { + // TODO: Waiting for old version migration to complete + // before remove + if (split.getTablePath() == null) { + return new IcebergFileScanTaskSplit( + catalogTables.values().stream() + .findFirst() + .get() + .getTablePath(), + split.getTask(), + split.getRecordOffset()); + } + return split; + }) + .forEach( + split -> + pendingSplits + .computeIfAbsent( + getSplitOwner( + split.splitId(), + context.currentParallelism()), + r -> new ArrayList<>()) + .add(split)); + } } @Override public void open() { - icebergTableLoader = IcebergTableLoader.create(sourceConfig, catalogTable); - icebergTableLoader.open(); - isOpen = true; - } - - @Override - public void run() { - refreshPendingSplits(); - assignPendingSplits(context.registeredReaders()); - } - - @Override - public void close() throws IOException { - icebergTableLoader.close(); - isOpen = false; + log.info("Open split enumerator."); } @Override public void addSplitsBack(List splits, int subtaskId) { - addPendingSplits(splits); - 
if (context.registeredReaders().contains(subtaskId)) { - assignPendingSplits(Collections.singleton(subtaskId)); + if (!splits.isEmpty()) { + synchronized (stateLock) { + addPendingSplits(splits); + if (context.registeredReaders().contains(subtaskId)) { + assignPendingSplits(Collections.singleton(subtaskId)); + } else { + log.warn( + "Reader {} is not registered. Pending splits {} are not assigned.", + subtaskId, + splits); + } + } } + log.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size()); } @Override public int currentUnassignedSplitSize() { - return pendingSplits.size(); + if (!pendingTables.isEmpty()) { + return pendingTables.size(); + } + if (!pendingSplits.isEmpty()) { + return pendingSplits.values().stream().mapToInt(List::size).sum(); + } + return 0; } + @Override + public void handleSplitRequest(int subtaskId) {} + @Override public void registerReader(int subtaskId) { log.debug("Adding reader {} to IcebergSourceEnumerator.", subtaskId); - assignPendingSplits(Collections.singleton(subtaskId)); + synchronized (stateLock) { + assignPendingSplits(Collections.singleton(subtaskId)); + } } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception {} - protected void refreshPendingSplits() { - List newSplits = loadNewSplits(icebergTableLoader.loadTable()); - addPendingSplits(newSplits); + @SneakyThrows + @Override + public void close() throws IOException { + log.info("Close split enumerator."); + if (icebergCatalog instanceof AutoCloseable) { + ((AutoCloseable) icebergCatalog).close(); + } + } + + protected Table loadTable(TablePath tablePath) { + return icebergCatalog.loadTable( + TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName())); } - protected abstract List loadNewSplits(Table table); + protected void checkThrowInterruptedException() throws InterruptedException { + if (Thread.currentThread().isInterrupted()) { + log.info("Enumerator thread is interrupted."); + throw new 
InterruptedException("Enumerator thread is interrupted."); + } + } + + private static int getSplitOwner(String splitId, int numReaders) { + return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders; + } - private void addPendingSplits(Collection newSplits) { + protected void addPendingSplits(Collection newSplits) { int numReaders = context.currentParallelism(); for (IcebergFileScanTaskSplit newSplit : newSplits) { - int ownerReader = (newSplit.splitId().hashCode() & Integer.MAX_VALUE) % numReaders; + int ownerReader = getSplitOwner(newSplit.splitId(), numReaders); pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(newSplit); log.info("Assigning {} to {} reader.", newSplit, ownerReader); } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergBatchSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergBatchSplitEnumerator.java index b0adfb011c1..72f3f56635d 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergBatchSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergBatchSplitEnumerator.java @@ -17,61 +17,88 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator; -import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergScanContext; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergScanSplitPlanner; import 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import lombok.NonNull; import lombok.extern.slf4j.Slf4j; -import java.util.Collections; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; @Slf4j public class IcebergBatchSplitEnumerator extends AbstractSplitEnumerator { - private final IcebergScanContext icebergScanContext; + public IcebergBatchSplitEnumerator( + Context context, + SourceConfig sourceConfig, + Map catalogTables, + Map> tableSchemaProjections) { + this(context, sourceConfig, catalogTables, tableSchemaProjections, null); + } public IcebergBatchSplitEnumerator( - @NonNull SourceSplitEnumerator.Context context, - @NonNull IcebergScanContext icebergScanContext, - @NonNull SourceConfig sourceConfig, - IcebergSplitEnumeratorState restoreState, - CatalogTable catalogTable) { - super( - context, - sourceConfig, - restoreState != null ? 
restoreState.getPendingSplits() : Collections.emptyMap(), - catalogTable); - this.icebergScanContext = icebergScanContext; + Context context, + SourceConfig sourceConfig, + Map catalogTables, + Map> tableSchemaProjections, + IcebergSplitEnumeratorState state) { + super(context, sourceConfig, catalogTables, tableSchemaProjections, state); } @Override - public void run() { - super.run(); - + public void run() throws Exception { Set readers = context.registeredReaders(); + while (!pendingTables.isEmpty()) { + synchronized (stateLock) { + checkThrowInterruptedException(); + + TablePath tablePath = pendingTables.poll(); + log.info("Splitting table {}.", tablePath); + + Collection splits = loadSplits(tablePath); + log.info("Split table {} into {} splits.", tablePath, splits.size()); + + addPendingSplits(splits); + } + + synchronized (stateLock) { + assignPendingSplits(readers); + } + } + log.debug( "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers); readers.forEach(context::signalNoMoreSplits); } @Override - public IcebergSplitEnumeratorState snapshotState(long checkpointId) { - return new IcebergSplitEnumeratorState(null, pendingSplits); + public IcebergSplitEnumeratorState snapshotState(long checkpointId) throws Exception { + synchronized (stateLock) { + return new IcebergSplitEnumeratorState( + new ArrayList<>(pendingTables), new HashMap<>(pendingSplits)); + } } - @Override - public void handleSplitRequest(int subtaskId) {} - - @Override - protected List loadNewSplits(Table table) { - return IcebergScanSplitPlanner.planSplits(table, icebergScanContext); + private List loadSplits(TablePath tablePath) { + Table table = loadTable(tablePath); + Pair tableSchemaProjection = tableSchemaProjections.get(tablePath); + IcebergScanContext scanContext = + IcebergScanContext.scanContext( + sourceConfig, + sourceConfig.getTableConfig(tablePath), + tableSchemaProjection.getRight()); + return IcebergScanSplitPlanner.planSplits(table, scanContext); } } 
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergSplitEnumeratorState.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergSplitEnumeratorState.java index 4b170989898..9637cd0ec12 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergSplitEnumeratorState.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergSplitEnumeratorState.java @@ -17,25 +17,66 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit; -import lombok.AllArgsConstructor; import lombok.Getter; -import lombok.Setter; import lombok.ToString; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @Getter -@Setter -@AllArgsConstructor @ToString public class IcebergSplitEnumeratorState implements Serializable { private static final long serialVersionUID = -529307606400995298L; - private final IcebergEnumeratorPosition lastEnumeratedPosition; - private final Map> pendingSplits; + // TODO: Waiting for migration to complete before remove + @Deprecated private IcebergEnumeratorPosition lastEnumeratedPosition; + + private Collection pendingTables; + private Map> pendingSplits; + private Map tableOffsets; + + public IcebergSplitEnumeratorState( + Collection pendingTables, + Map> pendingSplits) { + this(pendingTables, pendingSplits, Collections.emptyMap()); + } + + public IcebergSplitEnumeratorState( + Collection pendingTables, + Map> pendingSplits, + 
Map tableOffsets) { + this.pendingTables = pendingTables; + this.pendingSplits = pendingSplits; + this.tableOffsets = tableOffsets; + } + + // TODO: Waiting for migration to complete before remove + @Deprecated + public IcebergSplitEnumeratorState( + IcebergEnumeratorPosition lastEnumeratedPosition, + Map> pendingSplits) { + this.lastEnumeratedPosition = lastEnumeratedPosition; + this.pendingSplits = pendingSplits; + this.pendingTables = new ArrayList<>(); + this.tableOffsets = new HashMap<>(); + } + + // TODO: Waiting for migration to complete before remove + @Deprecated + public IcebergSplitEnumeratorState setPendingTable(TablePath table) { + if (lastEnumeratedPosition != null) { + this.pendingTables.add(table); + this.tableOffsets.put(table, lastEnumeratedPosition); + } + return this; + } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java index 266985a0775..14f02dd58d7 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java @@ -17,79 +17,125 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator; -import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergScanContext; import 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergScanSplitPlanner; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; @Slf4j public class IcebergStreamSplitEnumerator extends AbstractSplitEnumerator { - private final IcebergScanContext icebergScanContext; - private final AtomicReference enumeratorPosition; + private final ConcurrentMap tableOffsets; + private volatile boolean initialized = false; public IcebergStreamSplitEnumerator( - @NonNull SourceSplitEnumerator.Context context, - @NonNull IcebergScanContext icebergScanContext, - @NonNull SourceConfig sourceConfig, - IcebergSplitEnumeratorState restoreState, - CatalogTable catalogTable) { - super( - context, - sourceConfig, - restoreState != null ? 
restoreState.getPendingSplits() : Collections.emptyMap(), - catalogTable); - this.icebergScanContext = icebergScanContext; - this.enumeratorPosition = new AtomicReference<>(); - if (restoreState != null) { - enumeratorPosition.set(restoreState.getLastEnumeratedPosition()); + Context context, + SourceConfig sourceConfig, + Map catalogTables, + Map> tableSchemaProjections) { + this(context, sourceConfig, catalogTables, tableSchemaProjections, null); + } + + public IcebergStreamSplitEnumerator( + Context context, + SourceConfig sourceConfig, + Map catalogTables, + Map> tableSchemaProjections, + IcebergSplitEnumeratorState state) { + super(context, sourceConfig, catalogTables, tableSchemaProjections, state); + this.tableOffsets = new ConcurrentHashMap<>(); + if (state != null) { + if (state.getLastEnumeratedPosition() != null) { + // TODO: Waiting for migration to complete before remove + state.setPendingTable( + catalogTables.values().stream().findFirst().get().getTablePath()); + } + this.tableOffsets.putAll(state.getTableOffsets()); + } + } + + @Override + public void run() throws Exception { + Set readers = context.registeredReaders(); + while (true) { + for (TablePath tablePath : pendingTables) { + checkThrowInterruptedException(); + + synchronized (stateLock) { + log.info("Scan table {}.", tablePath); + + Collection splits = loadSplits(tablePath); + log.info("Scan table {} into {} splits.", tablePath, splits.size()); + addPendingSplits(splits); + assignPendingSplits(readers); + } + } + + if (Boolean.FALSE.equals(initialized)) { + initialized = true; + } + + synchronized (stateLock) { stateLock.wait(sourceConfig.getIncrementScanInterval()); } + } } @Override public IcebergSplitEnumeratorState snapshotState(long checkpointId) throws Exception { - return new IcebergSplitEnumeratorState(enumeratorPosition.get(), pendingSplits); + synchronized (stateLock) { + return new IcebergSplitEnumeratorState( + new ArrayList<>(pendingTables), + new HashMap<>(pendingSplits), + new HashMap<>(tableOffsets)); + } 
} @Override public void handleSplitRequest(int subtaskId) { - if (isOpen()) { - synchronized (this) { - if (pendingSplits.isEmpty() || pendingSplits.get(subtaskId) == null) { - refreshPendingSplits(); - } - assignPendingSplits(Collections.singleton(subtaskId)); - } + if (initialized) { + synchronized (stateLock) { stateLock.notifyAll(); } } } - @Override - protected List loadNewSplits(Table table) { + private List loadSplits(TablePath tablePath) { + Table table = loadTable(tablePath); + IcebergEnumeratorPosition offset = tableOffsets.get(tablePath); + Pair tableSchemaProjection = tableSchemaProjections.get(tablePath); + IcebergScanContext scanContext = + IcebergScanContext.streamScanContext( + sourceConfig, + sourceConfig.getTableConfig(tablePath), + tableSchemaProjection.getRight()); IcebergEnumerationResult result = - IcebergScanSplitPlanner.planStreamSplits( - table, icebergScanContext, enumeratorPosition.get()); - if (!Objects.equals(result.getFromPosition(), enumeratorPosition.get())) { + IcebergScanSplitPlanner.planStreamSplits(table, scanContext, offset); + if (!Objects.equals(result.getFromPosition(), offset)) { log.info( "Skip {} loaded splits because the scan starting position doesn't match " + "the current enumerator position: enumerator position = {}, scan starting position = {}", result.getSplits().size(), - enumeratorPosition.get(), + tableOffsets.get(tablePath), result.getFromPosition()); return Collections.emptyList(); } else { - enumeratorPosition.set(result.getToPosition()); + tableOffsets.put(tablePath, result.getToPosition()); log.debug("Update enumerator position to {}", result.getToPosition()); return result.getSplits(); } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java index 7b29c80f678..09b2145c9fb 
100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java @@ -17,7 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig; +import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceTableConfig; import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.Expression; @@ -31,6 +33,7 @@ @ToString public class IcebergScanContext { + private final TablePath tablePath; private final boolean streaming; private final IcebergStreamScanStrategy streamScanStrategy; @@ -59,27 +62,30 @@ public IcebergScanContext copyWithAppendsBetween( .build(); } - public static IcebergScanContext scanContext(SourceConfig sourceConfig, Schema schema) { + public static IcebergScanContext scanContext( + SourceConfig sourceConfig, SourceTableConfig tableConfig, Schema schema) { return IcebergScanContext.builder() - .startSnapshotTimestamp(sourceConfig.getStartSnapshotTimestamp()) - .startSnapshotId(sourceConfig.getStartSnapshotId()) - .endSnapshotId(sourceConfig.getEndSnapshotId()) - .useSnapshotId(sourceConfig.getUseSnapshotId()) - .useSnapshotTimestamp(sourceConfig.getUseSnapshotTimestamp()) + .tablePath(tableConfig.getTablePath()) + .startSnapshotTimestamp(tableConfig.getStartSnapshotTimestamp()) + .startSnapshotId(tableConfig.getStartSnapshotId()) + .endSnapshotId(tableConfig.getEndSnapshotId()) + .useSnapshotId(tableConfig.getUseSnapshotId()) + .useSnapshotTimestamp(tableConfig.getUseSnapshotTimestamp()) .caseSensitive(sourceConfig.isCaseSensitive()) .schema(schema) - .filter(sourceConfig.getFilter()) - 
.splitSize(sourceConfig.getSplitSize()) - .splitLookback(sourceConfig.getSplitLookback()) - .splitOpenFileCost(sourceConfig.getSplitOpenFileCost()) + .filter(tableConfig.getFilter()) + .splitSize(tableConfig.getSplitSize()) + .splitLookback(tableConfig.getSplitLookback()) + .splitOpenFileCost(tableConfig.getSplitOpenFileCost()) .build(); } - public static IcebergScanContext streamScanContext(SourceConfig sourceConfig, Schema schema) { - return scanContext(sourceConfig, schema) + public static IcebergScanContext streamScanContext( + SourceConfig sourceConfig, SourceTableConfig tableConfig, Schema schema) { + return scanContext(sourceConfig, tableConfig, schema) .toBuilder() .streaming(true) - .streamScanStrategy(sourceConfig.getStreamScanStrategy()) + .streamScanStrategy(tableConfig.getStreamScanStrategy()) .build(); } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java index d006241da4c..404b4ab5ebe 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java @@ -174,7 +174,7 @@ public static List planSplits( List splits = new ArrayList<>(); for (CombinedScanTask combinedScanTask : tasksIterable) { for (FileScanTask fileScanTask : combinedScanTask.files()) { - splits.add(new IcebergFileScanTaskSplit(fileScanTask)); + splits.add(new IcebergFileScanTaskSplit(context.getTablePath(), fileScanTask)); } } return splits; diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java index 7c472f9af7f..7b2236c5399 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java @@ -44,10 +44,12 @@ public CloseableIterator open(@NonNull IcebergFileScanTaskSplit sp OffsetSeekIterator seekIterator = new OffsetSeekIterator<>(iterator); seekIterator.seek(split.getRecordOffset()); + String tableId = split.getTablePath().getFullName(); return CloseableIterator.transform( seekIterator, record -> { SeaTunnelRow seaTunnelRow = deserializer.deserialize(record); + seaTunnelRow.setTableId(tableId); split.setRecordOffset(split.getRecordOffset() + 1); return seaTunnelRow; }); diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java index 83f42879d0b..b71b73f0898 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java @@ -21,25 +21,33 @@ import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.catalog.CatalogTable; 
+import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader; +import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig; +import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceTableConfig; import org.apache.seatunnel.connectors.seatunnel.iceberg.data.DefaultDeserializer; import org.apache.seatunnel.connectors.seatunnel.iceberg.data.Deserializer; import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit; +import org.apache.commons.lang3.tuple.Pair; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.io.CloseableIterator; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; @Slf4j public class IcebergSourceReader implements SourceReader { @@ -47,72 +55,95 @@ public class IcebergSourceReader implements SourceReader pendingSplits; - private final Deserializer deserializer; - private final Schema tableSchema; - private final Schema projectedSchema; private final SourceConfig sourceConfig; + private final Map tables; + private final Map> tableSchemaProjections; + private final BlockingQueue pendingSplits; - private IcebergTableLoader icebergTableLoader; - private IcebergFileScanTaskSplitReader icebergFileScanTaskSplitReader; + private volatile IcebergFileScanTaskSplit 
currentReadSplit; + private volatile boolean noMoreSplitsAssignment; - private IcebergFileScanTaskSplit currentReadSplit; - private boolean noMoreSplitsAssignment; - - private CatalogTable catalogTable; + private Catalog catalog; + private ConcurrentMap tableReaders; public IcebergSourceReader( @NonNull SourceReader.Context context, - @NonNull SeaTunnelRowType seaTunnelRowType, - @NonNull Schema tableSchema, - @NonNull Schema projectedSchema, @NonNull SourceConfig sourceConfig, - CatalogTable catalogTable) { + @NonNull Map tables, + @NonNull Map> tableSchemaProjections) { this.context = context; - this.pendingSplits = new LinkedList<>(); - this.catalogTable = catalogTable; - this.deserializer = new DefaultDeserializer(seaTunnelRowType, projectedSchema); - this.tableSchema = tableSchema; - this.projectedSchema = projectedSchema; this.sourceConfig = sourceConfig; + this.tables = tables; + this.tableSchemaProjections = tableSchemaProjections; + this.pendingSplits = new LinkedBlockingQueue<>(); + this.tableReaders = new ConcurrentHashMap<>(); } @Override public void open() { - icebergTableLoader = IcebergTableLoader.create(sourceConfig, catalogTable); - icebergTableLoader.open(); - - icebergFileScanTaskSplitReader = - new IcebergFileScanTaskSplitReader( - deserializer, - IcebergFileScanTaskReader.builder() - .fileIO(icebergTableLoader.loadTable().io()) - .tableSchema(tableSchema) - .projectedSchema(projectedSchema) - .caseSensitive(sourceConfig.isCaseSensitive()) - .reuseContainers(true) - .build()); + IcebergCatalogLoader catalogFactory = new IcebergCatalogLoader(sourceConfig); + catalog = catalogFactory.loadCatalog(); } @Override public void close() throws IOException { - if (icebergFileScanTaskSplitReader != null) { - icebergFileScanTaskSplitReader.close(); + if (catalog != null && catalog instanceof Closeable) { + ((Closeable) catalog).close(); + } + tableReaders.forEach((tablePath, reader) -> reader.close()); + } + + private IcebergFileScanTaskSplitReader 
getOrCreateTableReader(TablePath tablePath) { + IcebergFileScanTaskSplitReader tableReader = tableReaders.get(tablePath); + if (tableReader != null) { + return tableReader; } - icebergTableLoader.close(); + + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + // clean up table readers if the source is bounded + tableReaders.forEach((key, value) -> value.close()); + tableReaders.clear(); + } + + return tableReaders.computeIfAbsent( + tablePath, + key -> { + SourceTableConfig tableConfig = sourceConfig.getTableConfig(key); + CatalogTable catalogTable = tables.get(key); + Pair pair = tableSchemaProjections.get(key); + Schema tableSchema = pair.getLeft(); + Schema projectedSchema = pair.getRight(); + Deserializer deserializer = + new DefaultDeserializer( + catalogTable.getSeaTunnelRowType(), projectedSchema); + + Table icebergTable = catalog.loadTable(tableConfig.getTableIdentifier()); + return new IcebergFileScanTaskSplitReader( + deserializer, + IcebergFileScanTaskReader.builder() + .fileIO(icebergTable.io()) + .tableSchema(tableSchema) + .projectedSchema(projectedSchema) + .caseSensitive(sourceConfig.isCaseSensitive()) + .reuseContainers(true) + .build()); + }); } @Override public void pollNext(Collector output) throws Exception { - for (IcebergFileScanTaskSplit pendingSplit = pendingSplits.poll(); - pendingSplit != null; - pendingSplit = pendingSplits.poll()) { - currentReadSplit = pendingSplit; - try (CloseableIterator rowIterator = - icebergFileScanTaskSplitReader.open(currentReadSplit)) { - while (rowIterator.hasNext()) { - output.collect(rowIterator.next()); + synchronized (output.getCheckpointLock()) { + currentReadSplit = pendingSplits.poll(); + if (currentReadSplit != null) { + IcebergFileScanTaskSplitReader tableReader = + getOrCreateTableReader(currentReadSplit.getTablePath()); + try (CloseableIterator rowIterator = + tableReader.open(currentReadSplit)) { + while (rowIterator.hasNext()) { + output.collect(rowIterator.next()); + } } + return; } } 
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/split/IcebergFileScanTaskSplit.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/split/IcebergFileScanTaskSplit.java index 2b3870680d9..a476079404a 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/split/IcebergFileScanTaskSplit.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/split/IcebergFileScanTaskSplit.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.source.split; import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.iceberg.FileScanTask; @@ -36,11 +37,18 @@ public class IcebergFileScanTaskSplit implements SourceSplit { private static final long serialVersionUID = -9043797960947110643L; + private final TablePath tablePath; private final FileScanTask task; @Setter private volatile long recordOffset; + public IcebergFileScanTaskSplit(TablePath tablePath, @NonNull FileScanTask task) { + this(tablePath, task, 0); + } + + // TODO: Waiting for old version migration to complete before remove + @Deprecated public IcebergFileScanTaskSplit(@NonNull FileScanTask task) { - this(task, 0); + this(null, task, 0); } @Override diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java index 87eec5834b9..b7850ddfc7e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java @@ -147,6 +147,7 @@ private void initializeIcebergTable() { configs.put(CommonConfig.KEY_CATALOG_NAME.key(), CATALOG_NAME); configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps); + configs.put(CommonConfig.KEY_TABLE.key(), TABLE.toString()); CATALOG = new IcebergCatalogLoader(new SourceConfig(ReadonlyConfig.fromMap(configs))) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf index 351f5a58c03..fcec73e5d01 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf @@ -50,7 +50,11 @@ source { "warehouse"="file:///tmp/seatunnel/iceberg/hadoop/" } namespace = "database1" - table = "source" + table_list = [ + { + table = "source" + } + ] plugin_output = "iceberg" } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/hadoop3/IcebergSourceIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/hadoop3/IcebergSourceIT.java index 27eb102866c..4aab6f5aaf1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/hadoop3/IcebergSourceIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/hadoop3/IcebergSourceIT.java @@ -148,6 +148,7 @@ private void initializeIcebergTable() { configs.put(CommonConfig.KEY_CATALOG_NAME.key(), 
CATALOG_NAME); configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps); + configs.put(CommonConfig.KEY_TABLE.key(), TABLE.toString()); ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configs); CATALOG = new IcebergCatalogLoader(new SourceConfig(readonlyConfig)).loadCatalog(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/s3/IcebergSourceIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/s3/IcebergSourceIT.java index 35101528929..2a23708aede 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/s3/IcebergSourceIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/s3/IcebergSourceIT.java @@ -230,6 +230,7 @@ private void initializeIcebergTable() { configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps); configs.put(CommonConfig.HADOOP_PROPS.key(), getHadoopProps()); + configs.put(CommonConfig.KEY_TABLE.key(), TABLE.toString()); ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configs); CATALOG = new IcebergCatalogLoader(new SourceConfig(readonlyConfig)).loadCatalog();