Skip to content

Commit

Permalink
[Feature][Iceberg] Support read multi-table (#8524)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Jan 21, 2025
1 parent 021af14 commit 2bfb97e
Show file tree
Hide file tree
Showing 20 changed files with 665 additions and 329 deletions.
50 changes: 29 additions & 21 deletions docs/en/connector-v2/source/Iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ libfb303-xxx.jar

## Source Options

| Name | Type | Required | Default | Description |
| Name | Type | Required | Default | Description |
|--------------------------|---------|----------|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| catalog_name | string | yes | - | User-specified catalog name. |
| namespace | string | yes | - | The iceberg database name in the backend catalog. |
| table | string | yes | - | The iceberg table name in the backend catalog. |
| table | string | no | - | The iceberg table name in the backend catalog. |
| table_list | string | no | - | The iceberg table list in the backend catalog. |
| iceberg.catalog.config | map | yes | - | Specify the properties for initializing the Iceberg catalog, which can be referenced in this file:"https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java" |
| hadoop.config | map | no | - | Properties passed through to the Hadoop configuration |
| iceberg.hadoop-conf-path | string | no | - | The specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files. |
Expand All @@ -87,6 +88,7 @@ libfb303-xxx.jar
| use_snapshot_id | long | no | - | Instructs this scan to look for use the given snapshot ID. |
| use_snapshot_timestamp | long | no | - | Instructs this scan to look for use the most recent snapshot as of the given time in milliseconds. timestamp – the timestamp in millis since the Unix epoch |
| stream_scan_strategy | enum | no | FROM_LATEST_SNAPSHOT | Starting strategy for stream mode execution, Default to use `FROM_LATEST_SNAPSHOT` if don’t specify any value,The optional values are:<br/>TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode.<br/>FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive.<br/>FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive.<br/>FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive.<br/>FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. |
| increment.scan-interval | long | no | 2000 | The interval of increment scan(mills) |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |

## Task Example
Expand All @@ -101,25 +103,6 @@ env {
source {
Iceberg {
schema {
fields {
f2 = "boolean"
f1 = "bigint"
f3 = "int"
f4 = "bigint"
f5 = "float"
f6 = "double"
f7 = "date"
f9 = "timestamp"
f10 = "timestamp"
f11 = "string"
f12 = "bytes"
f13 = "bytes"
f14 = "decimal(19,9)"
f15 = "array<int>"
f16 = "map<string, int>"
}
}
catalog_name = "seatunnel"
iceberg.catalog.config={
type = "hadoop"
Expand All @@ -141,6 +124,31 @@ sink {
}
```

### Multi-Table Read:

```hocon
source {
Iceberg {
catalog_name = "seatunnel"
iceberg.catalog.config = {
type = "hadoop"
warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
}
namespace = "database1"
table_list = [
{
        table = "table_1"
},
{
        table = "table_2"
}
]
plugin_output = "iceberg"
}
}
```

### Hadoop S3 Catalog:

```hocon
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@

package org.apache.seatunnel.connectors.seatunnel.iceberg.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergStreamScanStrategy;

import org.apache.iceberg.expressions.Expression;

import lombok.Getter;
import lombok.ToString;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergStreamScanStrategy.FROM_LATEST_SNAPSHOT;

@Getter
Expand Down Expand Up @@ -74,45 +75,53 @@ public class SourceConfig extends CommonConfig {
.defaultValue(FROM_LATEST_SNAPSHOT)
.withDescription(" the iceberg strategy of stream scanning");

private Long startSnapshotTimestamp;
private Long startSnapshotId;
private Long endSnapshotId;
public static final Option<List<SourceTableConfig>> KEY_TABLE_LIST =
Options.key("table_list")
.listType(SourceTableConfig.class)
.noDefaultValue()
.withDescription(" the iceberg tables");

private Long useSnapshotId;
private Long useSnapshotTimestamp;
public static final Option<Long> KEY_INCREMENT_SCAN_INTERVAL =
Options.key("increment.scan-interval")
.longType()
.defaultValue(2000L)
.withDescription(" the interval of increment scan(mills)");

private IcebergStreamScanStrategy streamScanStrategy = KEY_STREAM_SCAN_STRATEGY.defaultValue();
private Expression filter;
private Long splitSize;
private Integer splitLookback;
private Long splitOpenFileCost;
private long incrementScanInterval;
private List<SourceTableConfig> tableList;

public SourceConfig(ReadonlyConfig readonlyConfig) {
super(readonlyConfig);
Config pluginConfig = readonlyConfig.toConfig();
if (pluginConfig.hasPath(KEY_START_SNAPSHOT_TIMESTAMP.key())) {
this.startSnapshotTimestamp = pluginConfig.getLong(KEY_START_SNAPSHOT_TIMESTAMP.key());
}
if (pluginConfig.hasPath(KEY_START_SNAPSHOT_ID.key())) {
this.startSnapshotId = pluginConfig.getLong(KEY_START_SNAPSHOT_ID.key());
}
if (pluginConfig.hasPath(KEY_END_SNAPSHOT_ID.key())) {
this.endSnapshotId = pluginConfig.getLong(KEY_END_SNAPSHOT_ID.key());
}
if (pluginConfig.hasPath(KEY_USE_SNAPSHOT_ID.key())) {
this.useSnapshotId = pluginConfig.getLong(KEY_USE_SNAPSHOT_ID.key());
}
if (pluginConfig.hasPath(KEY_USE_SNAPSHOT_TIMESTAMP.key())) {
this.useSnapshotTimestamp = pluginConfig.getLong(KEY_USE_SNAPSHOT_TIMESTAMP.key());
}
if (pluginConfig.hasPath(KEY_STREAM_SCAN_STRATEGY.key())) {
this.streamScanStrategy =
pluginConfig.getEnum(
IcebergStreamScanStrategy.class, KEY_STREAM_SCAN_STRATEGY.key());
this.incrementScanInterval = readonlyConfig.get(KEY_INCREMENT_SCAN_INTERVAL);
if (this.getTable() != null) {
SourceTableConfig tableConfig =
SourceTableConfig.builder()
.namespace(this.getNamespace())
.table(this.getTable())
.startSnapshotTimestamp(
readonlyConfig.get(KEY_START_SNAPSHOT_TIMESTAMP))
.startSnapshotId(readonlyConfig.get(KEY_START_SNAPSHOT_ID))
.endSnapshotId(readonlyConfig.get(KEY_END_SNAPSHOT_ID))
.useSnapshotId(readonlyConfig.get(KEY_USE_SNAPSHOT_ID))
.useSnapshotTimestamp(readonlyConfig.get(KEY_USE_SNAPSHOT_TIMESTAMP))
.streamScanStrategy(readonlyConfig.get(KEY_STREAM_SCAN_STRATEGY))
.build();
this.tableList = Collections.singletonList(tableConfig);
} else {
this.tableList =
readonlyConfig.get(KEY_TABLE_LIST).stream()
.map(
tableConfig ->
tableConfig.setNamespace(
SourceConfig.this.getNamespace()))
.collect(Collectors.toList());
}
}

public static SourceConfig loadConfig(ReadonlyConfig pluginConfig) {
return new SourceConfig(pluginConfig);
public SourceTableConfig getTableConfig(TablePath tablePath) {
return tableList.stream()
.filter(tableConfig -> tableConfig.getTablePath().equals(tablePath))
.findFirst()
.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iceberg.config;

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.IcebergStreamScanStrategy;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.experimental.Tolerate;

import java.io.Serializable;

import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig.KEY_STREAM_SCAN_STRATEGY;

/**
 * Per-table source configuration for the Iceberg connector's multi-table read support.
 *
 * <p>Holds the table identity (namespace + table name) together with optional snapshot
 * selection settings and stream-scan tuning used when reading a single Iceberg table.
 */
@AllArgsConstructor
@Data
@Builder
public class SourceTableConfig implements Serializable {
    private String namespace;
    private String table;

    // Incremental scan boundaries (all optional).
    private Long startSnapshotTimestamp;
    private Long startSnapshotId;
    private Long endSnapshotId;

    // Pin the scan to one specific snapshot, by id or by timestamp (optional).
    private Long useSnapshotId;
    private Long useSnapshotTimestamp;

    private IcebergStreamScanStrategy streamScanStrategy = KEY_STREAM_SCAN_STRATEGY.defaultValue();
    private Expression filter;
    private Long splitSize;
    private Integer splitLookback;
    private Long splitOpenFileCost;

    // Lombok @Builder suppresses the no-arg constructor; @Tolerate restores it for
    // config deserialization frameworks that need a default constructor.
    @Tolerate
    public SourceTableConfig() {}

    /**
     * Resolves the {@link TablePath} for this table.
     *
     * <p>The {@code table} field may be a bare name ({@code tbl}), a two-part name
     * ({@code db.tbl}), or a multi-part name ({@code ns1.ns2.tbl}). For multi-part
     * names, everything before the last dot is treated as the namespace and the
     * trailing segment as the table name.
     */
    public TablePath getTablePath() {
        String[] paths = table.split("\\.");
        if (paths.length == 1) {
            return TablePath.of(namespace, table);
        }
        if (paths.length == 2) {
            return TablePath.of(paths[0], paths[1]);
        }
        // Fix: String.lastIndexOf takes a literal string, not a regex — the previous
        // lastIndexOf("\\.") searched for a literal backslash, always returned -1, and
        // substring(0, -1) threw StringIndexOutOfBoundsException. Also pass only the
        // trailing segment as the table name rather than the full dotted string.
        int lastDot = table.lastIndexOf('.');
        return TablePath.of(table.substring(0, lastDot), table.substring(lastDot + 1));
    }

    /** Converts this table's path into an Iceberg {@link TableIdentifier}. */
    public TableIdentifier getTableIdentifier() {
        return SchemaUtils.toIcebergTableIdentifier(getTablePath());
    }

    /**
     * Fluent setter used by {@code SourceConfig} to inject the connector-level default
     * namespace into each entry of {@code table_list}.
     */
    public SourceTableConfig setNamespace(String namespace) {
        this.namespace = namespace;
        return this;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void close() throws IOException {
if (writer != null) {
writer.close();
}
icebergTableLoader.close();
} finally {
results.clear();
}
Expand Down
Loading

0 comments on commit 2bfb97e

Please sign in to comment.