[AMORO-3301] Support OSS for iceberg in InternalCatalog #3306

Merged 22 commits on Nov 21, 2024
1 change: 1 addition & 0 deletions README.md
@@ -119,6 +119,7 @@ Amoro is built using Maven with JDK 8 and JDK 17 (only for `amoro-format-mixed/am
* Build and skip tests: `mvn clean package -DskipTests`
* Build and skip dashboard: `mvn clean package -Pskip-dashboard-build`
* Build with disk storage disabled, so RocksDB is not introduced (avoids memory overflow): `mvn clean package -DskipTests -Pno-extented-disk-storage`
* Build and enable the `aliyun-oss-sdk` profile: `mvn clean package -DskipTests -Paliyun-oss-sdk`
* Build with hadoop 2.x dependencies (the default is 3.x): `mvn clean package -DskipTests -Phadoop2`
* Specify the Flink version for the Flink optimizer (the default is 1.20.0): `mvn clean package -DskipTests -Dflink-optimizer.flink-version=1.20.0`
* If the version of Flink is below 1.15.0, you also need to add the `-Pflink-optimizer-pre-1.15` parameter: `mvn clean package -DskipTests -Pflink-optimizer-pre-1.15 -Dflink-optimizer.flink-version=1.14.6`
CatalogController.java
@@ -40,12 +40,14 @@
import static org.apache.amoro.properties.CatalogMetaProperties.CATALOG_TYPE_HIVE;
import static org.apache.amoro.properties.CatalogMetaProperties.KEY_WAREHOUSE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_CORE_SITE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_ENDPOINT;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_HDFS_SITE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_HIVE_SITE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_OSS_ENDPOINT;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_REGION;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_S3_ENDPOINT;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_TYPE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_HADOOP;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_OSS;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_S3;
import static org.apache.amoro.properties.CatalogMetaProperties.TABLE_FORMATS;

@@ -75,6 +77,7 @@
import org.apache.amoro.utils.CatalogUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aliyun.AliyunProperties;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
@@ -114,6 +117,10 @@ public class CatalogController {
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_S3, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_S3, MIXED_ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_OSS, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_OSS, MIXED_ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, ICEBERG));
VALIDATE_CATALOGS.add(
@@ -145,6 +152,14 @@ public class CatalogController {
CatalogDescriptor.of(CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_S3, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_HADOOP, STORAGE_CONFIGS_VALUE_TYPE_OSS, PAIMON));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_GLUE, STORAGE_CONFIGS_VALUE_TYPE_OSS, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_GLUE, STORAGE_CONFIGS_VALUE_TYPE_OSS, MIXED_ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_OSS, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(
CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, MIXED_ICEBERG));
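The new entries above extend the controller's whitelist of valid (catalog type, storage type, table format) triples to cover OSS. A minimal sketch of how such a gate behaves, with a simplified stand-in for `CatalogDescriptor`; the `"OSS"` value matches `STORAGE_CONFIGS_VALUE_TYPE_OSS` below, while the other strings are illustrative assumptions:

```java
import java.util.HashSet;
import java.util.Set;

public class CatalogWhitelistSketch {
  // Simplified stand-in for CatalogDescriptor; a record gives value equality.
  record Descriptor(String catalogType, String storageType, String format) {}

  public static void main(String[] args) {
    Set<Descriptor> valid = new HashSet<>();
    // Mirrors the OSS additions above (string values are illustrative).
    valid.add(new Descriptor("ams", "OSS", "iceberg"));
    valid.add(new Descriptor("ams", "OSS", "mixed_iceberg"));
    valid.add(new Descriptor("hadoop", "OSS", "paimon"));
    valid.add(new Descriptor("glue", "OSS", "iceberg"));
    valid.add(new Descriptor("glue", "OSS", "mixed_iceberg"));
    valid.add(new Descriptor("custom", "OSS", "iceberg"));

    // A registration request is accepted only if its full triple is whitelisted.
    System.out.println(valid.contains(new Descriptor("ams", "OSS", "iceberg")));  // true
    System.out.println(valid.contains(new Descriptor("hive", "OSS", "iceberg"))); // false
  }
}
```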
@@ -182,11 +197,16 @@ private static Set<String> getHiddenCatalogProperties(
String.valueOf(authConfig.get(AUTH_CONFIGS_KEY_TYPE)))) {
hiddenProperties.add(S3FileIOProperties.ACCESS_KEY_ID);
hiddenProperties.add(S3FileIOProperties.SECRET_ACCESS_KEY);
hiddenProperties.add(AliyunProperties.CLIENT_ACCESS_KEY_ID);
hiddenProperties.add(AliyunProperties.CLIENT_ACCESS_KEY_SECRET);
}
if (STORAGE_CONFIGS_VALUE_TYPE_S3.equals(storageConfig.get(STORAGE_CONFIGS_KEY_TYPE))) {
hiddenProperties.add(AwsClientProperties.CLIENT_REGION);
hiddenProperties.add(S3FileIOProperties.ENDPOINT);
}
if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageConfig.get(STORAGE_CONFIGS_KEY_TYPE))) {
hiddenProperties.add(AliyunProperties.OSS_ENDPOINT);
}
return hiddenProperties;
}
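With the additions above, an OSS catalog using access-key auth gets its Aliyun credentials and endpoint into the hidden set. A hedged sketch of the downstream effect, assuming the `iceberg-aliyun` property-name strings (`client.access-key-id`, `client.access-key-secret`, `oss.endpoint`) and a simple masking step as the consumer:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class HiddenPropsSketch {
  public static void main(String[] args) {
    // Keys collected by getHiddenCatalogProperties() for an OSS catalog with
    // access-key auth (assumed iceberg-aliyun property names).
    Set<String> hidden =
        Set.of("client.access-key-id", "client.access-key-secret", "oss.endpoint");

    Map<String, String> props = new HashMap<>();
    props.put("client.access-key-id", "AK-123");
    props.put("client.access-key-secret", "SK-456");
    props.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
    props.put("warehouse", "oss://bucket/warehouse");

    // Mask every hidden key that is present; non-sensitive keys pass through.
    hidden.forEach(k -> props.computeIfPresent(k, (key, v) -> "******"));
    System.out.println(props); // credentials and endpoint masked, warehouse kept
  }
}
```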

@@ -205,7 +225,10 @@ public void getCatalogTypeList(Context ctx) {
}

private void fillAuthConfigs2CatalogMeta(
CatalogMeta catalogMeta,
Map<String, String> serverAuthConfig,
CatalogMeta oldCatalogMeta,
String storageType) {
Map<String, String> metaAuthConfig = new HashMap<>();
String authType =
serverAuthConfig
@@ -253,19 +276,19 @@ private void fillAuthConfigs2CatalogMeta(
serverAuthConfig,
catalogMeta.getCatalogProperties(),
AUTH_CONFIGS_KEY_ACCESS_KEY,
getStorageAccessKey(storageType));
CatalogUtil.copyProperty(
serverAuthConfig,
catalogMeta.getCatalogProperties(),
AUTH_CONFIGS_KEY_SECRET_KEY,
getStorageSecretKey(storageType));
break;
}
catalogMeta.setAuthConfigs(metaAuthConfig);
}

private Map<String, Object> extractAuthConfigsFromCatalogMeta(
String catalogName, CatalogMeta catalogMeta, String storageType) {
Map<String, Object> serverAuthConfig = new HashMap<>();
Map<String, String> metaAuthConfig = catalogMeta.getAuthConfigs();
String authType =
Expand Down Expand Up @@ -298,24 +321,38 @@ private Map<String, Object> extractAuthConfigsFromCatalogMeta(
CatalogUtil.copyProperty(
catalogMeta.getCatalogProperties(),
serverAuthConfig,
getStorageAccessKey(storageType),
AUTH_CONFIGS_KEY_ACCESS_KEY);
CatalogUtil.copyProperty(
catalogMeta.getCatalogProperties(),
serverAuthConfig,
getStorageSecretKey(storageType),
AUTH_CONFIGS_KEY_SECRET_KEY);
break;
}

return serverAuthConfig;
}

private String getStorageAccessKey(String storageType) {
if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageType)) {
return AliyunProperties.CLIENT_ACCESS_KEY_ID;
}
// default s3
return S3FileIOProperties.ACCESS_KEY_ID;
}

private String getStorageSecretKey(String storageType) {
if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageType)) {
return AliyunProperties.CLIENT_ACCESS_KEY_SECRET;
}
// default s3
return S3FileIOProperties.SECRET_ACCESS_KEY;
}
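A usage sketch tying the two resolvers above to `CatalogUtil.copyProperty`: the generic auth config is copied into catalog properties under the storage-specific key. The `copyProperty` stand-in and the `auth.*` key strings are illustrative assumptions; the S3/OSS key strings are the assumed `iceberg-aws` / `iceberg-aliyun` property names:

```java
import java.util.HashMap;
import java.util.Map;

public class KeyResolutionSketch {
  static String accessKeyFor(String storageType) { // mirrors getStorageAccessKey
    return "OSS".equals(storageType) ? "client.access-key-id" : "s3.access-key-id";
  }

  static String secretKeyFor(String storageType) { // mirrors getStorageSecretKey
    return "OSS".equals(storageType) ? "client.access-key-secret" : "s3.secret-access-key";
  }

  // Simplified stand-in for CatalogUtil.copyProperty(from, to, fromKey, toKey).
  static void copyProperty(
      Map<String, String> from, Map<String, String> to, String fromKey, String toKey) {
    if (from.containsKey(fromKey)) {
      to.put(toKey, from.get(fromKey));
    }
  }

  public static void main(String[] args) {
    Map<String, String> authConfig = Map.of("auth.access_key", "AK", "auth.secret_key", "SK");
    Map<String, String> catalogProps = new HashMap<>();
    copyProperty(authConfig, catalogProps, "auth.access_key", accessKeyFor("OSS"));
    copyProperty(authConfig, catalogProps, "auth.secret_key", secretKeyFor("OSS"));
    System.out.println(catalogProps); // {client.access-key-id=AK, client.access-key-secret=SK}
  }
}
```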

private Map<String, Object> extractStorageConfigsFromCatalogMeta(
String catalogName, CatalogMeta catalogMeta, String storageType) {
Map<String, Object> storageConfig = new HashMap<>();
Map<String, String> config = catalogMeta.getStorageConfigs();
storageConfig.put(STORAGE_CONFIGS_KEY_TYPE, storageType);
if (STORAGE_CONFIGS_VALUE_TYPE_HADOOP.equals(storageType)) {
storageConfig.put(
@@ -354,7 +391,13 @@ private Map<String, Object> extractStorageConfigsFromCatalogMeta(
catalogMeta.getCatalogProperties(),
storageConfig,
S3FileIOProperties.ENDPOINT,
STORAGE_CONFIGS_KEY_S3_ENDPOINT);
} else if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageType)) {
CatalogUtil.copyProperty(
catalogMeta.getCatalogProperties(),
storageConfig,
AliyunProperties.OSS_ENDPOINT,
STORAGE_CONFIGS_KEY_OSS_ENDPOINT);
}

return storageConfig;
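For an OSS-backed catalog, the new branch above copies the endpoint out of the stored catalog properties into the dashboard-facing key. A sketch of the resulting map; the `storage.type` and `oss.endpoint` strings are assumed values of `STORAGE_CONFIGS_KEY_TYPE` and `AliyunProperties.OSS_ENDPOINT`, while `storage.oss.endpoint` is the constant added below:

```java
import java.util.HashMap;
import java.util.Map;

public class OssStorageConfigSketch {
  public static void main(String[] args) {
    // Catalog properties as stored server-side (illustrative endpoint).
    Map<String, String> catalogProperties =
        Map.of("oss.endpoint", "https://oss-cn-hangzhou.aliyuncs.com");

    Map<String, Object> storageConfig = new HashMap<>();
    storageConfig.put("storage.type", "OSS");
    // AliyunProperties.OSS_ENDPOINT -> STORAGE_CONFIGS_KEY_OSS_ENDPOINT
    String endpoint = catalogProperties.get("oss.endpoint");
    if (endpoint != null) {
      storageConfig.put("storage.oss.endpoint", endpoint);
    }
    System.out.println(storageConfig); // {storage.type=OSS, storage.oss.endpoint=https://...}
  }
}
```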
@@ -387,12 +430,12 @@ private CatalogMeta constructCatalogMeta(CatalogRegisterInfo info, CatalogMeta o
"Invalid table format list, " + String.join(",", info.getTableFormatList()));
}
catalogMeta.getCatalogProperties().put(CatalogMetaProperties.TABLE_FORMATS, tableFormats);
String storageType =
info.getStorageConfig()
.getOrDefault(STORAGE_CONFIGS_KEY_TYPE, STORAGE_CONFIGS_VALUE_TYPE_HADOOP);
fillAuthConfigs2CatalogMeta(catalogMeta, info.getAuthConfig(), oldCatalogMeta, storageType);
// change fileId to base64Code
Map<String, String> metaStorageConfig = new HashMap<>();
metaStorageConfig.put(STORAGE_CONFIGS_KEY_TYPE, storageType);
if (storageType.equals(STORAGE_CONFIGS_VALUE_TYPE_HADOOP)) {
List<String> metaKeyList =
@@ -429,8 +472,14 @@ private CatalogMeta constructCatalogMeta(CatalogRegisterInfo info, CatalogMeta o
CatalogUtil.copyProperty(
info.getStorageConfig(),
catalogMeta.getCatalogProperties(),
STORAGE_CONFIGS_KEY_S3_ENDPOINT,
S3FileIOProperties.ENDPOINT);
} else if (storageType.equals(STORAGE_CONFIGS_VALUE_TYPE_OSS)) {
CatalogUtil.copyProperty(
info.getStorageConfig(),
catalogMeta.getCatalogProperties(),
STORAGE_CONFIGS_KEY_OSS_ENDPOINT,
AliyunProperties.OSS_ENDPOINT);
} else {
throw new RuntimeException("Invalid storage type " + storageType);
}
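Registration runs the same mapping in the opposite direction: the `storage.oss.endpoint` submitted by the dashboard is copied into the catalog properties under `AliyunProperties.OSS_ENDPOINT`, where `OSSFileIO` can later pick it up, and unknown storage types are rejected. A hedged sketch of the branch structure (key strings are assumptions, and the Hadoop branch is elided):

```java
import java.util.HashMap;
import java.util.Map;

public class OssRegistrationSketch {
  static Map<String, String> toCatalogProperties(Map<String, String> storageConfig) {
    Map<String, String> props = new HashMap<>();
    String type = storageConfig.getOrDefault("storage.type", "Hadoop");
    if (type.equals("S3")) {
      props.put("s3.endpoint", storageConfig.get("storage.s3.endpoint"));
    } else if (type.equals("OSS")) {
      // New branch above: dashboard key -> AliyunProperties.OSS_ENDPOINT.
      props.put("oss.endpoint", storageConfig.get("storage.oss.endpoint"));
    } else if (!type.equals("Hadoop")) {
      throw new RuntimeException("Invalid storage type " + type);
    }
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> cfg = new HashMap<>();
    cfg.put("storage.type", "OSS");
    cfg.put("storage.oss.endpoint", "https://oss-cn-hangzhou.aliyuncs.com");
    System.out.println(toCatalogProperties(cfg)); // {oss.endpoint=https://...}
  }
}
```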
@@ -541,8 +590,10 @@ public void getCatalogDetail(Context ctx) {
} else {
info.setType(catalogMeta.getCatalogType());
}
String storageType = CatalogUtil.getCompatibleStorageType(catalogMeta.getStorageConfigs());
info.setAuthConfig(extractAuthConfigsFromCatalogMeta(catalogName, catalogMeta, storageType));
info.setStorageConfig(
extractStorageConfigsFromCatalogMeta(catalogName, catalogMeta, storageType));
// we put the table format single
String tableFormat =
catalogMeta.getCatalogProperties().get(CatalogMetaProperties.TABLE_FORMATS);
InternalTableConstants.java
@@ -33,6 +33,8 @@ public class InternalTableConstants {
public static final String HADOOP_FILE_IO_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
public static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
public static final String S3_PROTOCOL_PREFIX = "s3://";
public static final String OSS_FILE_IO_IMPL = "org.apache.iceberg.aliyun.oss.OSSFileIO";
public static final String OSS_PROTOCOL_PREFIX = "oss://";

public static final String CHANGE_STORE_TABLE_NAME_SUFFIX =
InternalMixedIcebergCatalog.CHANGE_STORE_SEPARATOR
@@ -21,6 +21,8 @@
import static org.apache.amoro.server.table.internal.InternalTableConstants.HADOOP_FILE_IO_IMPL;
import static org.apache.amoro.server.table.internal.InternalTableConstants.METADATA_FOLDER_NAME;
import static org.apache.amoro.server.table.internal.InternalTableConstants.MIXED_ICEBERG_BASED_REST;
import static org.apache.amoro.server.table.internal.InternalTableConstants.OSS_FILE_IO_IMPL;
import static org.apache.amoro.server.table.internal.InternalTableConstants.OSS_PROTOCOL_PREFIX;
import static org.apache.amoro.server.table.internal.InternalTableConstants.S3_FILE_IO_IMPL;
import static org.apache.amoro.server.table.internal.InternalTableConstants.S3_PROTOCOL_PREFIX;

@@ -88,6 +90,8 @@ public static AuthenticatedFileIO newIcebergFileIo(CatalogMeta meta) {
String defaultImpl = HADOOP_FILE_IO_IMPL;
if (warehouse.toLowerCase().startsWith(S3_PROTOCOL_PREFIX)) {
defaultImpl = S3_FILE_IO_IMPL;
} else if (warehouse.toLowerCase().startsWith(OSS_PROTOCOL_PREFIX)) {
defaultImpl = OSS_FILE_IO_IMPL;
}
String ioImpl = catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, defaultImpl);
FileIO fileIO = org.apache.iceberg.CatalogUtil.loadFileIO(ioImpl, catalogProperties, conf);
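The warehouse-prefix dispatch above can be read in isolation. A sketch with the `InternalTableConstants` values inlined; the `io-impl` fallback key is Iceberg's `CatalogProperties.FILE_IO_IMPL`:

```java
import java.util.Map;

public class FileIoDispatchSketch {
  static final String HADOOP_FILE_IO_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
  static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
  static final String OSS_FILE_IO_IMPL = "org.apache.iceberg.aliyun.oss.OSSFileIO";

  static String fileIoImpl(String warehouse, Map<String, String> catalogProperties) {
    // Pick a default FileIO implementation from the warehouse URI scheme.
    String defaultImpl = HADOOP_FILE_IO_IMPL;
    if (warehouse.toLowerCase().startsWith("s3://")) {
      defaultImpl = S3_FILE_IO_IMPL;
    } else if (warehouse.toLowerCase().startsWith("oss://")) {
      defaultImpl = OSS_FILE_IO_IMPL;
    }
    // An explicit "io-impl" in catalog properties still wins over the default.
    return catalogProperties.getOrDefault("io-impl", defaultImpl);
  }

  public static void main(String[] args) {
    System.out.println(fileIoImpl("oss://my-bucket/warehouse", Map.of()));
    // -> org.apache.iceberg.aliyun.oss.OSSFileIO
  }
}
```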
CatalogMetaProperties.java
@@ -26,11 +26,13 @@ public class CatalogMetaProperties {
public static final String STORAGE_CONFIGS_KEY_CORE_SITE = "hadoop.core.site";
public static final String STORAGE_CONFIGS_KEY_HIVE_SITE = "hive.site";
public static final String STORAGE_CONFIGS_KEY_REGION = "storage.s3.region";
public static final String STORAGE_CONFIGS_KEY_S3_ENDPOINT = "storage.s3.endpoint";
public static final String STORAGE_CONFIGS_KEY_OSS_ENDPOINT = "storage.oss.endpoint";

public static final String STORAGE_CONFIGS_VALUE_TYPE_HDFS_LEGACY = "hdfs";
public static final String STORAGE_CONFIGS_VALUE_TYPE_HADOOP = "Hadoop";
public static final String STORAGE_CONFIGS_VALUE_TYPE_S3 = "S3";
public static final String STORAGE_CONFIGS_VALUE_TYPE_OSS = "OSS";

public static final String AUTH_CONFIGS_KEY_TYPE = "auth.type";
public static final String AUTH_CONFIGS_KEY_PRINCIPAL = "auth.kerberos.principal";
18 changes: 17 additions & 1 deletion amoro-format-iceberg/pom.xml
@@ -107,6 +107,22 @@
<artifactId>iceberg-aws</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aliyun</artifactId>
</dependency>

<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
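<!-- Hedged note: httpclient is excluded below presumably to avoid shipping a
     second Apache HttpClient alongside the version already managed elsewhere
     in the build; the OSS SDK then uses the one on the classpath. -->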
<exclusions>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
@@ -163,7 +179,7 @@
<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-common</artifactId>
<version>${project.parent.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
@@ -75,6 +75,7 @@
<include>org.apache.iceberg:iceberg-orc</include>
<include>org.apache.iceberg:iceberg-parquet</include>
<include>org.apache.iceberg:iceberg-aws</include>
<include>org.apache.iceberg:iceberg-aliyun</include>
<include>org.apache.parquet:parquet-column</include>
<include>org.apache.parquet:parquet-hadoop</include>
<include>org.apache.parquet:parquet-common</include>
@@ -277,7 +277,7 @@
<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-format-paimon</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>

@@ -80,6 +80,7 @@
<include>org.apache.iceberg:iceberg-orc</include>
<include>org.apache.iceberg:iceberg-parquet</include>
<include>org.apache.iceberg:iceberg-aws</include>
<include>org.apache.iceberg:iceberg-aliyun</include>
<include>org.apache.parquet:parquet-column</include>
<include>org.apache.parquet:parquet-hadoop</include>
<include>org.apache.parquet:parquet-common</include>