From a1a12196e98dac3cd12c6ca60c46563f19c69ab1 Mon Sep 17 00:00:00 2001 From: ZhouJinsong Date: Fri, 6 Sep 2024 10:24:28 +0800 Subject: [PATCH 1/6] [AMORO-3176] Support multiple format in internal catalog (#3177) Support multiple format in internal catalog --- .../amoro/server/RestCatalogService.java | 5 +- .../amoro/server/catalog/CatalogBuilder.java | 13 +- .../server/catalog/InternalCatalogImpl.java | 260 ++++++++++++++++++ .../catalog/InternalIcebergCatalogImpl.java | 169 ------------ .../catalog/InternalMixedCatalogImpl.java | 164 ----------- .../server/catalog/MixedHiveCatalogImpl.java | 137 --------- .../server/terminal/TerminalManager.java | 14 +- .../mixed/MixedIcebergCatalogFactory.java | 9 +- .../amoro/mixed/BasicMixedIcebergCatalog.java | 11 +- .../amoro/hive/catalog/MixedHiveCatalog.java | 2 +- .../spark/mixed/MixedSparkCatalogBase.java | 6 +- 11 files changed, 292 insertions(+), 498 deletions(-) create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java delete mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java delete mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java delete mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/catalog/MixedHiveCatalogImpl.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java index 2a08824137..5c37d6ca8a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java @@ -437,9 +437,8 @@ private InternalCatalog getCatalog(String catalog) { internalCatalog instanceof InternalCatalog, "The catalog is not an iceberg rest catalog"); Set tableFormats = CatalogUtil.tableFormats(internalCatalog.getMetadata()); Preconditions.checkArgument( - tableFormats.size() == 1 - && (tableFormats.contains(TableFormat.ICEBERG) - || tableFormats.contains(TableFormat.MIXED_ICEBERG)), + tableFormats.contains(TableFormat.ICEBERG) + || tableFormats.contains(TableFormat.MIXED_ICEBERG), "The catalog is not an iceberg rest catalog"); return (InternalCatalog) internalCatalog; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java index a2e1b27b5b..40d659ecfa 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java @@ -74,11 +74,10 @@ public static ServerCatalog buildServerCatalog( formatSupportedMatrix.containsKey(type), "unsupported catalog type: %s", type); Set supportedFormats = formatSupportedMatrix.get(type); - TableFormat tableFormat = tableFormats.iterator().next(); Preconditions.checkState( - supportedFormats.contains(tableFormat), + supportedFormats.containsAll(tableFormats), "Table format %s is not supported for metastore type: %s", - tableFormat, + tableFormats, type); switch (type) { @@ -91,13 +90,7 @@ public static ServerCatalog buildServerCatalog( catalogMeta.getCatalogProperties().put(CatalogMetaProperties.AMS_URI, amsUri); return new ExternalCatalog(catalogMeta); case CATALOG_TYPE_AMS: - if (tableFormat.equals(TableFormat.MIXED_ICEBERG)) { - return new InternalMixedCatalogImpl(catalogMeta, serverConfiguration); - } else if (tableFormat.equals(TableFormat.ICEBERG)) { - return new InternalIcebergCatalogImpl(catalogMeta, serverConfiguration); - } else { - throw new IllegalStateException("AMS catalog support iceberg/mixed-iceberg table only."); - } + return new InternalCatalogImpl(catalogMeta, serverConfiguration); default: throw new IllegalStateException("unsupported catalog type:" + type); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java new file mode 100644 index 0000000000..b812c61da2 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.catalog; + +import static org.apache.amoro.server.table.internal.InternalTableConstants.CHANGE_STORE_TABLE_NAME_SUFFIX; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import org.apache.amoro.AmoroTable; +import org.apache.amoro.TableFormat; +import org.apache.amoro.api.CatalogMeta; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.formats.iceberg.IcebergTable; +import org.apache.amoro.io.AuthenticatedFileIO; +import org.apache.amoro.mixed.InternalMixedIcebergCatalog; +import org.apache.amoro.server.AmoroManagementConf; +import org.apache.amoro.server.RestCatalogService; +import org.apache.amoro.server.exception.ObjectNotExistsException; +import org.apache.amoro.server.table.TableMetadata; +import org.apache.amoro.server.table.internal.InternalIcebergCreator; +import org.apache.amoro.server.table.internal.InternalIcebergHandler; +import org.apache.amoro.server.table.internal.InternalMixedIcebergCreator; +import org.apache.amoro.server.table.internal.InternalMixedIcebergHandler; +import org.apache.amoro.server.table.internal.InternalTableCreator; +import org.apache.amoro.server.table.internal.InternalTableHandler; +import org.apache.amoro.server.utils.InternalTableUtil; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.table.BasicKeyedTable; +import org.apache.amoro.table.BasicUnkeyedTable; +import org.apache.amoro.table.MixedTable; +import org.apache.amoro.table.PrimaryKeySpec; +import org.apache.amoro.utils.CatalogUtil; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.rest.requests.CreateTableRequest; + +public class InternalCatalogImpl extends InternalCatalog { + + private static final String URI = "uri"; + + final int httpPort; + final String exposedHost; + + final Cache, FileIO> fileIOCloser; + + protected InternalCatalogImpl(CatalogMeta metadata, Configurations serverConfiguration) { + super(metadata); + this.httpPort = serverConfiguration.getInteger(AmoroManagementConf.HTTP_SERVER_PORT); + this.exposedHost = serverConfiguration.getString(AmoroManagementConf.SERVER_EXPOSE_HOST); + this.fileIOCloser = newFileIOCloser(); + } + + @Override + public CatalogMeta getMetadata() { + CatalogMeta meta = super.getMetadata(); + if (!meta.getCatalogProperties().containsKey(URI)) { + meta.putToCatalogProperties(URI, defaultRestURI()); + } + meta.putToCatalogProperties(CatalogProperties.CATALOG_IMPL, RESTCatalog.class.getName()); + return meta.deepCopy(); + } + + @Override + public void updateMetadata(CatalogMeta metadata) { + String defaultUrl = defaultRestURI(); + String uri = metadata.getCatalogProperties().getOrDefault(URI, defaultUrl); + if (defaultUrl.equals(uri)) { + metadata.getCatalogProperties().remove(URI); + } + super.updateMetadata(metadata); + } + + @Override + public AmoroTable loadTable(String database, String tableName) { + Preconditions.checkArgument( + !isChangeStoreName(tableName), "table name is invalid for load table"); + + InternalTableHandler handler; + try { + handler = newTableHandler(database, tableName); + } catch (ObjectNotExistsException e) { + return null; + } + if (TableFormat.ICEBERG.equals(handler.tableMetadata().getFormat())) { + return loadIcebergTable(database, tableName, handler); + } else if (TableFormat.MIXED_ICEBERG.equals(handler.tableMetadata().getFormat())) { + return loadMixedIcebergTable(database, tableName, handler); + } else { + throw new IllegalArgumentException( + "Unsupported table format:" + handler.tableMetadata().getFormat()); + } + } + + private AmoroTable loadIcebergTable( + String database, String tableName, InternalTableHandler handler) { + TableMetadata tableMetadata = handler.tableMetadata(); + TableOperations ops = handler.newTableOperator(); + + BaseTable table = + new BaseTable( + ops, + TableIdentifier.of( + tableMetadata.getTableIdentifier().getDatabase(), + tableMetadata.getTableIdentifier().getTableName()) + .toString()); + org.apache.amoro.table.TableIdentifier tableIdentifier = + org.apache.amoro.table.TableIdentifier.of(name(), database, tableName); + AmoroTable amoroTable = + IcebergTable.newIcebergTable( + tableIdentifier, + table, + CatalogUtil.buildMetaStore(getMetadata()), + getMetadata().getCatalogProperties()); + fileIOCloser.put(amoroTable, ops.io()); + return amoroTable; + } + + private AmoroTable loadMixedIcebergTable( + String database, String tableName, InternalTableHandler handler) { + TableMetadata tableMetadata = handler.tableMetadata(); + org.apache.amoro.table.TableIdentifier tableIdentifier = + org.apache.amoro.table.TableIdentifier.of(name(), database, tableName); + AuthenticatedFileIO fileIO = InternalTableUtil.newIcebergFileIo(getMetadata()); + MixedTable mixedIcebergTable; + + BaseTable baseTable = loadTableStore(tableMetadata, false); + if (InternalTableUtil.isKeyedMixedTable(tableMetadata)) { + BaseTable changeTable = loadTableStore(tableMetadata, true); + + PrimaryKeySpec.Builder keySpecBuilder = PrimaryKeySpec.builderFor(baseTable.schema()); + tableMetadata.buildTableMeta().getKeySpec().getFields().forEach(keySpecBuilder::addColumn); + PrimaryKeySpec keySpec = keySpecBuilder.build(); + + mixedIcebergTable = + new BasicKeyedTable( + tableMetadata.getTableLocation(), + keySpec, + new BasicKeyedTable.BaseInternalTable( + tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties()), + new BasicKeyedTable.ChangeInternalTable( + tableIdentifier, changeTable, fileIO, getMetadata().getCatalogProperties())); + } else { + mixedIcebergTable = + new BasicUnkeyedTable( + tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties()); + } + AmoroTable amoroTable = + new org.apache.amoro.formats.mixed.MixedTable(mixedIcebergTable, TableFormat.MIXED_ICEBERG); + fileIOCloser.put(amoroTable, fileIO); + return amoroTable; + } + + private BaseTable loadTableStore(TableMetadata tableMetadata, boolean isChangeStore) { + TableOperations ops = newTableStoreHandler(tableMetadata, isChangeStore).newTableOperator(); + return new BaseTable( + ops, + TableIdentifier.of( + tableMetadata.getTableIdentifier().getDatabase(), + tableMetadata.getTableIdentifier().getTableName()) + .toString()); + } + + private String defaultRestURI() { + return "http://" + exposedHost + ":" + httpPort + RestCatalogService.ICEBERG_REST_API_PREFIX; + } + + @Override + public InternalTableCreator newTableCreator( + String database, String tableName, TableFormat format, CreateTableRequest creatorArguments) { + if (tableExists(database, tableName)) { + throw new AlreadyExistsException( + "Table " + name() + "." + database + "." + tableName + " already exists."); + } + if (TableFormat.ICEBERG.equals(format)) { + return new InternalIcebergCreator(getMetadata(), database, tableName, creatorArguments); + } else if (TableFormat.MIXED_ICEBERG.equals(format)) { + return new InternalMixedIcebergCreator(getMetadata(), database, tableName, creatorArguments); + } else { + throw new IllegalArgumentException("Unsupported table format:" + format); + } + } + + @Override + @SuppressWarnings("unchecked") + public InternalTableHandler newTableHandler(String database, String tableName) { + String realTableName = realTableName(tableName); + TableMetadata metadata = loadTableMetadata(database, realTableName); + if (TableFormat.ICEBERG.equals(metadata.getFormat())) { + return new InternalIcebergHandler(getMetadata(), metadata); + } else if (TableFormat.MIXED_ICEBERG.equals(metadata.getFormat())) { + boolean isChangeStore = isChangeStoreName(tableName); + return newTableStoreHandler(metadata, isChangeStore); + } else { + throw new IllegalArgumentException("Unsupported table format:" + metadata.getFormat()); + } + } + + private String realTableName(String tableStoreName) { + if (isChangeStoreName(tableStoreName)) { + return tableStoreName.substring( + 0, tableStoreName.length() - CHANGE_STORE_TABLE_NAME_SUFFIX.length()); + } + return tableStoreName; + } + + private boolean isChangeStoreName(String tableName) { + String separator = InternalMixedIcebergCatalog.CHANGE_STORE_SEPARATOR; + if (!tableName.contains(separator)) { + return false; + } + Preconditions.checkArgument( + tableName.indexOf(separator) == tableName.lastIndexOf(separator) + && tableName.endsWith(CHANGE_STORE_TABLE_NAME_SUFFIX), + "illegal table name: %s, %s is not allowed in table name.", + tableName, + separator); + + return true; + } + + private InternalTableHandler newTableStoreHandler( + TableMetadata metadata, boolean isChangeStore) { + return new InternalMixedIcebergHandler(getMetadata(), metadata, isChangeStore); + } + + private Cache, FileIO> newFileIOCloser() { + return Caffeine.newBuilder() + .weakKeys() + .removalListener( + (RemovalListener, FileIO>) + (tbl, fileIO, cause) -> { + if (null != fileIO) { + fileIO.close(); + } + }) + .build(); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java deleted file mode 100644 index bfc4a70b83..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.catalog; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; -import org.apache.amoro.AmoroTable; -import org.apache.amoro.TableFormat; -import org.apache.amoro.api.CatalogMeta; -import org.apache.amoro.config.Configurations; -import org.apache.amoro.formats.iceberg.IcebergTable; -import org.apache.amoro.io.AuthenticatedFileIO; -import org.apache.amoro.server.AmoroManagementConf; -import org.apache.amoro.server.RestCatalogService; -import org.apache.amoro.server.exception.ObjectNotExistsException; -import org.apache.amoro.server.table.TableMetadata; -import org.apache.amoro.server.table.internal.InternalIcebergCreator; -import org.apache.amoro.server.table.internal.InternalIcebergHandler; -import org.apache.amoro.server.table.internal.InternalTableCreator; -import org.apache.amoro.server.table.internal.InternalTableHandler; -import org.apache.amoro.server.utils.InternalTableUtil; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.amoro.utils.CatalogUtil; -import org.apache.iceberg.BaseTable; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.rest.RESTCatalog; -import org.apache.iceberg.rest.requests.CreateTableRequest; - -public class InternalIcebergCatalogImpl extends InternalCatalog { - - private static final String URI = "uri"; - - final int httpPort; - final String exposedHost; - - final Cache, FileIO> fileIOCloser; - - protected InternalIcebergCatalogImpl(CatalogMeta metadata, Configurations serverConfiguration) { - super(metadata); - this.httpPort = serverConfiguration.getInteger(AmoroManagementConf.HTTP_SERVER_PORT); - this.exposedHost = serverConfiguration.getString(AmoroManagementConf.SERVER_EXPOSE_HOST); - this.fileIOCloser = newFileIOCloser(); - } - - @Override - public CatalogMeta getMetadata() { - CatalogMeta meta = super.getMetadata(); - if (!meta.getCatalogProperties().containsKey(URI)) { - meta.putToCatalogProperties(URI, defaultRestURI()); - } - meta.putToCatalogProperties(CatalogProperties.CATALOG_IMPL, RESTCatalog.class.getName()); - return meta.deepCopy(); - } - - @Override - public void updateMetadata(CatalogMeta metadata) { - String defaultUrl = defaultRestURI(); - String uri = metadata.getCatalogProperties().getOrDefault(URI, defaultUrl); - if (defaultUrl.equals(uri)) { - metadata.getCatalogProperties().remove(URI); - } - super.updateMetadata(metadata); - } - - @Override - public AmoroTable loadTable(String database, String tableName) { - InternalTableHandler handler; - try { - handler = newTableHandler(database, tableName); - } catch (ObjectNotExistsException e) { - return null; - } - TableMetadata tableMetadata = handler.tableMetadata(); - TableOperations ops = handler.newTableOperator(); - - BaseTable table = - new BaseTable( - ops, - TableIdentifier.of( - tableMetadata.getTableIdentifier().getDatabase(), - tableMetadata.getTableIdentifier().getTableName()) - .toString()); - org.apache.amoro.table.TableIdentifier tableIdentifier = - org.apache.amoro.table.TableIdentifier.of(name(), database, tableName); - AmoroTable amoroTable = - IcebergTable.newIcebergTable( - tableIdentifier, - table, - CatalogUtil.buildMetaStore(getMetadata()), - getMetadata().getCatalogProperties()); - fileIOCloser.put(amoroTable, ops.io()); - return amoroTable; - } - - protected AuthenticatedFileIO fileIO(CatalogMeta catalogMeta) { - return InternalTableUtil.newIcebergFileIo(catalogMeta); - } - - private String defaultRestURI() { - return "http://" + exposedHost + ":" + httpPort + RestCatalogService.ICEBERG_REST_API_PREFIX; - } - - @Override - public InternalTableCreator newTableCreator( - String database, String tableName, TableFormat format, CreateTableRequest creatorArguments) { - - Preconditions.checkArgument( - format == format(), "the catalog only support to create %s table", format().name()); - if (tableExists(database, tableName)) { - throw new AlreadyExistsException( - "Table " + name() + "." + database + "." + tableName + " already " + "exists."); - } - return newTableCreator(database, tableName, creatorArguments); - } - - protected TableFormat format() { - return TableFormat.ICEBERG; - } - - protected InternalTableCreator newTableCreator( - String database, String tableName, CreateTableRequest request) { - return new InternalIcebergCreator(getMetadata(), database, tableName, request); - } - - @Override - public InternalTableHandler newTableHandler(String database, String tableName) { - TableMetadata metadata = loadTableMetadata(database, tableName); - Preconditions.checkState( - metadata.getFormat() == format(), - "the catalog only support to handle %s table", - format().name()); - //noinspection unchecked - return (InternalTableHandler) new InternalIcebergHandler(getMetadata(), metadata); - } - - private Cache, FileIO> newFileIOCloser() { - return Caffeine.newBuilder() - .weakKeys() - .removalListener( - (RemovalListener, FileIO>) - (tbl, fileIO, cause) -> { - if (null != fileIO) { - fileIO.close(); - } - }) - .build(); - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java deleted file mode 100644 index 45d2cfb339..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.catalog; - -import static org.apache.amoro.server.table.internal.InternalTableConstants.CHANGE_STORE_TABLE_NAME_SUFFIX; - -import org.apache.amoro.AmoroTable; -import org.apache.amoro.TableFormat; -import org.apache.amoro.api.CatalogMeta; -import org.apache.amoro.config.Configurations; -import org.apache.amoro.io.AuthenticatedFileIO; -import org.apache.amoro.mixed.InternalMixedIcebergCatalog; -import org.apache.amoro.server.persistence.mapper.TableMetaMapper; -import org.apache.amoro.server.table.TableMetadata; -import org.apache.amoro.server.table.internal.InternalMixedIcebergCreator; -import org.apache.amoro.server.table.internal.InternalMixedIcebergHandler; -import org.apache.amoro.server.table.internal.InternalTableCreator; -import org.apache.amoro.server.table.internal.InternalTableHandler; -import org.apache.amoro.server.utils.InternalTableUtil; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.amoro.table.BasicKeyedTable; -import org.apache.amoro.table.BasicUnkeyedTable; -import org.apache.amoro.table.MixedTable; -import org.apache.amoro.table.PrimaryKeySpec; -import org.apache.iceberg.BaseTable; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.rest.requests.CreateTableRequest; - -public class InternalMixedCatalogImpl extends InternalIcebergCatalogImpl { - - protected InternalMixedCatalogImpl(CatalogMeta metadata, Configurations serverConfiguration) { - super(metadata, serverConfiguration); - } - - @Override - protected InternalTableCreator newTableCreator( - String database, String tableName, CreateTableRequest request) { - return new InternalMixedIcebergCreator(getMetadata(), database, tableName, request); - } - - @Override - public InternalTableHandler newTableHandler(String database, String tableStoreName) { - String tableName = tableName(tableStoreName); - boolean isChangeStore = isChangeStoreName(tableStoreName); - TableMetadata metadata = loadTableMetadata(database, tableName); - Preconditions.checkState( - metadata.getFormat() == format(), - "the catalog only support to handle %s table", - format().name()); - //noinspection unchecked - return (InternalTableHandler) newTableStoreHandler(metadata, isChangeStore); - } - - private InternalTableHandler newTableStoreHandler( - TableMetadata metadata, boolean isChangeStore) { - return new InternalMixedIcebergHandler(getMetadata(), metadata, isChangeStore); - } - - private String tableName(String tableStoreName) { - if (isChangeStoreName(tableStoreName)) { - return tableStoreName.substring( - 0, tableStoreName.length() - CHANGE_STORE_TABLE_NAME_SUFFIX.length()); - } - return tableStoreName; - } - - private boolean isChangeStoreName(String tableName) { - String separator = InternalMixedIcebergCatalog.CHANGE_STORE_SEPARATOR; - if (!tableName.contains(separator)) { - return false; - } - Preconditions.checkArgument( - tableName.indexOf(separator) == tableName.lastIndexOf(separator) - && tableName.endsWith(CHANGE_STORE_TABLE_NAME_SUFFIX), - "illegal table name: %s, %s is not allowed in table name.", - tableName, - separator); - - return true; - } - - @Override - public AmoroTable loadTable(String database, String tableName) { - Preconditions.checkArgument( - !isChangeStoreName(tableName), "table name is invalid for load table"); - TableMetadata tableMetadata = - getAs( - TableMetaMapper.class, - mapper -> - mapper.selectTableMetaByName(getMetadata().getCatalogName(), database, tableName)); - if (tableMetadata == null) { - return null; - } - Preconditions.checkArgument( - TableFormat.MIXED_ICEBERG == tableMetadata.getFormat(), - "Table: %s.%s.%s is not a mixed-iceberg table", - name(), - database, - tableName); - - org.apache.amoro.table.TableIdentifier tableIdentifier = - org.apache.amoro.table.TableIdentifier.of(name(), database, tableName); - AuthenticatedFileIO fileIO = InternalTableUtil.newIcebergFileIo(getMetadata()); - MixedTable mixedIcebergTable; - - BaseTable baseTable = loadTableStore(tableMetadata, false); - if (InternalTableUtil.isKeyedMixedTable(tableMetadata)) { - BaseTable changeTable = loadTableStore(tableMetadata, true); - - PrimaryKeySpec.Builder keySpecBuilder = PrimaryKeySpec.builderFor(baseTable.schema()); - tableMetadata.buildTableMeta().getKeySpec().getFields().forEach(keySpecBuilder::addColumn); - PrimaryKeySpec keySpec = keySpecBuilder.build(); - - mixedIcebergTable = - new BasicKeyedTable( - tableMetadata.getTableLocation(), - keySpec, - new BasicKeyedTable.BaseInternalTable( - tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties()), - new BasicKeyedTable.ChangeInternalTable( - tableIdentifier, changeTable, fileIO, getMetadata().getCatalogProperties())); - } else { - - mixedIcebergTable = - new BasicUnkeyedTable( - tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties()); - } - AmoroTable amoroTable = - new org.apache.amoro.formats.mixed.MixedTable(mixedIcebergTable, TableFormat.MIXED_ICEBERG); - fileIOCloser.put(amoroTable, fileIO); - return amoroTable; - } - - protected TableFormat format() { - return TableFormat.MIXED_ICEBERG; - } - - protected BaseTable loadTableStore(TableMetadata tableMetadata, boolean isChangeStore) { - TableOperations ops = newTableStoreHandler(tableMetadata, isChangeStore).newTableOperator(); - return new BaseTable( - ops, - TableIdentifier.of( - tableMetadata.getTableIdentifier().getDatabase(), - tableMetadata.getTableIdentifier().getTableName()) - .toString()); - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/MixedHiveCatalogImpl.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/MixedHiveCatalogImpl.java deleted file mode 100644 index 0d1cc9b1cb..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/MixedHiveCatalogImpl.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.catalog; - -import org.apache.amoro.AmoroTable; -import org.apache.amoro.TableFormat; -import org.apache.amoro.api.CatalogMeta; -import org.apache.amoro.formats.mixed.MixedTable; -import org.apache.amoro.hive.CachedHiveClientPool; -import org.apache.amoro.hive.HMSClient; -import org.apache.amoro.hive.catalog.MixedHiveTables; -import org.apache.amoro.server.persistence.mapper.TableMetaMapper; -import org.apache.amoro.server.table.TableMetadata; -import org.apache.amoro.server.table.internal.InternalTableCreator; -import org.apache.amoro.server.table.internal.InternalTableHandler; -import org.apache.amoro.utils.CatalogUtil; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.iceberg.rest.requests.CreateTableRequest; -import org.apache.thrift.TException; - -import java.util.List; - -public class MixedHiveCatalogImpl extends InternalCatalog { - protected MixedHiveTables tables; - private volatile CachedHiveClientPool hiveClientPool; - - protected MixedHiveCatalogImpl(CatalogMeta catalogMeta) { - super(catalogMeta); - this.tables = - new MixedHiveTables( - catalogMeta.getCatalogProperties(), CatalogUtil.buildMetaStore(catalogMeta)); - hiveClientPool = ((MixedHiveTables) tables()).getHiveClientPool(); - } - - @Override - public void updateMetadata(CatalogMeta metadata) { - super.updateMetadata(metadata); - hiveClientPool = ((MixedHiveTables) tables()).getHiveClientPool(); - this.tables = - new MixedHiveTables(metadata.getCatalogProperties(), CatalogUtil.buildMetaStore(metadata)); - } - - @Override - public AmoroTable loadTable(String database, String tableName) { - TableMetadata tableMetadata = - getAs( - TableMetaMapper.class, - mapper -> - mapper.selectTableMetaByName(getMetadata().getCatalogName(), database, tableName)); - if (tableMetadata == null) { - return null; - } - return new MixedTable( - tables.loadTableByMeta(tableMetadata.buildTableMeta()), TableFormat.MIXED_HIVE); - } - - @Override - public void createDatabase(String databaseName) { - // do not handle database operations - } - - @Override - public void dropDatabase(String databaseName) { - // do not handle database operations - } - - @Override - public InternalTableCreator newTableCreator( - String database, String tableName, TableFormat format, CreateTableRequest creatorArguments) { - throw new UnsupportedOperationException(); - } - - @Override - public InternalTableHandler newTableHandler(String database, String tableName) { - throw new UnsupportedOperationException(); - } - - @Override - protected void decreaseDatabaseTableCount(String databaseName) { - // do not handle database operations - } - - @Override - protected void increaseDatabaseTableCount(String databaseName) { - // do not handle database operations - } - - @Override - public boolean databaseExists(String database) { - try { - return hiveClientPool.run( - client -> { - try { - client.getDatabase(database); - return true; - } catch (NoSuchObjectException exception) { - return false; - } - }); - } catch (TException | InterruptedException e) { - throw new RuntimeException("Failed to get databases", e); - } - } - - @Override - public List listDatabases() { - try { - return hiveClientPool.run(HMSClient::getAllDatabases); - } catch (TException | InterruptedException e) { - throw new RuntimeException("Failed to list databases", e); - } - } - - public CachedHiveClientPool getHiveClient() { - return hiveClientPool; - } - - private MixedHiveTables tables() { - return tables; - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java index 7ed15669cb..64dba7a82e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java @@ -252,10 +252,11 @@ public void dispose() { private String catalogConnectorType(CatalogMeta catalogMeta) { String catalogType = catalogMeta.getCatalogType(); Set tableFormatSet = CatalogUtil.tableFormats(catalogMeta); - if (catalogType.equalsIgnoreCase(CatalogType.AMS.name())) { - if (tableFormatSet.contains(TableFormat.MIXED_ICEBERG)) { - return "arctic"; + if (tableFormatSet.size() > 1) { + return "unified"; + } else if (tableFormatSet.contains(TableFormat.MIXED_ICEBERG)) { + return "mixed_iceberg"; } else if (tableFormatSet.contains(TableFormat.ICEBERG)) { return "iceberg"; } @@ -263,9 +264,10 @@ private String catalogConnectorType(CatalogMeta catalogMeta) { || catalogType.equalsIgnoreCase(CatalogType.HADOOP.name())) { if (tableFormatSet.size() > 1) { return "unified"; - } else if (tableFormatSet.contains(TableFormat.MIXED_HIVE) - || tableFormatSet.contains(TableFormat.MIXED_ICEBERG)) { - return "arctic"; + } else if (tableFormatSet.contains(TableFormat.MIXED_ICEBERG)) { + return "mixed_iceberg"; + } else if (tableFormatSet.contains(TableFormat.MIXED_HIVE)) { + return "mixed_hive"; } else if (tableFormatSet.contains(TableFormat.ICEBERG)) { return "iceberg"; } else if (tableFormatSet.contains(TableFormat.PAIMON)) { diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/mixed/MixedIcebergCatalogFactory.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/mixed/MixedIcebergCatalogFactory.java index b8b0d4ffb4..90fea3e7e7 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/mixed/MixedIcebergCatalogFactory.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/mixed/MixedIcebergCatalogFactory.java @@ -18,14 +18,16 @@ package org.apache.amoro.formats.mixed; +import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; + import org.apache.amoro.FormatCatalog; import org.apache.amoro.FormatCatalogFactory; import org.apache.amoro.TableFormat; import org.apache.amoro.mixed.CatalogLoader; import org.apache.amoro.mixed.MixedFormatCatalog; import org.apache.amoro.properties.CatalogMetaProperties; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.table.TableMetaStore; -import org.apache.amoro.utils.MixedFormatCatalogUtil; import java.util.Map; @@ -49,9 +51,8 @@ public TableFormat format() { @Override public Map convertCatalogProperties( String catalogName, String metastoreType, Map unifiedCatalogProperties) { - Map properties = - MixedFormatCatalogUtil.withIcebergCatalogInitializeProperties( - catalogName, metastoreType, unifiedCatalogProperties); + Map properties = Maps.newHashMap(unifiedCatalogProperties); + properties.put(ICEBERG_CATALOG_TYPE, metastoreType); properties.put(CatalogMetaProperties.TABLE_FORMATS, format().name()); return properties; } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/BasicMixedIcebergCatalog.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/BasicMixedIcebergCatalog.java index 033eb2e163..3b90493a69 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/BasicMixedIcebergCatalog.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/BasicMixedIcebergCatalog.java @@ -18,6 +18,8 @@ package org.apache.amoro.mixed; +import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; + import org.apache.amoro.AmsClient; import org.apache.amoro.PooledAmsClient; import org.apache.amoro.io.AuthenticatedFileIO; @@ -76,11 +78,16 @@ public void initialize(String name, Map properties, TableMetaSto String databaseFilter = properties.get(CatalogMetaProperties.KEY_DATABASE_FILTER); databaseFilterPattern = Pattern.compile(databaseFilter); } + String metastoreType = properties.get(ICEBERG_CATALOG_TYPE); + Map icebergCatalogProperties = + MixedFormatCatalogUtil.withIcebergCatalogInitializeProperties( + name, metastoreType, properties); org.apache.iceberg.catalog.Catalog catalog = - buildIcebergCatalog(name, properties, metaStore.getConfiguration()); + buildIcebergCatalog(name, icebergCatalogProperties, metaStore.getConfiguration()); this.name = name; this.tableMetaStore = metaStore; - this.icebergCatalog = MixedFormatCatalogUtil.buildCacheCatalog(catalog, properties); + this.icebergCatalog = + MixedFormatCatalogUtil.buildCacheCatalog(catalog, icebergCatalogProperties); if (catalog instanceof SupportsNamespaces) { this.asNamespaceCatalog = (SupportsNamespaces) catalog; } diff --git a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/catalog/MixedHiveCatalog.java b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/catalog/MixedHiveCatalog.java index 5b04e54b64..6e43e0fb81 100644 --- a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/catalog/MixedHiveCatalog.java +++ b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/catalog/MixedHiveCatalog.java @@ -112,7 +112,7 @@ public void initialize(String name, Map properties, TableMetaSto this.catalogProperties = properties; this.tableMetaStore = metaStore; this.tables = newMixedHiveTables(properties, metaStore); - this.hiveClientPool = ((MixedHiveTables) tables).getHiveClientPool(); + this.hiveClientPool = tables.getHiveClientPool(); } protected MixedHiveTables getTables() { diff --git a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java index 952375870e..8276a6a75d 100644 --- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java +++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java @@ -20,9 +20,11 @@ import static org.apache.amoro.spark.mixed.SparkSQLProperties.REFRESH_CATALOG_BEFORE_USAGE; import static org.apache.amoro.spark.mixed.SparkSQLProperties.REFRESH_CATALOG_BEFORE_USAGE_DEFAULT; +import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; import org.apache.amoro.mixed.CatalogLoader; import org.apache.amoro.mixed.MixedFormatCatalog; +import org.apache.amoro.properties.CatalogMetaProperties; import org.apache.amoro.shade.guava32.com.google.common.base.Joiner; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; @@ -63,10 +65,10 @@ public void setAuthenticationContext(TableMetaStore tableMetaStore) { @Override public final void initialize(String name, CaseInsensitiveStringMap options) { this.catalogName = name; - String catalogUrl = options.get("ams.uri"); + String catalogUrl = options.get(CatalogMetaProperties.AMS_URI); if (StringUtils.isNotBlank(catalogUrl)) { // initialize for unified catalog. - String metastoreType = options.get("type"); + String metastoreType = options.get(ICEBERG_CATALOG_TYPE); String registerName = options.get("register-name"); Preconditions.checkArgument( StringUtils.isNotEmpty(metastoreType), From 17d9d5ee67d27dc98455d941c9e3bacad2ddb2d0 Mon Sep 17 00:00:00 2001 From: Xavier Bai Date: Mon, 9 Sep 2024 15:53:48 +0800 Subject: [PATCH 2/6] [AMORO-3163] Support for configuring the number of remaining snapshots (#3164) * configure number of snapshots * update document --- .../maintainer/IcebergTableMaintainer.java | 18 ++++-- .../maintainer/MixedTableMaintainer.java | 14 ++-- .../server/table/TableConfigurations.java | 16 ++++- .../maintainer/TestSnapshotExpire.java | 64 +++++++++++++++++-- .../maintainer/TestSnapshotExpireHive.java | 2 +- .../apache/amoro/config/ConfigHelpers.java | 8 ++- .../amoro/config/TableConfiguration.java | 12 ++++ .../apache/amoro/table/TableProperties.java | 18 +++++- .../amoro/utils/CompatiblePropertyUtil.java | 2 + docs/user-guides/configurations.md | 3 +- 10 files changed, 132 insertions(+), 25 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java index cee5499af1..86c846e116 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -167,7 +167,9 @@ public void expireSnapshots(TableRuntime tableRuntime) { if (!expireSnapshotEnabled(tableRuntime)) { return; } - expireSnapshots(mustOlderThan(tableRuntime, System.currentTimeMillis())); + expireSnapshots( + mustOlderThan(tableRuntime, System.currentTimeMillis()), + tableRuntime.getTableConfiguration().getSnapshotMinCount()); } protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) { @@ -176,18 +178,22 @@ protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) { } @VisibleForTesting - void expireSnapshots(long mustOlderThan) { - expireSnapshots(mustOlderThan, expireSnapshotNeedToExcludeFiles()); + void expireSnapshots(long mustOlderThan, int minCount) { + expireSnapshots(mustOlderThan, minCount, expireSnapshotNeedToExcludeFiles()); } - private void expireSnapshots(long olderThan, Set exclude) { - LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude); + private void expireSnapshots(long olderThan, int minCount, Set exclude) { + LOG.debug( + "start expire snapshots older than {} and retain last {} snapshots, the exclude is {}", + olderThan, + minCount, + exclude); final AtomicInteger toDeleteFiles = new AtomicInteger(0); Set parentDirectories = new HashSet<>(); Set expiredFiles = new HashSet<>(); table .expireSnapshots() - .retainLast(1) + .retainLast(Math.max(minCount, 1)) .expireOlderThan(olderThan) .deleteWith( file -> { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java index a4a6445fd9..1b5c8cbd82 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java @@ -109,11 +109,11 @@ public void expireSnapshots(TableRuntime tableRuntime) { } @VisibleForTesting - protected void expireSnapshots(long mustOlderThan) { + protected void expireSnapshots(long mustOlderThan, int minCount) { if (changeMaintainer != null) { - changeMaintainer.expireSnapshots(mustOlderThan); + changeMaintainer.expireSnapshots(mustOlderThan, minCount); } - baseMaintainer.expireSnapshots(mustOlderThan); + baseMaintainer.expireSnapshots(mustOlderThan, minCount); } @Override @@ -291,9 +291,9 @@ public ChangeTableMaintainer(UnkeyedTable unkeyedTable) { @Override @VisibleForTesting - void expireSnapshots(long mustOlderThan) { + void expireSnapshots(long mustOlderThan, int minCount) { expireFiles(mustOlderThan); - super.expireSnapshots(mustOlderThan); + super.expireSnapshots(mustOlderThan, minCount); } @Override @@ -303,7 +303,9 @@ public void expireSnapshots(TableRuntime tableRuntime) { } long now = System.currentTimeMillis(); expireFiles(now - snapshotsKeepTime(tableRuntime)); - expireSnapshots(mustOlderThan(tableRuntime, now)); + expireSnapshots( + mustOlderThan(tableRuntime, now), + tableRuntime.getTableConfiguration().getSnapshotMinCount()); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java index bdb258c0b2..bb7823c99e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.temporal.ChronoUnit; import java.util.Locale; import java.util.Map; import java.util.Set; @@ -61,10 +62,19 @@ public static TableConfiguration parseTableConfig(Map properties TableProperties.ENABLE_TABLE_EXPIRE, TableProperties.ENABLE_TABLE_EXPIRE_DEFAULT)) .setSnapshotTTLMinutes( - CompatiblePropertyUtil.propertyAsLong( + ConfigHelpers.TimeUtils.parseDuration( + CompatiblePropertyUtil.propertyAsString( + properties, + TableProperties.SNAPSHOT_KEEP_DURATION, + TableProperties.SNAPSHOT_KEEP_DURATION_DEFAULT), + ChronoUnit.MINUTES) + .getSeconds() + / 60) + .setSnapshotMinCount( + CompatiblePropertyUtil.propertyAsInt( properties, - TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, - TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) + TableProperties.SNAPSHOT_MIN_COUNT, + TableProperties.SNAPSHOT_MIN_COUNT_DEFAULT)) .setChangeDataTTLMinutes( CompatiblePropertyUtil.propertyAsLong( properties, diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java index 559accaa64..2796b21522 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java @@ -122,7 +122,7 @@ public void testExpireChangeTableFiles() { // In order to advance the snapshot insertChangeDataFiles(testKeyedTable, 2); - tableMaintainer.getChangeMaintainer().expireSnapshots(System.currentTimeMillis()); + tableMaintainer.getChangeMaintainer().expireSnapshots(System.currentTimeMillis(), 1); Assert.assertEquals(1, Iterables.size(testKeyedTable.changeTable().snapshots())); s1Files.forEach( @@ -333,7 +333,7 @@ public void testExpireTableFiles4All() { List newDataFiles = writeAndCommitBaseStore(table); Assert.assertEquals(3, Iterables.size(table.snapshots())); - new MixedTableMaintainer(table).expireSnapshots(System.currentTimeMillis()); + new MixedTableMaintainer(table).expireSnapshots(System.currentTimeMillis(), 1); Assert.assertEquals(1, Iterables.size(table.snapshots())); dataFiles.forEach(file -> Assert.assertFalse(table.io().exists(file.path().toString()))); @@ -383,7 +383,7 @@ public void testExpireTableFilesRepeatedly() { MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(testKeyedTable); tableMaintainer.getChangeMaintainer().expireFiles(secondCommitTime + 1); - tableMaintainer.getChangeMaintainer().expireSnapshots(secondCommitTime + 1); + tableMaintainer.getChangeMaintainer().expireSnapshots(secondCommitTime + 1, 1); Set dataFiles = getDataFiles(testKeyedTable); Assert.assertEquals(last4File, dataFiles); @@ -449,7 +449,7 @@ public void testExpireStatisticsFiles() { Assert.assertTrue(baseTable.io().exists(file1.path())); Assert.assertTrue(baseTable.io().exists(file2.path())); Assert.assertTrue(baseTable.io().exists(file3.path())); - new MixedTableMaintainer(testKeyedTable).expireSnapshots(expireTime); + new MixedTableMaintainer(testKeyedTable).expireSnapshots(expireTime, 1); Assert.assertEquals(1, Iterables.size(baseTable.snapshots())); Assert.assertFalse(baseTable.io().exists(file1.path())); @@ -525,6 +525,62 @@ public void testBaseTableGcDisabled() { Assert.assertEquals(1, Iterables.size(testUnkeyedTable.snapshots())); } + @Test + public void testRetainMinSnapshot() { + UnkeyedTable table = + isKeyedTable() + ? getMixedTable().asKeyedTable().baseTable() + : getMixedTable().asUnkeyedTable(); + table.newAppend().commit(); + table.newAppend().commit(); + List expectedSnapshots = new ArrayList<>(); + expectedSnapshots.add(table.currentSnapshot()); + + table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "0s").commit(); + table.updateProperties().set(TableProperties.SNAPSHOT_MIN_COUNT, "3").commit(); + + TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); + Mockito.when(tableRuntime.getTableIdentifier()) + .thenReturn( + ServerTableIdentifier.of(AmsUtil.toTableIdentifier(table.id()), getTestFormat())); + Mockito.when(tableRuntime.getTableConfiguration()) + .thenReturn(TableConfigurations.parseTableConfig(table.properties())); + Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE); + + new MixedTableMaintainer(table).expireSnapshots(tableRuntime); + Assert.assertEquals(2, Iterables.size(table.snapshots())); + + table.newAppend().commit(); + expectedSnapshots.add(table.currentSnapshot()); + table.newAppend().commit(); + expectedSnapshots.add(table.currentSnapshot()); + + new MixedTableMaintainer(table).expireSnapshots(tableRuntime); + Assert.assertEquals(3, Iterables.size(table.snapshots())); + Assert.assertTrue( + Iterators.elementsEqual(expectedSnapshots.iterator(), table.snapshots().iterator())); + } + + @Test + public void testSnapshotExpireConfig() { + UnkeyedTable table = + isKeyedTable() + ? getMixedTable().asKeyedTable().baseTable() + : getMixedTable().asUnkeyedTable(); + table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "180s").commit(); + Assert.assertEquals( + 3L, TableConfigurations.parseTableConfig(table.properties()).getSnapshotTTLMinutes()); + + // using default time unit: minutes + table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "720").commit(); + Assert.assertEquals( + 720L, TableConfigurations.parseTableConfig(table.properties()).getSnapshotTTLMinutes()); + + table.updateProperties().set(TableProperties.SNAPSHOT_MIN_COUNT, "10").commit(); + Assert.assertEquals( + 10, TableConfigurations.parseTableConfig(table.properties()).getSnapshotMinCount()); + } + private long waitUntilAfter(long timestampMillis) { long current = System.currentTimeMillis(); while (current <= timestampMillis) { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java index efad6c8834..dd9f198c86 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java @@ -115,7 +115,7 @@ public void testExpireTableFiles() { ? getMixedTable().asKeyedTable().baseTable() : getMixedTable().asUnkeyedTable(); MixedTableMaintainer mixedTableMaintainer = new MixedTableMaintainer(getMixedTable()); - mixedTableMaintainer.getBaseMaintainer().expireSnapshots(System.currentTimeMillis()); + mixedTableMaintainer.getBaseMaintainer().expireSnapshots(System.currentTimeMillis(), 1); Assert.assertEquals(1, Iterables.size(unkeyedTable.snapshots())); hiveFiles.forEach( diff --git a/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java b/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java index 4873e8581c..d7a705e04a 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java @@ -299,6 +299,10 @@ public static class TimeUtils { private static final Map LABEL_TO_UNIT_MAP = Collections.unmodifiableMap(initMap()); + public static Duration parseDuration(String text) { + return parseDuration(text, ChronoUnit.MILLIS); + } + /** * Parse the given string to a java {@link Duration}. The string is in format "{length * value}{time unit label}", e.g. "123ms", "321 s". If no time unit label is specified, it will @@ -318,7 +322,7 @@ public static class TimeUtils { * * @param text string to parse. */ - public static Duration parseDuration(String text) { + public static Duration parseDuration(String text, ChronoUnit defaultUnit) { checkNotNull(text); final String trimmed = text.trim(); @@ -350,7 +354,7 @@ public static Duration parseDuration(String text) { } if (unitLabel.isEmpty()) { - return Duration.of(value, ChronoUnit.MILLIS); + return Duration.of(value, defaultUnit); } ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel); diff --git a/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java b/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java index f20705b1f2..5efedfbbea 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java @@ -27,6 +27,7 @@ public class TableConfiguration { private boolean expireSnapshotEnabled; private long snapshotTTLMinutes; + private int snapshotMinCount; private long changeDataTTLMinutes; private boolean cleanOrphanEnabled; private long orphanExistingMinutes; @@ -45,6 +46,10 @@ public long getSnapshotTTLMinutes() { return snapshotTTLMinutes; } + public int getSnapshotMinCount() { + return snapshotMinCount; + } + public long getChangeDataTTLMinutes() { return changeDataTTLMinutes; } @@ -76,6 +81,11 @@ public TableConfiguration setSnapshotTTLMinutes(long snapshotTTLMinutes) { return this; } + public TableConfiguration setSnapshotMinCount(int snapshotMinCount) { + this.snapshotMinCount = snapshotMinCount; + return this; + } + public TableConfiguration setChangeDataTTLMinutes(long changeDataTTLMinutes) { this.changeDataTTLMinutes = changeDataTTLMinutes; return this; @@ -130,6 +140,7 @@ public boolean equals(Object o) { TableConfiguration that = (TableConfiguration) o; return expireSnapshotEnabled == that.expireSnapshotEnabled && snapshotTTLMinutes == that.snapshotTTLMinutes + && snapshotMinCount == that.snapshotMinCount && changeDataTTLMinutes == that.changeDataTTLMinutes && cleanOrphanEnabled == that.cleanOrphanEnabled && orphanExistingMinutes == that.orphanExistingMinutes @@ -144,6 +155,7 @@ public int hashCode() { return Objects.hashCode( expireSnapshotEnabled, snapshotTTLMinutes, + snapshotMinCount, changeDataTTLMinutes, cleanOrphanEnabled, orphanExistingMinutes, diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java index efd9e994dd..debe3f0cae 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java @@ -150,8 +150,22 @@ private TableProperties() {} public static final String CHANGE_DATA_TTL = "change.data.ttl.minutes"; public static final long CHANGE_DATA_TTL_DEFAULT = 10080; // 7 Days - public static final String BASE_SNAPSHOT_KEEP_MINUTES = "snapshot.base.keep.minutes"; - public static final long BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT = 720; // 12 Hours + /** + * @deprecated Use {@link TableProperties#SNAPSHOT_KEEP_DURATION } instead; will be removed in + * 0.9.0 + */ + @Deprecated public static final String BASE_SNAPSHOT_KEEP_MINUTES = "snapshot.base.keep.minutes"; + /** + * @deprecated Use {@link TableProperties#SNAPSHOT_KEEP_DURATION_DEFAULT } instead; will be + * removed in 0.9.0 + */ + @Deprecated public static final long BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT = 720; // 12 Hours + + public static final String SNAPSHOT_KEEP_DURATION = "snapshot.keep.duration"; + public static final String SNAPSHOT_KEEP_DURATION_DEFAULT = "720min"; // 12 Hours + + public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count"; + public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1; public static final String ENABLE_ORPHAN_CLEAN = "clean-orphan-file.enabled"; public static final boolean ENABLE_ORPHAN_CLEAN_DEFAULT = false; diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/CompatiblePropertyUtil.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/CompatiblePropertyUtil.java index ab48600c58..6d68cfe7a6 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/CompatiblePropertyUtil.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/CompatiblePropertyUtil.java @@ -96,6 +96,8 @@ private static String getLegacyProperty(String property) { return TableProperties.ENABLE_ORPHAN_CLEAN_LEGACY; case TableProperties.ENABLE_LOG_STORE: return TableProperties.ENABLE_LOG_STORE_LEGACY; + case TableProperties.SNAPSHOT_KEEP_DURATION: + return TableProperties.BASE_SNAPSHOT_KEEP_MINUTES; default: return null; } diff --git a/docs/user-guides/configurations.md b/docs/user-guides/configurations.md index aa1bda9877..5bcf7afbbd 100644 --- a/docs/user-guides/configurations.md +++ b/docs/user-guides/configurations.md @@ -69,7 +69,8 @@ Data-cleaning configurations are applicable to both Iceberg Format and Mixed str |---------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | table-expire.enabled | true | Enables periodically expire table | | change.data.ttl.minutes | 10080(7 days) | Time to live in minutes for data of ChangeStore | -| snapshot.base.keep.minutes | 720(12 hours) | Table-Expiration keeps the latest snapshots of BaseStore within a specified time in minutes | +| snapshot.keep.duration | 720min(12 hours) | Table-Expiration keeps the latest snapshots within a specified duration | +| snapshot.keep.min-count | 1 | Minimum number of snapshots retained for table expiration | | clean-orphan-file.enabled | false | Enables periodically clean orphan files | | clean-orphan-file.min-existing-time-minutes | 2880(2 days) | Cleaning orphan files keeps the files modified within a specified time in minutes | | clean-dangling-delete-files.enabled | true | Whether to enable cleaning of dangling delete files | From bc2e33e99fa643a86f2578b1b7c29587e9135571 Mon Sep 17 00:00:00 2001 From: Marig_Weizhi Date: Mon, 9 Sep 2024 16:06:48 +0800 Subject: [PATCH 3/6] [AMORO-3180] Implement health score calculation for tables (#3181) implement health score calculation for tables --- .../dashboard/controller/TableController.java | 5 ++ .../plan/AbstractPartitionPlan.java | 5 ++ .../plan/CommonPartitionEvaluator.java | 53 +++++++++++++++++++ .../optimizing/plan/OptimizingEvaluator.java | 9 ++++ .../optimizing/plan/PartitionEvaluator.java | 3 ++ .../amoro/server/table/TableRuntime.java | 4 ++ .../server/table/TableSummaryMetrics.java | 17 +++++- .../server/table/TestTableSummaryMetrics.java | 5 ++ .../amoro/table/descriptor/TableSummary.java | 10 ++++ amoro-web/mock/modules/table.js | 5 +- amoro-web/src/language/en.ts | 1 + amoro-web/src/language/zh.ts | 1 + amoro-web/src/types/common.type.ts | 2 + amoro-web/src/views/tables/index.vue | 13 +++++ docs/user-guides/metrics.md | 31 +++++------ 15 files changed, 146 insertions(+), 18 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java index b83be317b4..43a71e785d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java @@ -48,6 +48,7 @@ import org.apache.amoro.server.dashboard.utils.AmsUtil; import org.apache.amoro.server.dashboard.utils.CommonUtil; import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Function; @@ -149,6 +150,10 @@ public void getTableDetail(Context ctx) { if (serverTableIdentifier.isPresent()) { TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get()); tableSummary.setOptimizingStatus(tableRuntime.getOptimizingStatus().name()); + OptimizingEvaluator.PendingInput tableRuntimeSummary = tableRuntime.getTableSummary(); + if (tableRuntimeSummary != null) { + tableSummary.setHealthScore(tableRuntimeSummary.getHealthScore()); + } } else { tableSummary.setOptimizingStatus(OptimizingStatus.IDLE.name()); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java index 5a87cd9e27..4551decd77 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java @@ -181,6 +181,11 @@ protected interface TaskSplitter { List splitTasks(int targetTaskCount); } + @Override + public int getHealthScore() { + return evaluator.getHealthScore(); + } + @Override public int getFragmentFileCount() { return evaluator().getFragmentFileCount(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java index ffdb2660ce..013f11f87a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java @@ -369,6 +369,59 @@ public boolean anyDeleteExist() { return equalityDeleteFileCount > 0 || posDeleteFileCount > 0; } + @Override + public int getHealthScore() { + long dataFilesSize = getFragmentFileSize() + getSegmentFileSize(); + long dataFiles = getFragmentFileCount() + getSegmentFileCount(); + long dataRecords = getFragmentFileRecords() + getSegmentFileRecords(); + + double averageDataFileSize = getNormalizedRatio(dataFilesSize, dataFiles); + double eqDeleteRatio = getNormalizedRatio(equalityDeleteFileRecords, dataRecords); + double posDeleteRatio = getNormalizedRatio(posDeleteFileRecords, dataRecords); + + double tablePenaltyFactor = getTablePenaltyFactor(dataFiles, dataFilesSize); + return (int) + Math.ceil( + 100 + - tablePenaltyFactor + * (40 * getSmallFilePenaltyFactor(averageDataFileSize) + + 40 * getEqDeletePenaltyFactor(eqDeleteRatio) + + 20 * getPosDeletePenaltyFactor(posDeleteRatio))); + } + + private double getEqDeletePenaltyFactor(double eqDeleteRatio) { + double eqDeleteRatioThreshold = config.getMajorDuplicateRatio(); + return getNormalizedRatio(eqDeleteRatio, eqDeleteRatioThreshold); + } + + private double getPosDeletePenaltyFactor(double posDeleteRatio) { + double posDeleteRatioThreshold = config.getMajorDuplicateRatio() * 2; + return getNormalizedRatio(posDeleteRatio, posDeleteRatioThreshold); + } + + private double getSmallFilePenaltyFactor(double averageDataFileSize) { + return 1 - getNormalizedRatio(averageDataFileSize, minTargetSize); + } + + private double getTablePenaltyFactor(long dataFiles, long dataFilesSize) { + // if the number of table files is less than or equal to 1, + // there is no penalty, i.e., the table is considered to be perfectly healthy + if (dataFiles <= 1) { + return 0; + } + // The small table has very little impact on performance, + // so there is only a small penalty + return getNormalizedRatio(dataFiles, config.getMinorLeastFileCount()) + * getNormalizedRatio(dataFilesSize, config.getTargetSize()); + } + + private double getNormalizedRatio(double numerator, double denominator) { + if (denominator <= 0) { + return 0; + } + return Math.min(numerator, denominator) / denominator; + } + @Override public int getFragmentFileCount() { return fragmentFileCount; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java index 7ce4c3e2dd..765bdf3bb1 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java @@ -200,10 +200,12 @@ public static class PendingInput { private long equalityDeleteBytes = 0L; private long equalityDeleteFileRecords = 0L; private long positionalDeleteFileRecords = 0L; + private int healthScore = -1; // -1 means not calculated public PendingInput() {} public PendingInput(Collection evaluators) { + double totalHealthScore = 0; for (PartitionEvaluator evaluator : evaluators) { partitions .computeIfAbsent(evaluator.getPartition().first(), ignore -> Sets.newHashSet()) @@ -217,7 +219,9 @@ public PendingInput(Collection evaluators) { equalityDeleteBytes += evaluator.getEqualityDeleteFileSize(); equalityDeleteFileRecords += evaluator.getEqualityDeleteFileRecords(); equalityDeleteFileCount += evaluator.getEqualityDeleteFileCount(); + totalHealthScore += evaluator.getHealthScore(); } + healthScore = (int) Math.ceil(totalHealthScore / evaluators.size()); } public Map> getPartitions() { @@ -260,6 +264,10 @@ public long getPositionalDeleteFileRecords() { return positionalDeleteFileRecords; } + public int getHealthScore() { + return healthScore; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -273,6 +281,7 @@ public String toString() { .add("equalityDeleteBytes", equalityDeleteBytes) .add("equalityDeleteFileRecords", equalityDeleteFileRecords) .add("positionalDeleteFileRecords", positionalDeleteFileRecords) + .add("healthScore", healthScore) .toString(); } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/PartitionEvaluator.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/PartitionEvaluator.java index 974703bf07..cc99b2c28b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/PartitionEvaluator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/PartitionEvaluator.java @@ -81,6 +81,9 @@ interface Weight extends Comparable {} */ OptimizingType getOptimizingType(); + /** Get health score of this partition. */ + int getHealthScore(); + /** Get the count of fragment files involved in optimizing. */ int getFragmentFileCount(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java index 9884d2e785..73855d3a0f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java @@ -392,6 +392,10 @@ public OptimizingEvaluator.PendingInput getPendingInput() { return pendingInput; } + public OptimizingEvaluator.PendingInput getTableSummary() { + return tableSummary; + } + private boolean updateConfigInternal(Map properties) { TableConfiguration newTableConfig = TableConfigurations.parseTableConfig(properties); if (tableConfiguration.equals(newTableConfig)) { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableSummaryMetrics.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableSummaryMetrics.java index 5cd16e26fb..7d4dfd8601 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableSummaryMetrics.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableSummaryMetrics.java @@ -112,13 +112,20 @@ public class TableSummaryMetrics { .withTags("catalog", "database", "table") .build(); - // table summary snapshots number metrics + // table summary snapshots number metric public static final MetricDefine TABLE_SUMMARY_SNAPSHOTS = defineGauge("table_summary_snapshots") .withDescription("Number of snapshots in the table") .withTags("catalog", "database", "table") .build(); + // table summary health score metric + public static final MetricDefine TABLE_SUMMARY_HEALTH_SCORE = + defineGauge("table_summary_health_score") + .withDescription("Health score of the table") + .withTags("catalog", "database", "table") + .build(); + private final ServerTableIdentifier identifier; private final List registeredMetricKeys = Lists.newArrayList(); private MetricRegistry globalRegistry; @@ -136,6 +143,7 @@ public class TableSummaryMetrics { private long dataFilesRecords = 0L; private long equalityDeleteFilesRecords = 0L; private long snapshots = 0L; + private long healthScore = 0L; public TableSummaryMetrics(ServerTableIdentifier identifier) { this.identifier = identifier; @@ -191,9 +199,12 @@ public void register(MetricRegistry registry) { TABLE_SUMMARY_EQUALITY_DELETE_FILES_RECORDS, (Gauge) () -> equalityDeleteFilesRecords); - // register snapshots number metrics + // register snapshots number metric registerMetric(registry, TABLE_SUMMARY_SNAPSHOTS, (Gauge) () -> snapshots); + // register health score metric + registerMetric(registry, TABLE_SUMMARY_HEALTH_SCORE, (Gauge) () -> healthScore); + globalRegistry = registry; } } @@ -231,6 +242,8 @@ public void refresh(OptimizingEvaluator.PendingInput tableSummary) { positionDeleteFilesRecords = tableSummary.getPositionalDeleteFileRecords(); dataFilesRecords = tableSummary.getDataFileRecords(); equalityDeleteFilesRecords = tableSummary.getEqualityDeleteFileRecords(); + + healthScore = tableSummary.getHealthScore(); } public void refreshSnapshots(MixedTable table) { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java index 1b4632b29c..e0b992e784 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java @@ -24,6 +24,7 @@ import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_EQUALITY_DELETE_FILES; import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_EQUALITY_DELETE_FILES_RECORDS; import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_EQUALITY_DELETE_FILES_SIZE; +import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_HEALTH_SCORE; import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_POSITION_DELETE_FILES; import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_POSITION_DELETE_FILES_RECORDS; import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_POSITION_DELETE_FILES_SIZE; @@ -170,6 +171,8 @@ public void testTableSummaryMetrics() { Gauge snapshots = getMetric(metrics, identifier, TABLE_SUMMARY_SNAPSHOTS); + Gauge healthScore = getMetric(metrics, identifier, TABLE_SUMMARY_HEALTH_SCORE); + Assertions.assertEquals(0, totalFiles.getValue()); Assertions.assertEquals(0, dataFiles.getValue()); Assertions.assertEquals(0, posDelFiles.getValue()); @@ -184,6 +187,7 @@ public void testTableSummaryMetrics() { Assertions.assertEquals(0, dataRecords.getValue()); Assertions.assertEquals(0, posDelRecords.getValue()); Assertions.assertEquals(0, eqDelRecords.getValue()); + Assertions.assertEquals(0, healthScore.getValue()); // refresh metrics initTableWithFiles(); @@ -202,6 +206,7 @@ public void testTableSummaryMetrics() { Assertions.assertTrue(posDelRecords.getValue() > 0); Assertions.assertTrue(snapshots.getValue() > 0); + Assertions.assertTrue(healthScore.getValue() > 0); } private Gauge getMetric( diff --git a/amoro-common/src/main/java/org/apache/amoro/table/descriptor/TableSummary.java b/amoro-common/src/main/java/org/apache/amoro/table/descriptor/TableSummary.java index 95e611e7fa..641f124e2c 100644 --- a/amoro-common/src/main/java/org/apache/amoro/table/descriptor/TableSummary.java +++ b/amoro-common/src/main/java/org/apache/amoro/table/descriptor/TableSummary.java @@ -26,6 +26,7 @@ public class TableSummary { private String tableFormat; private long records; private String optimizingStatus; + private int healthScore = -1; // -1 means not calculated public TableSummary() {} @@ -71,4 +72,13 @@ public String getOptimizingStatus() { public void setOptimizingStatus(String optimizingStatus) { this.optimizingStatus = optimizingStatus; } + + /** Current table health score */ + public int getHealthScore() { + return healthScore; + } + + public void setHealthScore(int healthScore) { + this.healthScore = healthScore; + } } diff --git a/amoro-web/mock/modules/table.js b/amoro-web/mock/modules/table.js index 0e94541caf..a3e61a14d4 100644 --- a/amoro-web/mock/modules/table.js +++ b/amoro-web/mock/modules/table.js @@ -76,7 +76,10 @@ export default [ "file": 2, "size": "1.79KB", "tableFormat": "Iceberg(V1)", - "averageFile": "918.00B" + "averageFile": "918.00B", + "records":24, + "optimizingStatus":"IDLE", + "healthScore":100, }, "baseLocation": "/mnt/dfs/4/warehouse_public/db/user", "filter": null, diff --git a/amoro-web/src/language/en.ts b/amoro-web/src/language/en.ts index 60d494f0dd..06ba5513ef 100644 --- a/amoro-web/src/language/en.ts +++ b/amoro-web/src/language/en.ts @@ -17,6 +17,7 @@ */ export default { + healthScore: 'Health Score', overview: 'Overview', catalogs: 'Catalogs', catalog: 'Catalog', diff --git a/amoro-web/src/language/zh.ts b/amoro-web/src/language/zh.ts index 6bafedd9e6..0936135cc3 100644 --- a/amoro-web/src/language/zh.ts +++ b/amoro-web/src/language/zh.ts @@ -17,6 +17,7 @@ */ export default { + healthScore: '健康度', overview: '总览', catalogs: '目录', catalog: '目录', diff --git a/amoro-web/src/types/common.type.ts b/amoro-web/src/types/common.type.ts index 10b26d8879..4af04c14dc 100644 --- a/amoro-web/src/types/common.type.ts +++ b/amoro-web/src/types/common.type.ts @@ -60,6 +60,7 @@ export interface IKeyAndValue { } export interface IBaseDetailInfo { optimizingStatus: string + records: string tableType: string tableName: string createTime: string @@ -68,6 +69,7 @@ export interface IBaseDetailInfo { averageFile: string tableFormat: string hasPartition: boolean + healthScore: number } export interface DetailColumnItem { diff --git a/amoro-web/src/views/tables/index.vue b/amoro-web/src/views/tables/index.vue index 6a911e908e..18fa8fca73 100644 --- a/amoro-web/src/views/tables/index.vue +++ b/amoro-web/src/views/tables/index.vue @@ -54,11 +54,13 @@ export default defineComponent({ isSecondaryNav: false, baseInfo: { optimizingStatus: '', + records: '', tableType: '', tableName: '', createTime: '', tableFormat: '', hasPartition: false, + healthScore: -1, } as IBaseDetailInfo, detailLoaded: false, }) @@ -150,6 +152,10 @@ export default defineComponent({

{{ $t('tableFormat') }}: {{ baseInfo.tableFormat }}

+ +

+ {{ $t('healthScore') }}: {{ baseInfo.healthScore == null || baseInfo.healthScore < 0 ? 'N/A' : baseInfo.healthScore }} +

@@ -178,9 +184,11 @@ export default defineComponent({ border: 1px solid #e8e8f0; padding: 12px 0; min-height: 100%; + .create-time { margin-top: 12px; } + .tables-menu-wrap { position: fixed; width: 100%; @@ -189,6 +197,7 @@ export default defineComponent({ left: 200px; z-index: 100; } + .table-name { font-size: 24px; line-height: 1.5; @@ -196,16 +205,20 @@ export default defineComponent({ max-width: 100%; padding-left: 24px; } + .table-info { padding: 12px 24px 0 24px; + .text-color { color: #7CB305; } } + .table-edit { font-size: 18px; padding-right: 12px; } + :deep(.ant-tabs-nav) { padding-left: 12px; margin-bottom: 0; diff --git a/docs/user-guides/metrics.md b/docs/user-guides/metrics.md index cc86bcb0d9..d65ce4c9ed 100644 --- a/docs/user-guides/metrics.md +++ b/docs/user-guides/metrics.md @@ -100,18 +100,19 @@ Amoro has supported built-in metrics to measure status of table self-optimizing ## table summary metrics -| Metric Name | Type | Tags | Description | -|---------------------------------------------|---------|--------------------------|-----------------------------------------------| -| table_summary_total_files | Gauge | catalog, database, table | Total number of files in the table | -| table_summary_data_files | Gauge | catalog, database, table | Number of data files in the table | -| table_summary_equality_delete_files | Gauge | catalog, database, table | Number of equality delete files in the table | -| table_summary_position_delete_files | Gauge | catalog, database, table | Number of position delete files in the table | -| table_summary_total_files_size | Gauge | catalog, database, table | Total size of files in the table | -| table_summary_data_files_size | Gauge | catalog, database, table | Size of data files in the table | -| table_summary_equality_delete_files_size | Gauge | catalog, database, table | Size of equality delete files in the table | -| table_summary_position_delete_files_size | Gauge | catalog, database, table | Size of position delete files in the table | -| table_summary_total_records | Gauge | catalog, database, table | Total records in the table | -| table_summary_data_files_records | Gauge | catalog, database, table | Records of data files in the table | -| table_summary_equality_delete_files_records | Gauge | catalog, database, table | Records of equality delete files in the table | -| table_summary_position_delete_files_records | Gauge | catalog, database, table | Records of position delete files in the table | -| table_summary_snapshots | Gauge | catalog, database, table | Number of snapshots in the table | \ No newline at end of file +| Metric Name | Type | Tags | Description | +|-----------------------------------------------|---------|--------------------------|-----------------------------------------------| +| table_summary_total_files | Gauge | catalog, database, table | Total number of files in the table | +| table_summary_data_files | Gauge | catalog, database, table | Number of data files in the table | +| table_summary_equality_delete_files | Gauge | catalog, database, table | Number of equality delete files in the table | +| table_summary_position_delete_files | Gauge | catalog, database, table | Number of position delete files in the table | +| table_summary_total_files_size | Gauge | catalog, database, table | Total size of files in the table | +| table_summary_data_files_size | Gauge | catalog, database, table | Size of data files in the table | +| table_summary_equality_delete_files_size | Gauge | catalog, database, table | Size of equality delete files in the table | +| table_summary_position_delete_files_size | Gauge | catalog, database, table | Size of position delete files in the table | +| table_summary_total_records | Gauge | catalog, database, table | Total records in the table | +| table_summary_data_files_records | Gauge | catalog, database, table | Records of data files in the table | +| table_summary_equality_delete_files_records | Gauge | catalog, database, table | Records of equality delete files in the table | +| table_summary_position_delete_files_records | Gauge | catalog, database, table | Records of position delete files in the table | +| table_summary_snapshots | Gauge | catalog, database, table | Number of snapshots in the table | +| table_summary_health_score | Gauge | catalog, database, table | Health score of the table | \ No newline at end of file From b52f64d7784441dd2496bae6844cfe55a3403088 Mon Sep 17 00:00:00 2001 From: Congxian Qiu Date: Tue, 10 Sep 2024 15:18:19 +0800 Subject: [PATCH 4/6] [AMORO-3145] Filter out info log for CodecPool in optimizer (#3185) [AMORO-3145] Filter out info log for CodecPool Currently there will be many logs for (de)compressor from CodecPool, and seems there is no other meaningful info log from this class, so we change the log level from CodecPool from info to warn to filter out these logs This only affect the local optimizer, and user need to change the configuration if they use flink/spark optimzier. --- dist/src/main/amoro-bin/conf/optimize/log4j2.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/dist/src/main/amoro-bin/conf/optimize/log4j2.xml b/dist/src/main/amoro-bin/conf/optimize/log4j2.xml index 6e4b9a13db..8ce1769969 100644 --- a/dist/src/main/amoro-bin/conf/optimize/log4j2.xml +++ b/dist/src/main/amoro-bin/conf/optimize/log4j2.xml @@ -101,6 +101,7 @@ + From 62a095ff966d3dfb63c0000270646d7d7def5457 Mon Sep 17 00:00:00 2001 From: ZhouJinsong Date: Tue, 10 Sep 2024 16:45:16 +0800 Subject: [PATCH 5/6] [Hotfix] Fix github ci due to module changes (#3191) Fix github ci configurations --- .github/workflows/core-hadoop2-ci.yml | 7 ++++++- .github/workflows/core-hadoop3-ci.yml | 7 ++++++- .github/workflows/trino-ci.yml | 3 ++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/.github/workflows/core-hadoop2-ci.yml b/.github/workflows/core-hadoop2-ci.yml index d5ac469e8f..52e9834cf7 100644 --- a/.github/workflows/core-hadoop2-ci.yml +++ b/.github/workflows/core-hadoop2-ci.yml @@ -20,7 +20,12 @@ on: paths: - ".github/workflows/**" - "amoro-ams/**" - - "amoro-core/**" + - "amoro-common/**" + - "amoro-metrics/**" + - "amoro-optimizer/**" + - "amoro-format-iceberg/**" + - "amoro-format-paimon/**" + - "amoro-format-hudi/**" - "amoro-format-mixed/amoro-format-mixed-flink/**" - "amoro-format-mixed/amoro-format-mixed-hive/**" - "amoro-format-mixed/amoro-format-mixed-spark/**" diff --git a/.github/workflows/core-hadoop3-ci.yml b/.github/workflows/core-hadoop3-ci.yml index 034ec67cf8..4ccd7d69f4 100644 --- a/.github/workflows/core-hadoop3-ci.yml +++ b/.github/workflows/core-hadoop3-ci.yml @@ -20,7 +20,12 @@ on: paths: - ".github/workflows/**" - "amoro-ams/**" - - "amoro-core/**" + - "amoro-common/**" + - "amoro-metrics/**" + - "amoro-optimizer/**" + - "amoro-format-iceberg/**" + - "amoro-format-paimon/**" + - "amoro-format-hudi/**" - "amoro-format-mixed/amoro-format-mixed-flink/**" - "amoro-format-mixed/amoro-format-mixed-hive/**" - "amoro-format-mixed/amoro-format-mixed-spark/**" diff --git a/.github/workflows/trino-ci.yml b/.github/workflows/trino-ci.yml index 075823a7d9..6dc4336eef 100644 --- a/.github/workflows/trino-ci.yml +++ b/.github/workflows/trino-ci.yml @@ -20,7 +20,8 @@ on: paths: - ".github/workflows/**" - "amoro-ams/**" - - "amoro-core/**" + - "amoro-common/**" + - "amoro-format-iceberg/**" - "amoro-format-mixed/amoro-mixed-hive/**" - "amoro-format-mixed/amoro-mixed-trino/**" - "pom.xml" From 260554fde9254da2ff5df9b8a188843e90ccd4a9 Mon Sep 17 00:00:00 2001 From: Congxian Qiu Date: Tue, 10 Sep 2024 18:45:15 +0800 Subject: [PATCH 6/6] [AMORO-3186] Force check git dir when creating release binary (#3187) --- tools/releasing/create_binary_release.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/releasing/create_binary_release.sh b/tools/releasing/create_binary_release.sh index ea82fd9f07..0d801126d5 100644 --- a/tools/releasing/create_binary_release.sh +++ b/tools/releasing/create_binary_release.sh @@ -65,7 +65,7 @@ make_binary_release() { echo "Creating ${HADOOP_VERSION} binary release" # enable release profile here (to check for the maven version) - $MVN clean package ${HADOOP_PROFILE} -Pno-extended-disk-storage -pl ':dist' -am -Dgpg.skip -Dcheckstyle.skip=true -DskipTests + $MVN clean package ${HADOOP_PROFILE} -Pno-extended-disk-storage -Pfail-on-no-git-dir -pl ':dist' -am -Dgpg.skip -Dcheckstyle.skip=true -DskipTests local TARGET_FILE="apache-amoro-${RELEASE_VERSION}-bin-${HADOOP_VERSION}.tar.gz" cp dist/target/apache-amoro-${RELEASE_VERSION}-bin.tar.gz ${RELEASE_DIR}/${TARGET_FILE}