[AMORO-3301] Support OSS for iceberg in InternalCatalog #3306

Merged: 22 commits, Nov 21, 2024.
Changes from 1 commit
@@ -40,12 +40,14 @@
import static org.apache.amoro.properties.CatalogMetaProperties.CATALOG_TYPE_HIVE;
import static org.apache.amoro.properties.CatalogMetaProperties.KEY_WAREHOUSE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_CORE_SITE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_ENDPOINT;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_HDFS_SITE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_HIVE_SITE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_OSS_ENDPOINT;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_REGION;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_S3_ENDPOINT;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_TYPE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_HADOOP;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_OSS;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_S3;
import static org.apache.amoro.properties.CatalogMetaProperties.TABLE_FORMATS;

@@ -75,6 +77,7 @@
import org.apache.amoro.utils.CatalogUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aliyun.AliyunProperties;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
@@ -114,6 +117,10 @@ public class CatalogController {
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_S3, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_S3, MIXED_ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_OSS, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_OSS, MIXED_ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, ICEBERG));
VALIDATE_CATALOGS.add(
@@ -145,6 +152,14 @@ public class CatalogController {
CatalogDescriptor.of(CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_S3, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_HADOOP, STORAGE_CONFIGS_VALUE_TYPE_OSS, PAIMON));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_GLUE, STORAGE_CONFIGS_VALUE_TYPE_OSS, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_GLUE, STORAGE_CONFIGS_VALUE_TYPE_OSS, MIXED_ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_OSS, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(
CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, MIXED_ICEBERG));
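For reference, VALIDATE_CATALOGS is the whitelist of (metastore type, storage type, table format) combinations the dashboard accepts; the entries added above enable OSS storage for the AMS, Hadoop, Glue, and custom metastore types. Below is a minimal sketch of how a registration request might be checked against such a whitelist; the method, its message, and the TableFormat parameter type are assumptions for illustration, not the PR's exact code:

```java
// Hypothetical validation sketch. CatalogDescriptor and VALIDATE_CATALOGS
// come from CatalogController; everything else here is illustrative.
private void validateCatalogCombination(
    String metastoreType, String storageType, TableFormat format) {
  CatalogDescriptor candidate = CatalogDescriptor.of(metastoreType, storageType, format);
  if (!VALIDATE_CATALOGS.contains(candidate)) {
    throw new IllegalArgumentException(
        "Unsupported catalog combination: "
            + metastoreType + "/" + storageType + "/" + format);
  }
}
```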
@@ -182,11 +197,16 @@ private static Set<String> getHiddenCatalogProperties(
String.valueOf(authConfig.get(AUTH_CONFIGS_KEY_TYPE)))) {
hiddenProperties.add(S3FileIOProperties.ACCESS_KEY_ID);
hiddenProperties.add(S3FileIOProperties.SECRET_ACCESS_KEY);
hiddenProperties.add(AliyunProperties.CLIENT_ACCESS_KEY_ID);
hiddenProperties.add(AliyunProperties.CLIENT_ACCESS_KEY_SECRET);
}
if (STORAGE_CONFIGS_VALUE_TYPE_S3.equals(storageConfig.get(STORAGE_CONFIGS_KEY_TYPE))) {
hiddenProperties.add(AwsClientProperties.CLIENT_REGION);
hiddenProperties.add(S3FileIOProperties.ENDPOINT);
}
if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageConfig.get(STORAGE_CONFIGS_KEY_TYPE))) {
hiddenProperties.add(AliyunProperties.OSS_ENDPOINT);
}
return hiddenProperties;
}
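getHiddenCatalogProperties collects the credential and endpoint keys that must not be echoed back in the merged catalog-properties view; this change hides the Aliyun AK/SK pair and the OSS endpoint alongside their S3 counterparts. A hedged sketch of how the resulting set could be applied; the helper name is an assumption, not part of this PR:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: strip hidden keys before returning properties to the UI.
static Map<String, String> maskHiddenProperties(
    Map<String, String> catalogProperties, Set<String> hiddenKeys) {
  Map<String, String> visible = new HashMap<>(catalogProperties);
  visible.keySet().removeAll(hiddenKeys); // drops AK/SK and endpoint entries
  return visible;
}
```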

@@ -205,7 +225,10 @@ public void getCatalogTypeList(Context ctx) {
}

private void fillAuthConfigs2CatalogMeta(
CatalogMeta catalogMeta, Map<String, String> serverAuthConfig, CatalogMeta oldCatalogMeta) {
CatalogMeta catalogMeta,
Map<String, String> serverAuthConfig,
CatalogMeta oldCatalogMeta,
String storageType) {
Map<String, String> metaAuthConfig = new HashMap<>();
String authType =
serverAuthConfig
@@ -253,19 +276,19 @@ private void fillAuthConfigs2CatalogMeta(
serverAuthConfig,
catalogMeta.getCatalogProperties(),
AUTH_CONFIGS_KEY_ACCESS_KEY,
S3FileIOProperties.ACCESS_KEY_ID);
getStorageAccessKey(storageType));
CatalogUtil.copyProperty(
serverAuthConfig,
catalogMeta.getCatalogProperties(),
AUTH_CONFIGS_KEY_SECRET_KEY,
S3FileIOProperties.SECRET_ACCESS_KEY);
getStorageSecretKey(storageType));
break;
}
catalogMeta.setAuthConfigs(metaAuthConfig);
}

private Map<String, Object> extractAuthConfigsFromCatalogMeta(
String catalogName, CatalogMeta catalogMeta) {
String catalogName, CatalogMeta catalogMeta, String storageType) {
Map<String, Object> serverAuthConfig = new HashMap<>();
Map<String, String> metaAuthConfig = catalogMeta.getAuthConfigs();
String authType =
@@ -298,24 +321,38 @@ private Map<String, Object> extractAuthConfigsFromCatalogMeta(
CatalogUtil.copyProperty(
catalogMeta.getCatalogProperties(),
serverAuthConfig,
S3FileIOProperties.ACCESS_KEY_ID,
getStorageAccessKey(storageType),
AUTH_CONFIGS_KEY_ACCESS_KEY);
CatalogUtil.copyProperty(
catalogMeta.getCatalogProperties(),
serverAuthConfig,
S3FileIOProperties.SECRET_ACCESS_KEY,
getStorageSecretKey(storageType),
AUTH_CONFIGS_KEY_SECRET_KEY);
break;
}

return serverAuthConfig;
}

private String getStorageAccessKey(String storageType) {
if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageType)) {
return AliyunProperties.CLIENT_ACCESS_KEY_ID;
}
// default s3
return S3FileIOProperties.ACCESS_KEY_ID;
}

private String getStorageSecretKey(String storageType) {
if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageType)) {
return AliyunProperties.CLIENT_ACCESS_KEY_SECRET;
}
// default s3
return S3FileIOProperties.SECRET_ACCESS_KEY;
}
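These two helpers resolve the backend-specific Iceberg property names, so the dashboard's generic access-key/secret-key fields map to Aliyun's client properties for OSS and to the S3FileIO properties otherwise. The literal strings noted below are what iceberg-aliyun and iceberg-aws define at the time of writing; treat them as assumptions:

```java
// Standalone sketch of the same selection, with the expected literals noted.
// "OSS" stands in for STORAGE_CONFIGS_VALUE_TYPE_OSS.
static String accessKeyProperty(String storageType) {
  return "OSS".equals(storageType)
      ? AliyunProperties.CLIENT_ACCESS_KEY_ID // "client.access-key-id"
      : S3FileIOProperties.ACCESS_KEY_ID;     // "s3.access-key-id"
}

static String secretKeyProperty(String storageType) {
  return "OSS".equals(storageType)
      ? AliyunProperties.CLIENT_ACCESS_KEY_SECRET // "client.access-key-secret"
      : S3FileIOProperties.SECRET_ACCESS_KEY;     // "s3.secret-access-key"
}
```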

private Map<String, Object> extractStorageConfigsFromCatalogMeta(
String catalogName, CatalogMeta catalogMeta) {
String catalogName, CatalogMeta catalogMeta, String storageType) {
Map<String, Object> storageConfig = new HashMap<>();
Map<String, String> config = catalogMeta.getStorageConfigs();
String storageType = CatalogUtil.getCompatibleStorageType(config);
storageConfig.put(STORAGE_CONFIGS_KEY_TYPE, storageType);
if (STORAGE_CONFIGS_VALUE_TYPE_HADOOP.equals(storageType)) {
storageConfig.put(
@@ -354,7 +391,13 @@ private Map<String, Object> extractStorageConfigsFromCatalogMeta(
catalogMeta.getCatalogProperties(),
storageConfig,
S3FileIOProperties.ENDPOINT,
STORAGE_CONFIGS_KEY_ENDPOINT);
STORAGE_CONFIGS_KEY_S3_ENDPOINT);
} else if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageType)) {
CatalogUtil.copyProperty(
catalogMeta.getCatalogProperties(),
storageConfig,
AliyunProperties.OSS_ENDPOINT,
STORAGE_CONFIGS_KEY_OSS_ENDPOINT);
}

return storageConfig;
@@ -387,12 +430,12 @@ private CatalogMeta constructCatalogMeta(CatalogRegisterInfo info, CatalogMeta o
"Invalid table format list, " + String.join(",", info.getTableFormatList()));
}
catalogMeta.getCatalogProperties().put(CatalogMetaProperties.TABLE_FORMATS, tableFormats);
fillAuthConfigs2CatalogMeta(catalogMeta, info.getAuthConfig(), oldCatalogMeta);
// change fileId to base64Code
Map<String, String> metaStorageConfig = new HashMap<>();
String storageType =
info.getStorageConfig()
.getOrDefault(STORAGE_CONFIGS_KEY_TYPE, STORAGE_CONFIGS_VALUE_TYPE_HADOOP);
fillAuthConfigs2CatalogMeta(catalogMeta, info.getAuthConfig(), oldCatalogMeta, storageType);
// change fileId to base64Code
Map<String, String> metaStorageConfig = new HashMap<>();
metaStorageConfig.put(STORAGE_CONFIGS_KEY_TYPE, storageType);
if (storageType.equals(STORAGE_CONFIGS_VALUE_TYPE_HADOOP)) {
List<String> metaKeyList =
@@ -429,8 +472,14 @@ private CatalogMeta constructCatalogMeta(CatalogRegisterInfo info, CatalogMeta o
CatalogUtil.copyProperty(
info.getStorageConfig(),
catalogMeta.getCatalogProperties(),
STORAGE_CONFIGS_KEY_ENDPOINT,
STORAGE_CONFIGS_KEY_S3_ENDPOINT,
S3FileIOProperties.ENDPOINT);
} else if (storageType.equals(STORAGE_CONFIGS_VALUE_TYPE_OSS)) {
CatalogUtil.copyProperty(
info.getStorageConfig(),
catalogMeta.getCatalogProperties(),
STORAGE_CONFIGS_KEY_OSS_ENDPOINT,
AliyunProperties.OSS_ENDPOINT);
} else {
throw new RuntimeException("Invalid storage type " + storageType);
}
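On save, constructCatalogMeta translates the dashboard key storage.oss.endpoint into the Iceberg catalog property AliyunProperties.OSS_ENDPOINT, and extractStorageConfigsFromCatalogMeta reverses the mapping on read. A round-trip sketch, assuming AliyunProperties.OSS_ENDPOINT resolves to "oss.endpoint"; the endpoint URL is an example only:

```java
Map<String, String> storageConfig = new HashMap<>();
storageConfig.put(STORAGE_CONFIGS_KEY_OSS_ENDPOINT, "https://oss-cn-hangzhou.aliyuncs.com");

Map<String, String> catalogProperties = new HashMap<>();
// Same CatalogUtil.copyProperty(source, target, sourceKey, targetKey) as used above.
CatalogUtil.copyProperty(
    storageConfig,
    catalogProperties,
    STORAGE_CONFIGS_KEY_OSS_ENDPOINT,
    AliyunProperties.OSS_ENDPOINT);
// catalogProperties now maps "oss.endpoint" -> "https://oss-cn-hangzhou.aliyuncs.com"
```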
@@ -541,8 +590,10 @@ public void getCatalogDetail(Context ctx) {
} else {
info.setType(catalogMeta.getCatalogType());
}
info.setAuthConfig(extractAuthConfigsFromCatalogMeta(catalogName, catalogMeta));
info.setStorageConfig(extractStorageConfigsFromCatalogMeta(catalogName, catalogMeta));
String storageType = CatalogUtil.getCompatibleStorageType(catalogMeta.getStorageConfigs());
info.setAuthConfig(extractAuthConfigsFromCatalogMeta(catalogName, catalogMeta, storageType));
info.setStorageConfig(
extractStorageConfigsFromCatalogMeta(catalogName, catalogMeta, storageType));
// we put the table format single
String tableFormat =
catalogMeta.getCatalogProperties().get(CatalogMetaProperties.TABLE_FORMATS);
@@ -33,6 +33,8 @@ public class InternalTableConstants {
public static final String HADOOP_FILE_IO_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
public static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
public static final String S3_PROTOCOL_PREFIX = "s3://";
public static final String OSS_FILE_IO_IMPL = "org.apache.iceberg.aliyun.oss.OSSFileIO";
public static final String OSS_PROTOCOL_PREFIX = "oss://";

public static final String CHANGE_STORE_TABLE_NAME_SUFFIX =
InternalMixedIcebergCatalog.CHANGE_STORE_SEPARATOR
@@ -21,6 +21,8 @@
import static org.apache.amoro.server.table.internal.InternalTableConstants.HADOOP_FILE_IO_IMPL;
import static org.apache.amoro.server.table.internal.InternalTableConstants.METADATA_FOLDER_NAME;
import static org.apache.amoro.server.table.internal.InternalTableConstants.MIXED_ICEBERG_BASED_REST;
import static org.apache.amoro.server.table.internal.InternalTableConstants.OSS_FILE_IO_IMPL;
import static org.apache.amoro.server.table.internal.InternalTableConstants.OSS_PROTOCOL_PREFIX;
import static org.apache.amoro.server.table.internal.InternalTableConstants.S3_FILE_IO_IMPL;
import static org.apache.amoro.server.table.internal.InternalTableConstants.S3_PROTOCOL_PREFIX;

@@ -88,6 +90,8 @@ public static AuthenticatedFileIO newIcebergFileIo(CatalogMeta meta) {
String defaultImpl = HADOOP_FILE_IO_IMPL;
if (warehouse.toLowerCase().startsWith(S3_PROTOCOL_PREFIX)) {
defaultImpl = S3_FILE_IO_IMPL;
} else if (warehouse.toLowerCase().startsWith(OSS_PROTOCOL_PREFIX)) {
defaultImpl = OSS_FILE_IO_IMPL;
}
String ioImpl = catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, defaultImpl);
FileIO fileIO = org.apache.iceberg.CatalogUtil.loadFileIO(ioImpl, catalogProperties, conf);
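newIcebergFileIo now dispatches on the warehouse URI scheme: an oss:// warehouse defaults to OSSFileIO, while an explicit io-impl in the catalog properties still takes precedence. A self-contained sketch of the dispatch; the helper name is illustrative, and the constants come from InternalTableConstants:

```java
// Minimal sketch of the default-FileIO selection added above.
static String defaultFileIoImpl(String warehouse) {
  String location = warehouse.toLowerCase();
  if (location.startsWith(S3_PROTOCOL_PREFIX)) {          // "s3://"
    return S3_FILE_IO_IMPL;   // org.apache.iceberg.aws.s3.S3FileIO
  } else if (location.startsWith(OSS_PROTOCOL_PREFIX)) {  // "oss://"
    return OSS_FILE_IO_IMPL;  // org.apache.iceberg.aliyun.oss.OSSFileIO
  }
  return HADOOP_FILE_IO_IMPL; // org.apache.iceberg.hadoop.HadoopFileIO
}
```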
@@ -26,11 +26,13 @@ public class CatalogMetaProperties {
public static final String STORAGE_CONFIGS_KEY_CORE_SITE = "hadoop.core.site";
public static final String STORAGE_CONFIGS_KEY_HIVE_SITE = "hive.site";
public static final String STORAGE_CONFIGS_KEY_REGION = "storage.s3.region";
public static final String STORAGE_CONFIGS_KEY_ENDPOINT = "storage.s3.endpoint";
public static final String STORAGE_CONFIGS_KEY_S3_ENDPOINT = "storage.s3.endpoint";
public static final String STORAGE_CONFIGS_KEY_OSS_ENDPOINT = "storage.oss.endpoint";

public static final String STORAGE_CONFIGS_VALUE_TYPE_HDFS_LEGACY = "hdfs";
public static final String STORAGE_CONFIGS_VALUE_TYPE_HADOOP = "Hadoop";
public static final String STORAGE_CONFIGS_VALUE_TYPE_S3 = "S3";
public static final String STORAGE_CONFIGS_VALUE_TYPE_OSS = "OSS";

public static final String AUTH_CONFIGS_KEY_TYPE = "auth.type";
public static final String AUTH_CONFIGS_KEY_PRINCIPAL = "auth.kerberos.principal";
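With the new constants in place, a storage-config payload for an OSS-backed catalog might look like the following; the key strings follow the constants above (the Vue form suggests STORAGE_CONFIGS_KEY_TYPE is "storage.type"), and the endpoint value is an example only:

```java
Map<String, String> ossStorageConfig = new HashMap<>();
ossStorageConfig.put(STORAGE_CONFIGS_KEY_TYPE, STORAGE_CONFIGS_VALUE_TYPE_OSS); // "storage.type" -> "OSS"
ossStorageConfig.put(
    STORAGE_CONFIGS_KEY_OSS_ENDPOINT, "https://oss-cn-hangzhou.aliyuncs.com");  // "storage.oss.endpoint"
```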
5 changes: 5 additions & 0 deletions amoro-format-iceberg/pom.xml
@@ -107,6 +107,11 @@
<artifactId>iceberg-aws</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aliyun</artifactId>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
@@ -75,6 +75,7 @@
<include>org.apache.iceberg:iceberg-orc</include>
<include>org.apache.iceberg:iceberg-parquet</include>
<include>org.apache.iceberg:iceberg-aws</include>
<include>org.apache.iceberg:iceberg-aliyun</include>
<include>org.apache.parquet:parquet-column</include>
<include>org.apache.parquet:parquet-hadoop</include>
<include>org.apache.parquet:parquet-common</include>
@@ -80,6 +80,7 @@
<include>org.apache.iceberg:iceberg-orc</include>
<include>org.apache.iceberg:iceberg-parquet</include>
<include>org.apache.iceberg:iceberg-aws</include>
<include>org.apache.iceberg:iceberg-aliyun</include>
<include>org.apache.parquet:parquet-column</include>
<include>org.apache.parquet:parquet-hadoop</include>
<include>org.apache.parquet:parquet-common</include>
39 changes: 34 additions & 5 deletions amoro-web/src/views/catalogs/Detail.vue
@@ -167,6 +167,14 @@ const s3ConfigTypeOps = reactive<ILableAndValue[]>([{
value: 'CUSTOM',
}])

const ossConfigTypeOps = reactive<ILableAndValue[]>([{
label: 'AK/SK',
value: 'AK/SK',
}, {
label: 'CUSTOM',
value: 'CUSTOM',
}])

const storageConfigMap = {
'hadoop.core.site': 'Hadoop core-site',
'hadoop.hdfs.site': 'Hadoop hdfs-site',
@@ -361,37 +369,48 @@ async function changeProperties() {
formState.properties = properties
}

const storageConfigTypeS3 = reactive<ILableAndValue[]>([{
const storageConfigTypeS3Oss = reactive<ILableAndValue[]>([{
label: 'S3',
value: 'S3',
}, {
label: 'OSS',
value: 'OSS',
}])

const storageConfigTypeOSS = reactive<ILableAndValue[]>([{
label: 'OSS',
value: 'OSS',
}])

const storageConfigTypeHadoop = reactive<ILableAndValue[]>([{
label: 'Hadoop',
value: 'Hadoop',
}])

const storageConfigTypeHadoopS3 = reactive<ILableAndValue[]>([{
const storageConfigTypeHadoopS3Oss = reactive<ILableAndValue[]>([{
label: 'Hadoop',
value: 'Hadoop',
}, {
label: 'S3',
value: 'S3',
}, {
label: 'OSS',
value: 'OSS',
}])

const storageConfigTypeOps = computed(() => {
const type = formState.catalog.type
if (type === 'ams' || type === 'custom') {
return storageConfigTypeHadoopS3
return storageConfigTypeHadoopS3Oss
}
else if (type === 'glue') {
return storageConfigTypeS3
return storageConfigTypeS3Oss
}
else if (type === 'hive') {
return storageConfigTypeHadoop
}
else if (type === 'hadoop') {
return storageConfigTypeHadoopS3
return storageConfigTypeHadoopS3Oss
}
else {
return null
@@ -406,6 +425,9 @@ const authTypeOptions = computed(() => {
else if (type === 'S3') {
return s3ConfigTypeOps
}
else if (type === 'OSS') {
return ossConfigTypeOps
}

return null
})
@@ -672,6 +694,13 @@ onMounted(() => {
<a-input v-if="isEdit" v-model:value="formState.storageConfig['storage.s3.region']" />
<span v-else class="config-value">{{ formState.storageConfig['storage.s3.region'] }}</span>
</a-form-item>
<a-form-item
v-if="formState.storageConfig['storage.type'] === 'OSS'" label="Endpoint"
:name="['storageConfig', 'storage.oss.endpoint']" :rules="[{ required: false }]"
>
<a-input v-if="isEdit" v-model:value="formState.storageConfig['storage.oss.endpoint']" />
<span v-else class="config-value">{{ formState.storageConfig['storage.oss.endpoint'] }}</span>
</a-form-item>
<div v-if="formState.storageConfig['storage.type'] === 'Hadoop'">
<a-form-item
v-for="config in formState.storageConfigArray" :key="config.label" :label="config.label"
13 changes: 13 additions & 0 deletions pom.xml
@@ -121,6 +121,7 @@
<mysql-jdbc.version>8.0.33</mysql-jdbc.version>
<orc-core.version>1.8.3</orc-core.version>
<awssdk.version>2.24.12</awssdk.version>
<aliyun-sdk-oss.version>3.10.2</aliyun-sdk-oss.version>
<terminal.spark.version>3.3.2</terminal.spark.version>
<terminal.spark.major.version>3.3</terminal.spark.major.version>
<dropwizard.metrics.version>4.2.19</dropwizard.metrics.version>
@@ -350,6 +351,18 @@
<version>${iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aliyun</artifactId>
<version>${iceberg.version}</version>
</dependency>

<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>${aliyun-sdk-oss.version}</version>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>