From 3f4c8de4dac18aedae0633163b4e24c1611e7115 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Tue, 14 Jan 2025 21:52:47 +0800 Subject: [PATCH] complete --- .../amoro/server/AmoroServiceContainer.java | 14 +- .../server/DefaultOptimizingService.java | 6 +- .../amoro/server/RestCatalogService.java | 4 +- .../amoro/server/TableManagementService.java | 7 +- .../amoro/server/catalog/CatalogManager.java | 10 + .../server/catalog/DefaultCatalogManager.java | 8 + .../amoro/server/catalog/InternalCatalog.java | 14 +- .../server/dashboard/DashboardServer.java | 13 +- .../dashboard/ServerTableDescriptor.java | 1 - .../controller/OptimizerGroupController.java | 13 +- .../dashboard/controller/TableController.java | 14 +- .../server/optimizing/OptimizingQueue.java | 16 +- .../server/table/DefaultTableManager.java | 35 +- .../server/table/DefaultTableService.java | 6 +- .../server/table/DefaultTableServiceOld.java | 682 ------------------ .../server/table/MaintainedTableManager.java | 28 +- .../amoro/server/table/TableManager.java | 3 +- .../amoro/server/table/TableMetadata.java | 2 +- .../amoro/server/table/TableService.java | 12 +- .../table/executor/AsyncTableExecutors.java | 24 +- .../table/executor/BaseTableExecutor.java | 12 +- .../executor/BlockerExpiringExecutor.java | 6 +- .../DanglingDeleteFilesCleaningExecutor.java | 6 +- .../table/executor/DataExpiringExecutor.java | 6 +- .../executor/HiveCommitSyncExecutor.java | 6 +- .../executor/OptimizingCommitExecutor.java | 6 +- .../executor/OptimizingExpiringExecutor.java | 6 +- .../executor/OrphanFilesCleaningExecutor.java | 6 +- .../executor/SnapshotsExpiringExecutor.java | 6 +- .../executor/TableRuntimeRefreshExecutor.java | 6 +- .../executor/TagsAutoCreatingExecutor.java | 6 +- .../internal/InternalIcebergCreator.java | 2 +- .../table/internal/InternalTableCreator.java | 3 +- .../server/terminal/TerminalManager.java | 4 +- .../amoro/server/AMSManagerTestBase.java | 65 ++ ...eTestBase.java => AMSServiceTestBase.java} | 23 +- .../apache/amoro/server/AmsEnvironment.java | 13 +- .../server/RestCatalogServiceTestBase.java | 1 - .../server/TestDefaultOptimizingService.java | 30 +- .../server/catalog/TableCatalogTestBase.java | 4 +- .../optimizing/TestOptimizingQueue.java | 4 +- .../amoro/server/table/AMSTableTestBase.java | 15 +- .../server/table/TestCatalogService.java | 3 +- ...ableService.java => TestTableManager.java} | 60 +- .../server/table/TestTableRuntimeHandler.java | 10 +- .../executor/TestBlockerExpiringExecutor.java | 16 +- .../apache/amoro/ServerTableIdentifier.java | 4 +- 47 files changed, 367 insertions(+), 874 deletions(-) delete mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableServiceOld.java create mode 100644 amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java rename amoro-ams/src/test/java/org/apache/amoro/server/{table/TableServiceTestBase.java => AMSServiceTestBase.java} (73%) rename amoro-ams/src/test/java/org/apache/amoro/server/table/{TestTableService.java => TestTableManager.java} (85%) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 2d0e7eb0ca..f2880ebf6f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -45,11 +45,9 @@ import org.apache.amoro.server.resource.ResourceContainers; import org.apache.amoro.server.table.DefaultTableManager; import org.apache.amoro.server.table.DefaultTableService; -import org.apache.amoro.server.table.DefaultTableServiceOld; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableService; -import org.apache.amoro.server.table.TableServiceOld; import org.apache.amoro.server.table.executor.AsyncTableExecutors; import org.apache.amoro.server.terminal.TerminalManager; import org.apache.amoro.server.utils.ThriftServiceProxy; @@ -155,10 +153,11 @@ public void startService() throws Exception { MetricManager.getInstance(); catalogManager = new DefaultCatalogManager(serviceConfig); - tableManager = new DefaultTableManager(serviceConfig, catalogManager); + tableService = new DefaultTableService(serviceConfig, catalogManager); - optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, tableService); + optimizingService = + new DefaultOptimizingService(serviceConfig, catalogManager, tableManager, tableService); LOG.info("Setting up AMS table executors..."); AsyncTableExecutors.getInstance().setup(tableService, serviceConfig); @@ -253,7 +252,12 @@ private void startThriftServer(TServer server, String threadName) { private void initHttpService() { DashboardServer dashboardServer = new DashboardServer( - serviceConfig, catalogManager, tableManager, optimizingService, terminalManager); + serviceConfig, + catalogManager, + tableManager, + optimizingService, + terminalManager, + tableService); RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableManager); httpServer = diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index fd6be4e9eb..9e940bf3da 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -48,12 +48,10 @@ import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.resource.OptimizerThread; import org.apache.amoro.server.resource.QuotaProvider; -import org.apache.amoro.server.table.DefaultTableServiceOld; import org.apache.amoro.server.table.MaintainedTableManager; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.table.TableService; -import org.apache.amoro.server.table.TableServiceOld; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -145,7 +143,7 @@ private void loadOptimizingQueues(List tableRuntimeMetaList) { List tableRuntimes = groupToTableRuntimes.remove(groupName); OptimizingQueue optimizingQueue = new OptimizingQueue( - tableService, + catalogManager, group, this, planExecutor, @@ -322,7 +320,7 @@ public void createResourceGroup(ResourceGroup resourceGroup) { doAs(ResourceMapper.class, mapper -> mapper.insertResourceGroup(resourceGroup)); OptimizingQueue optimizingQueue = new OptimizingQueue( - tableService, + catalogManager, resourceGroup, this, planExecutor, diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java index abef8bd2d4..bb3d47cc00 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java @@ -44,7 +44,6 @@ import org.apache.amoro.server.catalog.ServerCatalog; import org.apache.amoro.server.manager.EventsManager; import org.apache.amoro.server.persistence.PersistentBase; -import org.apache.amoro.server.table.TableServiceOld; import org.apache.amoro.server.table.internal.InternalTableCreator; import org.apache.amoro.server.table.internal.InternalTableHandler; import org.apache.amoro.server.table.internal.InternalTableManager; @@ -348,7 +347,8 @@ public void deleteTable(Context ctx) { Boolean.parseBoolean( Optional.ofNullable(ctx.req.getParameter("purgeRequested")).orElse("false")); org.apache.amoro.server.table.TableMetadata tableMetadata = handler.tableMetadata(); - tableManager.dropTableMetadata(tableMetadata.getTableIdentifier().getIdentifier(), purge); + tableManager.dropTableMetadata( + tableMetadata.getTableIdentifier().getIdentifier().buildTableIdentifier(), purge); handler.dropTable(purge); return null; }); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/TableManagementService.java b/amoro-ams/src/main/java/org/apache/amoro/server/TableManagementService.java index fd82fad035..73f787a97f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/TableManagementService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/TableManagementService.java @@ -32,10 +32,8 @@ import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.catalog.InternalCatalog; import org.apache.amoro.server.catalog.ServerCatalog; -import org.apache.amoro.server.table.MaintainedTableManager; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableMetadata; -import org.apache.amoro.server.table.TableServiceOld; import org.apache.amoro.server.utils.InternalTableUtil; import org.apache.amoro.shade.thrift.org.apache.thrift.TException; @@ -91,7 +89,10 @@ public void createTableMeta(TableMeta tableMeta) { } ServerTableIdentifier identifier = ServerTableIdentifier.of( - tableMeta.getTableIdentifier(), TableFormat.valueOf(tableMeta.getFormat())); + tableMeta.getTableIdentifier().getCatalog(), + tableMeta.getTableIdentifier().getDatabase(), + tableMeta.getTableIdentifier().getTableName(), + TableFormat.valueOf(tableMeta.getFormat())); InternalCatalog catalog = catalogManager.getInternalCatalog(identifier.getCatalog()); CatalogMeta catalogMeta = catalog.getMetadata(); TableMetadata tableMetadata = new TableMetadata(identifier, tableMeta, catalogMeta); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogManager.java index 35ba55baef..1ff7794532 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogManager.java @@ -18,7 +18,9 @@ package org.apache.amoro.server.catalog; +import org.apache.amoro.AmoroTable; import org.apache.amoro.api.CatalogMeta; +import org.apache.amoro.table.TableIdentifier; import java.util.List; @@ -90,4 +92,12 @@ public interface CatalogManager { * @param catalogMeta The CatalogMeta object representing the updated catalog information. */ void updateCatalog(CatalogMeta catalogMeta); + + /** + * load a table via server catalog. + * + * @param identifier managed table identifier + * @return managed table. + */ + AmoroTable loadTable(TableIdentifier identifier); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java index c2ead95924..3e62cc761d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java @@ -18,6 +18,7 @@ package org.apache.amoro.server.catalog; +import org.apache.amoro.AmoroTable; import org.apache.amoro.api.CatalogMeta; import org.apache.amoro.config.Configurations; import org.apache.amoro.exception.AlreadyExistsException; @@ -31,6 +32,7 @@ import org.apache.amoro.shade.guava32.com.google.common.cache.CacheLoader; import org.apache.amoro.shade.guava32.com.google.common.cache.LoadingCache; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.table.TableIdentifier; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -195,6 +197,12 @@ public void updateCatalog(CatalogMeta catalogMeta) { LOG.info("Update catalog metadata: {}", catalogMeta.getCatalogName()); } + @Override + public AmoroTable loadTable(TableIdentifier identifier) { + ServerCatalog serverCatalog = getServerCatalog(identifier.getCatalog()); + return serverCatalog.loadTable(identifier.getDatabase(), identifier.getTableName()); + } + private void validateCatalogUpdate(CatalogMeta oldMeta, CatalogMeta newMeta) { if (!oldMeta.getCatalogType().equals(newMeta.getCatalogType())) { throw new IllegalMetadataException("Cannot update catalog type"); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java index b823f94b1f..d6a6bd9895 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java @@ -22,7 +22,6 @@ import org.apache.amoro.TableFormat; import org.apache.amoro.TableIDWithFormat; import org.apache.amoro.api.CatalogMeta; -import org.apache.amoro.api.TableIdentifier; import org.apache.amoro.exception.AlreadyExistsException; import org.apache.amoro.exception.IllegalMetadataException; import org.apache.amoro.exception.ObjectNotExistsException; @@ -32,6 +31,7 @@ import org.apache.amoro.server.table.TableMetadata; import org.apache.amoro.server.table.internal.InternalTableCreator; import org.apache.amoro.server.table.internal.InternalTableHandler; +import org.apache.amoro.table.TableIdentifier; import org.apache.iceberg.rest.requests.CreateTableRequest; import java.util.List; @@ -101,11 +101,7 @@ public List listTables(String database) { TableMetaMapper.class, mapper -> mapper.selectTableIdentifiersByDb(getMetadata().getCatalogName(), database)) .stream() - .map( - sid -> - TableIDWithFormat.of( - org.apache.amoro.table.TableIdentifier.of(sid.getIdentifier()), - sid.getFormat())) + .map(sid -> TableIDWithFormat.of(sid.getIdentifier(), sid.getFormat())) .collect(Collectors.toList()); } @@ -115,11 +111,7 @@ public List listTables() { TableMetaMapper.class, mapper -> mapper.selectTableIdentifiersByCatalog(getMetadata().getCatalogName())) .stream() - .map( - sid -> - TableIDWithFormat.of( - org.apache.amoro.table.TableIdentifier.of(sid.getIdentifier()), - sid.getFormat())) + .map(sid -> TableIDWithFormat.of(sid.getIdentifier(), sid.getFormat())) .collect(Collectors.toList()); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java index 523da13f5a..4fb925c33d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java @@ -52,7 +52,7 @@ import org.apache.amoro.server.dashboard.response.ErrorResponse; import org.apache.amoro.server.dashboard.utils.ParamSignatureCalculator; import org.apache.amoro.server.table.TableManager; -import org.apache.amoro.server.table.TableServiceOld; +import org.apache.amoro.server.table.TableService; import org.apache.amoro.server.terminal.TerminalManager; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; @@ -101,19 +101,24 @@ public DashboardServer( CatalogManager catalogManager, TableManager tableManager, DefaultOptimizingService optimizerManager, - TerminalManager terminalManager) { + TerminalManager terminalManager, + TableService tableService) { PlatformFileManager platformFileManager = new PlatformFileManager(); this.catalogController = new CatalogController(catalogManager, platformFileManager); this.healthCheckController = new HealthCheckController(); this.loginController = new LoginController(serviceConfig); - this.optimizerGroupController = new OptimizerGroupController(tableService, optimizerManager); + // TODO: remove table service from OptimizerGroupController + this.optimizerGroupController = + new OptimizerGroupController(tableManager, tableService, optimizerManager); this.optimizerController = new OptimizerController(optimizerManager); this.platformFileInfoController = new PlatformFileInfoController(platformFileManager); this.settingController = new SettingController(serviceConfig, optimizerManager); ServerTableDescriptor tableDescriptor = new ServerTableDescriptor(catalogManager, tableManager, serviceConfig); + // TODO: remove table service from TableController this.tableController = - new TableController(catalogManager, tableService, tableDescriptor, serviceConfig); + new TableController( + catalogManager, tableManager, tableService, tableDescriptor, serviceConfig); this.terminalController = new TerminalController(terminalManager); this.versionController = new VersionController(); this.overviewController = new OverviewController(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java index b3d055a982..4e2b1aef8c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java @@ -27,7 +27,6 @@ import org.apache.amoro.server.catalog.ServerCatalog; import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.table.TableManager; -import org.apache.amoro.server.table.TableServiceOld; import org.apache.amoro.table.descriptor.AmoroSnapshotsOfTable; import org.apache.amoro.table.descriptor.ConsumerInfo; import org.apache.amoro.table.descriptor.DDLInfo; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java index e57dfb1b06..ddaca1399c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java @@ -34,8 +34,9 @@ import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.ResourceContainers; +import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableServiceOld; +import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; @@ -58,11 +59,15 @@ public class OptimizerGroupController { private static final Logger LOG = LoggerFactory.getLogger(OptimizerGroupController.class); private static final String ALL_GROUP = "all"; - private final TableServiceOld tableService; + private final TableManager tableManager; + private final TableService tableService; private final DefaultOptimizingService optimizerManager; public OptimizerGroupController( - TableServiceOld tableService, DefaultOptimizingService optimizerManager) { + TableManager tableManager, + TableService tableService, + DefaultOptimizingService optimizerManager) { + this.tableManager = tableManager; this.tableService = tableService; this.optimizerManager = optimizerManager; } @@ -102,7 +107,7 @@ public void getOptimizerTables(Context ctx) { statusCodes = null; } Pair, Integer> tableRuntimeBeans = - tableService.getTableRuntimes( + tableManager.queryTableRuntimeMetas( optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java index 44d7f23b90..f6c19de327 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java @@ -51,8 +51,9 @@ import org.apache.amoro.server.dashboard.utils.AmsUtil; import org.apache.amoro.server.dashboard.utils.CommonUtil; import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableServiceOld; +import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Function; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -101,7 +102,8 @@ public class TableController { private static final long UPGRADE_INFO_EXPIRE_INTERVAL = 60 * 60 * 1000; private final CatalogManager catalogManager; - private final TableServiceOld tableService; + private final TableManager tableManager; + private final TableService tableService; private final ServerTableDescriptor tableDescriptor; private final Configurations serviceConfig; private final ConcurrentHashMap upgradeRunningInfo = @@ -110,10 +112,12 @@ public class TableController { public TableController( CatalogManager catalogManager, - TableServiceOld tableService, + TableManager tableManager, + TableService tableService, ServerTableDescriptor tableDescriptor, Configurations serviceConfig) { this.catalogManager = catalogManager; + this.tableManager = tableManager; this.tableService = tableService; this.tableDescriptor = tableDescriptor; this.serviceConfig = serviceConfig; @@ -150,7 +154,7 @@ public void getTableDetail(Context ctx) { TableSummary tableSummary = serverTableMeta.getTableSummary(); Optional serverTableIdentifier = Optional.ofNullable( - tableService.getServerTableIdentifier( + tableManager.getServerTableIdentifier( TableIdentifier.of(catalog, database, tableName).buildTableIdentifier())); if (serverTableIdentifier.isPresent()) { TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get().getId()); @@ -679,7 +683,7 @@ public void cancelOptimizingProcess(Context ctx) { Preconditions.checkState(catalogManager.catalogExist(catalog), "invalid catalog!"); ServerTableIdentifier serverTableIdentifier = - tableService.getServerTableIdentifier( + tableManager.getServerTableIdentifier( TableIdentifier.of(catalog, db, table).buildTableIdentifier()); TableRuntime tableRuntime = serverTableIdentifier != null diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 32a5a0d410..db10d9a8b6 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -31,13 +31,13 @@ import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.server.AmoroServiceConstants; +import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.persistence.TaskFilesPersistence; import org.apache.amoro.server.persistence.mapper.OptimizingMapper; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.QuotaProvider; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; @@ -81,7 +81,7 @@ public class OptimizingQueue extends PersistentBase { private final Queue tableQueue = new LinkedTransferQueue<>(); private final Queue> retryTaskQueue = new LinkedTransferQueue<>(); private final SchedulingPolicy scheduler; - private final TableManagerOld tableManager; + private final CatalogManager catalogManager; private final Executor planExecutor; // Keep all planning table identifiers private final Set planningTables = new HashSet<>(); @@ -92,7 +92,7 @@ public class OptimizingQueue extends PersistentBase { private ResourceGroup optimizerGroup; public OptimizingQueue( - TableManagerOld tableManager, + CatalogManager catalogManager, ResourceGroup optimizerGroup, QuotaProvider quotaProvider, Executor planExecutor, @@ -103,7 +103,7 @@ public OptimizingQueue( this.optimizerGroup = optimizerGroup; this.quotaProvider = quotaProvider; this.scheduler = new SchedulingPolicy(optimizerGroup); - this.tableManager = tableManager; + this.catalogManager = catalogManager; this.maxPlanningParallelism = maxPlanningParallelism; this.metrics = new OptimizerGroupMetrics( @@ -275,7 +275,8 @@ private void triggerAsyncPlanning( private TableOptimizingProcess planInternal(TableRuntime tableRuntime) { tableRuntime.beginPlanning(); try { - AmoroTable table = tableManager.loadTable(tableRuntime.getTableIdentifier()); + ServerTableIdentifier identifier = tableRuntime.getTableIdentifier(); + AmoroTable table = catalogManager.loadTable(identifier.getIdentifier()); AbstractOptimizingPlanner planner = IcebergTableUtil.createOptimizingPlanner( tableRuntime.refresh(table), @@ -601,7 +602,10 @@ public MetricsSummary getSummary() { private UnKeyedTableCommit buildCommit() { MixedTable table = - (MixedTable) tableManager.loadTable(tableRuntime.getTableIdentifier()).originalTable(); + (MixedTable) + catalogManager + .loadTable(tableRuntime.getTableIdentifier().getIdentifier()) + .originalTable(); if (table.isUnkeyedTable()) { return new UnKeyedTableCommit(targetSnapshotId, table, taskMap.values()); } else { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java index e073e92ccd..be63b0406e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java @@ -18,6 +18,9 @@ package org.apache.amoro.server.table; +import com.github.pagehelper.Page; +import com.github.pagehelper.PageHelper; +import com.github.pagehelper.PageInfo; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.api.BlockableOperation; import org.apache.amoro.api.Blocker; @@ -32,15 +35,18 @@ import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.catalog.InternalCatalog; import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; import org.apache.amoro.server.table.blocker.TableBlocker; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.util.Comparator; import java.util.List; import java.util.Map; @@ -101,7 +107,8 @@ public void createTable(String catalogName, TableMetadata tableMetadata) { String database = tableMetadata.getTableIdentifier().getDatabase(); String table = tableMetadata.getTableIdentifier().getTableName(); if (catalog.tableExists(database, table)) { - throw new AlreadyExistsException(tableMetadata.getTableIdentifier().getIdentifier()); + throw new AlreadyExistsException( + tableMetadata.getTableIdentifier().getIdentifier().buildTableIdentifier()); } TableMetadata metadata = catalog.createTable(tableMetadata); @@ -213,4 +220,30 @@ public ServerTableIdentifier getServerTableIdentifier(TableIdentifier id) { mapper -> mapper.selectTableIdentifier(id.getCatalog(), id.getDatabase(), id.getTableName())); } + + @Override + public Pair, Integer> queryTableRuntimeMetas( + String optimizerGroup, + @Nullable String fuzzyDbName, + @Nullable String fuzzyTableName, + @Nullable List statusCodeFilters, + int limit, + int offset) { + + // page helper is 1-based + int pageNumber = (offset / limit) + 1; + + try (Page ignore = PageHelper.startPage(pageNumber, limit, true)) { + int total = 0; + List ret = + getAs( + TableMetaMapper.class, + mapper -> + mapper.selectTableRuntimesForOptimizerGroup( + optimizerGroup, fuzzyDbName, fuzzyTableName, statusCodeFilters)); + PageInfo pageInfo = new PageInfo<>(ret); + total = (int) pageInfo.getTotal(); + return Pair.of(ret, total); + } + } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index 92c4ee9071..23489b29fa 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -183,7 +183,6 @@ private TableRuntime getAndCheckExist(ServerTableIdentifier tableIdentifier) { return tableRuntime; } - @Override public TableRuntime getRuntime(Long tableId) { checkStarted(); @@ -196,6 +195,11 @@ public boolean contains(Long tableId) { return tableRuntimeMap.containsKey(tableId); } + @Override + public AmoroTable loadTable(ServerTableIdentifier identifier) { + return catalogManager.loadTable(identifier.getIdentifier()); + } + @Override public void dispose() { tableExplorerScheduler.shutdown(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableServiceOld.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableServiceOld.java deleted file mode 100644 index 3e9f11386c..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableServiceOld.java +++ /dev/null @@ -1,682 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.table; - -import com.github.pagehelper.Page; -import com.github.pagehelper.PageHelper; -import com.github.pagehelper.PageInfo; -import org.apache.amoro.AmoroTable; -import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.TableFormat; -import org.apache.amoro.TableIDWithFormat; -import org.apache.amoro.api.BlockableOperation; -import org.apache.amoro.api.Blocker; -import org.apache.amoro.api.CatalogMeta; -import org.apache.amoro.api.TableIdentifier; -import org.apache.amoro.config.Configurations; -import org.apache.amoro.config.TableConfiguration; -import org.apache.amoro.exception.AlreadyExistsException; -import org.apache.amoro.exception.BlockerConflictException; -import org.apache.amoro.exception.IllegalMetadataException; -import org.apache.amoro.exception.ObjectNotExistsException; -import org.apache.amoro.exception.PersistenceException; -import org.apache.amoro.server.AmoroManagementConf; -import org.apache.amoro.server.catalog.CatalogManager; -import org.apache.amoro.server.catalog.ExternalCatalog; -import org.apache.amoro.server.catalog.InternalCatalog; -import org.apache.amoro.server.catalog.ServerCatalog; -import org.apache.amoro.server.manager.MetricManager; -import org.apache.amoro.server.optimizing.OptimizingStatus; -import org.apache.amoro.server.persistence.PersistentBase; -import org.apache.amoro.server.persistence.TableRuntimeMeta; -import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; -import org.apache.amoro.server.persistence.mapper.TableMetaMapper; -import org.apache.amoro.server.table.blocker.TableBlocker; -import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; -import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; -import org.apache.amoro.shade.guava32.com.google.common.base.Objects; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; -import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; -import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.amoro.utils.TablePropertyUtil; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -public class DefaultTableServiceOld extends PersistentBase implements TableServiceOld { - - public static final Logger LOG = LoggerFactory.getLogger(DefaultTableServiceOld.class); - private static final int TABLE_BLOCKER_RETRY = 3; - private final long externalCatalogRefreshingInterval; - private final long blockerTimeout; - - private final Map tableRuntimeMap = new ConcurrentHashMap<>(); - - private final ScheduledExecutorService tableExplorerScheduler = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setNameFormat("table-explorer-scheduler-%d") - .setDaemon(true) - .build()); - private final CompletableFuture initialized = new CompletableFuture<>(); - private final Configurations serverConfiguration; - private final CatalogManager catalogManager; - private RuntimeHandlerChain headHandler; - private ExecutorService tableExplorerExecutors; - - public DefaultTableServiceOld(Configurations configuration, CatalogManager catalogManager) { - this.catalogManager = catalogManager; - this.externalCatalogRefreshingInterval = - configuration.getLong(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL); - this.blockerTimeout = configuration.getLong(AmoroManagementConf.BLOCKER_TIMEOUT); - this.serverConfiguration = configuration; - } - - @Override - public void dropTableMetadata(TableIdentifier tableIdentifier, boolean deleteData) { - checkStarted(); - if (StringUtils.isBlank(tableIdentifier.getTableName())) { - throw new IllegalMetadataException("table name is blank"); - } - if (StringUtils.isBlank(tableIdentifier.getCatalog())) { - throw new IllegalMetadataException("catalog is blank"); - } - if (StringUtils.isBlank(tableIdentifier.getDatabase())) { - throw new IllegalMetadataException("database is blank"); - } - - InternalCatalog internalCatalog = - catalogManager.getInternalCatalog(tableIdentifier.getCatalog()); - String database = tableIdentifier.getDatabase(); - String table = tableIdentifier.getTableName(); - if (!internalCatalog.tableExists(database, table)) { - throw new ObjectNotExistsException(tableIdentifier); - } - - ServerTableIdentifier serverTableIdentifier = internalCatalog.dropTable(database, table); - Optional.ofNullable(tableRuntimeMap.remove(serverTableIdentifier.getId())) - .ifPresent( - tableRuntime -> { - if (headHandler != null) { - headHandler.fireTableRemoved(tableRuntime); - } - tableRuntime.dispose(); - }); - } - - @Override - public void createTable(String catalogName, TableMetadata tableMetadata) { - checkStarted(); - InternalCatalog catalog = catalogManager.getInternalCatalog(catalogName); - String database = tableMetadata.getTableIdentifier().getDatabase(); - String table = tableMetadata.getTableIdentifier().getTableName(); - if (catalog.tableExists(database, table)) { - throw new AlreadyExistsException(tableMetadata.getTableIdentifier().getIdentifier()); - } - - TableMetadata metadata = catalog.createTable(tableMetadata); - - triggerTableAdded(catalog, metadata.getTableIdentifier()); - } - - @Override - public List listManagedTables() { - checkStarted(); - return getAs(TableMetaMapper.class, TableMetaMapper::selectAllTableIdentifiers); - } - - @Override - public AmoroTable loadTable(ServerTableIdentifier tableIdentifier) { - checkStarted(); - return catalogManager - .getServerCatalog(tableIdentifier.getCatalog()) - .loadTable(tableIdentifier.getDatabase(), tableIdentifier.getTableName()); - } - - @Override - public Blocker block( - TableIdentifier tableIdentifier, - List operations, - Map properties) { - Preconditions.checkNotNull(operations, "operations should not be null"); - Preconditions.checkArgument(!operations.isEmpty(), "operations should not be empty"); - Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must > 0"); - String catalog = tableIdentifier.getCatalog(); - String database = tableIdentifier.getDatabase(); - String table = tableIdentifier.getTableName(); - int tryCount = 0; - while (tryCount++ < TABLE_BLOCKER_RETRY) { - long now = System.currentTimeMillis(); - doAs( - TableBlockerMapper.class, - mapper -> mapper.deleteExpiredBlockers(catalog, database, table, now)); - List tableBlockers = - getAs( - TableBlockerMapper.class, - mapper -> - mapper.selectBlockers( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - now)); - if (TableBlocker.conflict(operations, tableBlockers)) { - throw new BlockerConflictException(operations + " is conflict with " + tableBlockers); - } - Optional maxBlockerOpt = - tableBlockers.stream() - .map(TableBlocker::getBlockerId) - .max(Comparator.comparingLong(l -> l)); - long prevBlockerId = maxBlockerOpt.orElse(-1L); - - TableBlocker tableBlocker = - TableBlocker.buildTableBlocker( - tableIdentifier, operations, properties, now, blockerTimeout, prevBlockerId); - try { - doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker)); - if (tableBlocker.getBlockerId() > 0) { - return tableBlocker.buildBlocker(); - } - } catch (PersistenceException e) { - LOG.warn("An exception occurs when creating a blocker:{}", tableBlocker, e); - } - } - throw new BlockerConflictException("Failed to create a blocker: conflict meet max retry"); - } - - @Override - public void releaseBlocker(TableIdentifier tableIdentifier, String blockerId) { - doAs(TableBlockerMapper.class, mapper -> mapper.deleteBlocker(Long.parseLong(blockerId))); - } - - @Override - public long renewBlocker(TableIdentifier tableIdentifier, String blockerId) { - int retry = 0; - while (retry++ < TABLE_BLOCKER_RETRY) { - long now = System.currentTimeMillis(); - long id = Long.parseLong(blockerId); - TableBlocker tableBlocker = - getAs(TableBlockerMapper.class, mapper -> mapper.selectBlocker(id, now)); - if (tableBlocker == null) { - throw new ObjectNotExistsException("Blocker " + blockerId + " of " + tableIdentifier); - } - long current = System.currentTimeMillis(); - long expirationTime = now + blockerTimeout; - long effectRow = - updateAs( - TableBlockerMapper.class, mapper -> mapper.renewBlocker(id, current, expirationTime)); - if (effectRow > 0) { - return expirationTime; - } - } - throw new BlockerConflictException("Failed to renew a blocker: conflict meet max retry"); - } - - @Override - public List getBlockers(TableIdentifier tableIdentifier) { - return getAs( - TableBlockerMapper.class, - mapper -> - mapper.selectBlockers( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName(), - System.currentTimeMillis())) - .stream() - .map(TableBlocker::buildBlocker) - .collect(Collectors.toList()); - } - - @Override - public Pair, Integer> getTableRuntimes( - String optimizerGroup, - @Nullable String fuzzyDbName, - @Nullable String fuzzyTableName, - @Nullable List statusCodeFilters, - int limit, - int offset) { - checkStarted(); - - // page helper is 1-based - int pageNumber = (offset / limit) + 1; - - try (Page ignore = PageHelper.startPage(pageNumber, limit, true)) { - int total = 0; - List ret = - getAs( - TableMetaMapper.class, - mapper -> - mapper.selectTableRuntimesForOptimizerGroup( - optimizerGroup, fuzzyDbName, fuzzyTableName, statusCodeFilters)); - PageInfo pageInfo = new PageInfo<>(ret); - total = (int) pageInfo.getTotal(); - return Pair.of(ret, total); - } - } - - @Override - public void addHandlerChain(RuntimeHandlerChain handler) { - checkNotStarted(); - if (headHandler == null) { - headHandler = handler; - } else { - headHandler.appendNext(handler); - } - } - - @Override - public void handleTableChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) { - if (headHandler != null) { - headHandler.fireStatusChanged(tableRuntime, originalStatus); - } - } - - @Override - public void handleTableChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { - if (headHandler != null) { - headHandler.fireConfigChanged(tableRuntime, originalConfig); - } - } - - @Override - public void initialize() { - checkNotStarted(); - - List tableRuntimeMetaList = - getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas); - List tableRuntimes = new ArrayList<>(tableRuntimeMetaList.size()); - tableRuntimeMetaList.forEach( - tableRuntimeMeta -> { - TableRuntime tableRuntime = new TableRuntime(tableRuntimeMeta, this); - tableRuntimeMap.put(tableRuntimeMeta.getTableId(), tableRuntime); - tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); - tableRuntimes.add(tableRuntime); - }); - - if (headHandler != null) { - headHandler.initialize(tableRuntimes); - } - if (tableExplorerExecutors == null) { - int threadCount = - serverConfiguration.getInteger( - AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_THREAD_COUNT); - int queueSize = - serverConfiguration.getInteger(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_QUEUE_SIZE); - tableExplorerExecutors = - new ThreadPoolExecutor( - threadCount, - threadCount, - 0, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(queueSize), - new ThreadFactoryBuilder() - .setNameFormat("table-explorer-executor-%d") - .setDaemon(true) - .build()); - } - tableExplorerScheduler.scheduleAtFixedRate( - this::exploreExternalCatalog, 0, externalCatalogRefreshingInterval, TimeUnit.MILLISECONDS); - initialized.complete(true); - } - - private TableRuntime getAndCheckExist(ServerTableIdentifier tableIdentifier) { - Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier cannot be null"); - TableRuntime tableRuntime = getRuntime(tableIdentifier.getId()); - if (tableRuntime == null) { - throw new ObjectNotExistsException(tableIdentifier); - } - return tableRuntime; - } - - @Override - public ServerTableIdentifier getServerTableIdentifier(TableIdentifier id) { - return getAs( - TableMetaMapper.class, - mapper -> - mapper.selectTableIdentifier(id.getCatalog(), id.getDatabase(), id.getTableName())); - } - - @Override - public TableRuntime getRuntime(Long tableId) { - checkStarted(); - return tableRuntimeMap.get(tableId); - } - - @Override - public boolean contains(Long tableId) { - checkStarted(); - return tableRuntimeMap.containsKey(tableId); - } - - public void dispose() { - tableExplorerScheduler.shutdown(); - if (tableExplorerExecutors != null) { - tableExplorerExecutors.shutdown(); - } - if (headHandler != null) { - headHandler.dispose(); - } - } - - @VisibleForTesting - void exploreExternalCatalog() { - if (!initialized.isDone()) { - throw new IllegalStateException("TableService is not initialized"); - } - long start = System.currentTimeMillis(); - List externalCatalogs = catalogManager.getExternalCatalogs(); - List externalCatalogNames = - externalCatalogs.stream().map(ExternalCatalog::name).collect(Collectors.toList()); - LOG.info("Syncing external catalogs: {}", String.join(",", externalCatalogNames)); - for (ExternalCatalog externalCatalog : externalCatalogs) { - try { - final List>> tableIdentifiersFutures = - Lists.newArrayList(); - externalCatalog - .listDatabases() - .forEach( - database -> { - try { - tableIdentifiersFutures.add( - CompletableFuture.supplyAsync( - () -> { - try { - return externalCatalog.listTables(database).stream() - .map(TableIdentity::new) - .collect(Collectors.toSet()); - } catch (Exception e) { - LOG.error( - "TableExplorer list tables in database {} error", database, e); - return new HashSet<>(); - } - }, - tableExplorerExecutors)); - } catch (RejectedExecutionException e) { - LOG.error( - "The queue of table explorer is full, please increase the queue size or thread count."); - } - }); - Set tableIdentifiers = - tableIdentifiersFutures.stream() - .map(CompletableFuture::join) - .reduce( - (a, b) -> { - a.addAll(b); - return a; - }) - .orElse(Sets.newHashSet()); - LOG.info( - "Loaded {} tables from external catalog {}.", - tableIdentifiers.size(), - externalCatalog.name()); - Map serverTableIdentifiers = - getAs( - TableMetaMapper.class, - mapper -> mapper.selectTableIdentifiersByCatalog(externalCatalog.name())) - .stream() - .collect(Collectors.toMap(TableIdentity::new, tableIdentifier -> tableIdentifier)); - LOG.info( - "Loaded {} tables from Amoro server catalog {}.", - serverTableIdentifiers.size(), - externalCatalog.name()); - final List> taskFutures = Lists.newArrayList(); - Sets.difference(tableIdentifiers, serverTableIdentifiers.keySet()) - .forEach( - tableIdentity -> { - try { - taskFutures.add( - CompletableFuture.runAsync( - () -> { - try { - syncTable(externalCatalog, tableIdentity); - } catch (Exception e) { - LOG.error( - "TableExplorer sync table {} error", - tableIdentity.toString(), - e); - } - }, - tableExplorerExecutors)); - } catch (RejectedExecutionException e) { - LOG.error( - "The queue of table explorer is full, please increase the queue size or thread count."); - } - }); - Sets.difference(serverTableIdentifiers.keySet(), tableIdentifiers) - .forEach( - tableIdentity -> { - try { - taskFutures.add( - CompletableFuture.runAsync( - () -> { - try { - disposeTable(serverTableIdentifiers.get(tableIdentity)); - } catch (Exception e) { - LOG.error( - "TableExplorer dispose table {} error", - tableIdentity.toString(), - e); - } - }, - tableExplorerExecutors)); - } catch (RejectedExecutionException e) { - LOG.error( - "The queue of table explorer is full, please increase the queue size or thread count."); - } - }); - taskFutures.forEach(CompletableFuture::join); - } catch (Throwable e) { - LOG.error("TableExplorer error", e); - } - } - - // Clear TableRuntime objects that do not correspond to a catalog. - // This scenario is mainly due to the fact that TableRuntime objects were not cleaned up in a - // timely manner during the process of dropping the catalog due to concurrency considerations. - // It is permissible to have some erroneous states in the middle, as long as the final data is - // consistent. - Set catalogNames = - catalogManager.listCatalogMetas().stream() - .map(CatalogMeta::getCatalogName) - .collect(Collectors.toSet()); - for (TableRuntime tableRuntime : tableRuntimeMap.values()) { - if (!catalogNames.contains(tableRuntime.getTableIdentifier().getCatalog())) { - disposeTable(tableRuntime.getTableIdentifier()); - } - } - - long end = System.currentTimeMillis(); - LOG.info("Syncing external catalogs took {} ms.", end - start); - } - - private void validateCatalogUpdate(CatalogMeta oldMeta, CatalogMeta newMeta) { - if (!oldMeta.getCatalogType().equals(newMeta.getCatalogType())) { - throw new IllegalMetadataException("Cannot update catalog type"); - } - } - - private void checkStarted() { - try { - initialized.get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private void checkNotStarted() { - if (initialized.isDone()) { - throw new IllegalStateException("Table service has started."); - } - } - - private void syncTable(ExternalCatalog externalCatalog, TableIdentity tableIdentity) { - AtomicBoolean tableRuntimeAdded = new AtomicBoolean(false); - try { - doAsTransaction( - () -> - externalCatalog.syncTable( - tableIdentity.getDatabase(), - tableIdentity.getTableName(), - tableIdentity.getFormat()), - () -> { - ServerTableIdentifier tableIdentifier = - externalCatalog.getServerTableIdentifier( - tableIdentity.getDatabase(), tableIdentity.getTableName()); - tableRuntimeAdded.set(triggerTableAdded(externalCatalog, tableIdentifier)); - }); - } catch (Throwable t) { - if (tableRuntimeAdded.get()) { - revertTableRuntimeAdded(externalCatalog, tableIdentity); - } - throw t; - } - } - - private boolean triggerTableAdded( - ServerCatalog catalog, ServerTableIdentifier serverTableIdentifier) { - AmoroTable table = - catalog.loadTable( - serverTableIdentifier.getDatabase(), serverTableIdentifier.getTableName()); - if (TableFormat.ICEBERG.equals(table.format())) { - if (TablePropertyUtil.isMixedTableStore(table.properties())) { - return false; - } - } - TableRuntime tableRuntime = new TableRuntime(serverTableIdentifier, this, table.properties()); - tableRuntimeMap.put(serverTableIdentifier.getId(), tableRuntime); - tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); - if (headHandler != null) { - headHandler.fireTableAdded(table, tableRuntime); - } - return true; - } - - private void revertTableRuntimeAdded( - ExternalCatalog externalCatalog, TableIdentity tableIdentity) { - ServerTableIdentifier tableIdentifier = - externalCatalog.getServerTableIdentifier( - tableIdentity.getDatabase(), tableIdentity.getTableName()); - if (tableIdentifier != null) { - tableRuntimeMap.remove(tableIdentifier.getId()); - } - } - - private void disposeTable(ServerTableIdentifier tableIdentifier) { - doAs( - TableMetaMapper.class, - mapper -> - mapper.deleteTableIdByName( - tableIdentifier.getCatalog(), - tableIdentifier.getDatabase(), - tableIdentifier.getTableName())); - Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier.getId())) - .ifPresent( - tableRuntime -> { - if (headHandler != null) { - headHandler.fireTableRemoved(tableRuntime); - } - tableRuntime.dispose(); - }); - } - - private static class TableIdentity { - - private final String database; - private final String tableName; - - private final TableFormat format; - - protected TableIdentity(TableIDWithFormat idWithFormat) { - this.database = idWithFormat.getIdentifier().getDatabase(); - this.tableName = idWithFormat.getIdentifier().getTableName(); - this.format = idWithFormat.getTableFormat(); - } - - protected TableIdentity(ServerTableIdentifier serverTableIdentifier) { - this.database = serverTableIdentifier.getDatabase(); - this.tableName = serverTableIdentifier.getTableName(); - this.format = serverTableIdentifier.getFormat(); - } - - protected TableIdentity(String database, String tableName, TableFormat format) { - this.database = database; - this.tableName = tableName; - this.format = format; - } - - public String getDatabase() { - return database; - } - - public String getTableName() { - return tableName; - } - - public TableFormat getFormat() { - return format; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TableIdentity that = (TableIdentity) o; - return Objects.equal(database, that.database) && Objects.equal(tableName, that.tableName); - } - - @Override - public int hashCode() { - return Objects.hashCode(database, tableName); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("database", database) - .add("tableName", tableName) - .toString(); - } - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/MaintainedTableManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/MaintainedTableManager.java index 3761a67a64..f4acbcfc28 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/MaintainedTableManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/MaintainedTableManager.java @@ -20,12 +20,15 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.api.TableIdentifier; +import org.apache.amoro.server.persistence.TableRuntimeMeta; +import org.apache.commons.lang3.tuple.Pair; + +import javax.annotation.Nullable; import java.util.List; public interface MaintainedTableManager { - /** * Load all managed tables. Managed tables means the tables which are managed by AMS, AMS will * watch their change and make them health. @@ -40,4 +43,27 @@ public interface MaintainedTableManager { * @return the {@link ServerTableIdentifier} instance */ ServerTableIdentifier getServerTableIdentifier(TableIdentifier id); + + /** + * Get the table info from database for given parameters. + * + * @param optimizerGroup The optimizer group of the table associated to. will be if we want the + * info for all groups. + * @param fuzzyDbName the fuzzy db name used to filter the result, will be null if no filter set. + * @param fuzzyTableName the fuzzy table name used to filter the result, will be null if no filter + * set. + * @param statusCodeFilters the status code used to filter the result, wil be null if no filter + * set. + * @param limit How many entries we want to retrieve. + * @param offset The entries we'll skip when retrieving the entries. + * @return A pair with the first entry is the actual list under the filters with the offset and + * limit, and second value will be the number of total entries under the filters. + */ + Pair, Integer> queryTableRuntimeMetas( + String optimizerGroup, + @Nullable String fuzzyDbName, + @Nullable String fuzzyTableName, + @Nullable List statusCodeFilters, + int limit, + int offset); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableManager.java index f262d0a67f..ba36d58915 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableManager.java @@ -21,7 +21,8 @@ import org.apache.amoro.server.table.blocker.TableBlockerManager; import org.apache.amoro.server.table.internal.InternalTableManager; -public interface TableManager extends MaintainedTableManager, InternalTableManager, TableBlockerManager { +public interface TableManager + extends MaintainedTableManager, InternalTableManager, TableBlockerManager { void setTableService(TableService service); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableMetadata.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableMetadata.java index 1c9ad427b6..690cc71776 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableMetadata.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableMetadata.java @@ -97,7 +97,7 @@ public TableMetadata( public TableMeta buildTableMeta() { TableMeta meta = new TableMeta(); - meta.setTableIdentifier(tableIdentifier.getIdentifier()); + meta.setTableIdentifier(tableIdentifier.getIdentifier().buildTableIdentifier()); Map locations = new HashMap<>(); PropertiesUtil.putNotNullProperties( locations, MetaTableProperties.LOCATION_KEY_TABLE, tableLocation); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java index 035f08bd0c..c1a265800d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java @@ -18,23 +18,31 @@ package org.apache.amoro.server.table; +import org.apache.amoro.AmoroTable; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.server.catalog.InternalCatalog; public interface TableService extends TableRuntimeHandler { void initialize(); + void dispose(); void onTableCreated(InternalCatalog catalog, ServerTableIdentifier identifier); void onTableDropped(InternalCatalog catalog, ServerTableIdentifier identifier); - TableRuntime getRuntime(Long tableId); - default boolean contains(Long tableId) { return getRuntime(tableId) != null; } + + /** + * load a table via server catalog. + * + * @param identifier managed table identifier + * @return managed table. + */ + AmoroTable loadTable(ServerTableIdentifier identifier); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java index 203b614815..29144b1dd2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java @@ -20,7 +20,7 @@ import org.apache.amoro.config.Configurations; import org.apache.amoro.server.AmoroManagementConf; -import org.apache.amoro.server.table.TableManagerOld; +import org.apache.amoro.server.table.TableService; public class AsyncTableExecutors { @@ -40,56 +40,56 @@ public static AsyncTableExecutors getInstance() { return instance; } - public void setup(TableManagerOld tableManager, Configurations conf) { + public void setup(TableService tableService, Configurations conf) { if (conf.getBoolean(AmoroManagementConf.EXPIRE_SNAPSHOTS_ENABLED)) { this.snapshotsExpiringExecutor = new SnapshotsExpiringExecutor( - tableManager, conf.getInteger(AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT)); + tableService, conf.getInteger(AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT)); } if (conf.getBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) { this.orphanFilesCleaningExecutor = new OrphanFilesCleaningExecutor( - tableManager, + tableService, conf.getInteger(AmoroManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT), conf.get(AmoroManagementConf.CLEAN_ORPHAN_FILES_INTERVAL)); } if (conf.getBoolean(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_ENABLED)) { this.danglingDeleteFilesCleaningExecutor = new DanglingDeleteFilesCleaningExecutor( - tableManager, + tableService, conf.getInteger(AmoroManagementConf.CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT)); } this.optimizingCommitExecutor = new OptimizingCommitExecutor( - tableManager, conf.getInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT)); + tableService, conf.getInteger(AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT)); this.optimizingExpiringExecutor = new OptimizingExpiringExecutor( - tableManager, + tableService, conf.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_KEEP_DAYS), conf.getInteger(AmoroManagementConf.OPTIMIZING_RUNTIME_DATA_EXPIRE_INTERVAL_HOURS)); - this.blockerExpiringExecutor = new BlockerExpiringExecutor(tableManager); + this.blockerExpiringExecutor = new BlockerExpiringExecutor(tableService); if (conf.getBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED)) { this.hiveCommitSyncExecutor = new HiveCommitSyncExecutor( - tableManager, conf.getInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT)); + tableService, conf.getInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT)); } this.tableRefreshingExecutor = new TableRuntimeRefreshExecutor( - tableManager, + tableService, conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT), conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL), conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS)); if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) { this.tagsAutoCreatingExecutor = new TagsAutoCreatingExecutor( - tableManager, + tableService, conf.getInteger(AmoroManagementConf.AUTO_CREATE_TAGS_THREAD_COUNT), conf.getLong(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL)); } if (conf.getBoolean(AmoroManagementConf.DATA_EXPIRATION_ENABLED)) { this.dataExpiringExecutor = new DataExpiringExecutor( - tableManager, + tableService, conf.getInteger(AmoroManagementConf.DATA_EXPIRATION_THREAD_COUNT), conf.get(AmoroManagementConf.DATA_EXPIRATION_INTERVAL)); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java index 624edaf2a6..39f54a1778 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java @@ -23,8 +23,8 @@ import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.table.RuntimeHandlerChain; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -46,12 +46,12 @@ public abstract class BaseTableExecutor extends RuntimeHandlerChain { private static final long START_DELAY = 10 * 1000L; private final ScheduledExecutorService executor; - private final TableManagerOld tableManager; + private final TableService tableService; private final Set scheduledTables = Collections.synchronizedSet(new HashSet<>()); - protected BaseTableExecutor(TableManagerOld tableManager, int poolSize) { - this.tableManager = tableManager; + protected BaseTableExecutor(TableService tableService, int poolSize) { + this.tableService = tableService; this.executor = Executors.newScheduledThreadPool( poolSize, @@ -107,7 +107,7 @@ protected String getThreadName() { } private boolean isExecutable(TableRuntime tableRuntime) { - return tableManager.contains(tableRuntime.getTableIdentifier().getId()) + return tableService.contains(tableRuntime.getTableIdentifier().getId()) && enabled(tableRuntime); } @@ -140,6 +140,6 @@ protected long getStartDelay() { } protected AmoroTable loadTable(TableRuntime tableRuntime) { - return tableManager.loadTable(tableRuntime.getTableIdentifier()); + return tableService.loadTable(tableRuntime.getTableIdentifier()); } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java index 3ff6aa1a15..b7c0a9259b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java @@ -20,8 +20,8 @@ import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableService; public class BlockerExpiringExecutor extends BaseTableExecutor { @@ -29,8 +29,8 @@ public class BlockerExpiringExecutor extends BaseTableExecutor { private static final long INTERVAL = 60 * 60 * 1000L; // 1 hour - public BlockerExpiringExecutor(TableManagerOld tableManager) { - super(tableManager, 1); + public BlockerExpiringExecutor(TableService tableService) { + super(tableService, 1); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/DanglingDeleteFilesCleaningExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/DanglingDeleteFilesCleaningExecutor.java index 388286f0c9..4050b4d7d9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/DanglingDeleteFilesCleaningExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/DanglingDeleteFilesCleaningExecutor.java @@ -21,8 +21,8 @@ import org.apache.amoro.AmoroTable; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.server.optimizing.maintainer.TableMaintainer; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,8 +33,8 @@ public class DanglingDeleteFilesCleaningExecutor extends BaseTableExecutor { private static final long INTERVAL = 24 * 60 * 60 * 1000L; - protected DanglingDeleteFilesCleaningExecutor(TableManagerOld tableManager, int poolSize) { - super(tableManager, poolSize); + protected DanglingDeleteFilesCleaningExecutor(TableService tableService, int poolSize) { + super(tableService, poolSize); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/DataExpiringExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/DataExpiringExecutor.java index 48ece2c88b..e5b9b593dd 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/DataExpiringExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/DataExpiringExecutor.java @@ -21,8 +21,8 @@ import org.apache.amoro.AmoroTable; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.server.optimizing.maintainer.TableMaintainer; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +34,8 @@ public class DataExpiringExecutor extends BaseTableExecutor { private final Duration interval; - protected DataExpiringExecutor(TableManagerOld tableManager, int poolSize, Duration interval) { - super(tableManager, poolSize); + protected DataExpiringExecutor(TableService tableService, int poolSize, Duration interval) { + super(tableService, poolSize); this.interval = interval; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/HiveCommitSyncExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/HiveCommitSyncExecutor.java index 0be24006f5..14c109d1d6 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/HiveCommitSyncExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/HiveCommitSyncExecutor.java @@ -22,8 +22,8 @@ import org.apache.amoro.hive.table.SupportHive; import org.apache.amoro.hive.utils.HiveMetaSynchronizer; import org.apache.amoro.hive.utils.TableTypeUtil; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableService; import org.apache.amoro.table.MixedTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +34,8 @@ public class HiveCommitSyncExecutor extends BaseTableExecutor { // 10 minutes private static final long INTERVAL = 10 * 60 * 1000L; - public HiveCommitSyncExecutor(TableManagerOld tableManager, int poolSize) { - super(tableManager, poolSize); + public HiveCommitSyncExecutor(TableService tableService, int poolSize) { + super(tableService, poolSize); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OptimizingCommitExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OptimizingCommitExecutor.java index 7b2553c1e8..b1d07594e5 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OptimizingCommitExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OptimizingCommitExecutor.java @@ -20,8 +20,8 @@ import org.apache.amoro.AmoroTable; import org.apache.amoro.server.optimizing.OptimizingStatus; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableService; import java.util.Optional; @@ -29,8 +29,8 @@ public class OptimizingCommitExecutor extends BaseTableExecutor { private static final long INTERVAL = 60 * 1000L; // 1min - public OptimizingCommitExecutor(TableManagerOld tableManager, int poolSize) { - super(tableManager, poolSize); + public OptimizingCommitExecutor(TableService tableService, int poolSize) { + super(tableService, poolSize); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OptimizingExpiringExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OptimizingExpiringExecutor.java index 36642790c7..c7230a8316 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OptimizingExpiringExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OptimizingExpiringExecutor.java @@ -20,8 +20,8 @@ import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.persistence.mapper.OptimizingMapper; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +32,8 @@ public class OptimizingExpiringExecutor extends BaseTableExecutor { private final long keepTime; private final long interval; - public OptimizingExpiringExecutor(TableManagerOld tableManager, int keepDays, int intervalHours) { - super(tableManager, 1); + public OptimizingExpiringExecutor(TableService tableService, int keepDays, int intervalHours) { + super(tableService, 1); this.keepTime = keepDays * 24 * 60 * 60 * 1000L; this.interval = intervalHours * 60 * 60 * 1000L; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OrphanFilesCleaningExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OrphanFilesCleaningExecutor.java index c310fda5f8..62d7abc624 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OrphanFilesCleaningExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/OrphanFilesCleaningExecutor.java @@ -21,8 +21,8 @@ import org.apache.amoro.AmoroTable; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.server.optimizing.maintainer.TableMaintainer; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +32,8 @@ public class OrphanFilesCleaningExecutor extends BaseTableExecutor { private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesCleaningExecutor.class); private final Duration interval; - public OrphanFilesCleaningExecutor(TableManagerOld tableManager, int poolSize, Duration interval) { - super(tableManager, poolSize); + public OrphanFilesCleaningExecutor(TableService tableService, int poolSize, Duration interval) { + super(tableService, poolSize); this.interval = interval; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/SnapshotsExpiringExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/SnapshotsExpiringExecutor.java index 77ce0ec862..ea39274d04 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/SnapshotsExpiringExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/SnapshotsExpiringExecutor.java @@ -21,8 +21,8 @@ import org.apache.amoro.AmoroTable; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.server.optimizing.maintainer.TableMaintainer; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +32,8 @@ public class SnapshotsExpiringExecutor extends BaseTableExecutor { private static final long INTERVAL = 60 * 60 * 1000L; // 1 hour - public SnapshotsExpiringExecutor(TableManagerOld tableManager, int poolSize) { - super(tableManager, poolSize); + public SnapshotsExpiringExecutor(TableService tableService, int poolSize) { + super(tableService, poolSize); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java index 4be9020f49..85cd5b5f90 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java @@ -23,8 +23,8 @@ import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator; import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.server.optimizing.OptimizingProcess; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableService; import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.table.MixedTable; @@ -36,8 +36,8 @@ public class TableRuntimeRefreshExecutor extends BaseTableExecutor { private final int maxPendingPartitions; public TableRuntimeRefreshExecutor( - TableManagerOld tableManager, int poolSize, long interval, int maxPendingPartitions) { - super(tableManager, poolSize); + TableService tableService, int poolSize, long interval, int maxPendingPartitions) { + super(tableService, poolSize); this.interval = interval; this.maxPendingPartitions = maxPendingPartitions; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TagsAutoCreatingExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TagsAutoCreatingExecutor.java index d4bc697fb8..c549b93619 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TagsAutoCreatingExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TagsAutoCreatingExecutor.java @@ -22,8 +22,8 @@ import org.apache.amoro.TableFormat; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.server.optimizing.maintainer.TableMaintainer; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,8 +33,8 @@ public class TagsAutoCreatingExecutor extends BaseTableExecutor { private final long interval; - protected TagsAutoCreatingExecutor(TableManagerOld tableManager, int poolSize, long interval) { - super(tableManager, poolSize); + protected TagsAutoCreatingExecutor(TableService tableService, int poolSize, long interval) { + super(tableService, poolSize); this.interval = interval; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/internal/InternalIcebergCreator.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/internal/InternalIcebergCreator.java index b2fb99e61e..90255a30f8 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/internal/InternalIcebergCreator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/internal/InternalIcebergCreator.java @@ -84,7 +84,7 @@ public TableMetadata create() { ServerTableIdentifier serverTableIdentifier = ServerTableIdentifier.of(catalogMeta.getCatalogName(), database, tableName, format()); - meta.setTableIdentifier(serverTableIdentifier.getIdentifier()); + meta.setTableIdentifier(serverTableIdentifier.getIdentifier().buildTableIdentifier()); // write metadata file. OutputFile outputFile = io.newOutputFile(icebergMetadataFileLocation); this.metadataFileLocation = icebergMetadataFileLocation; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/internal/InternalTableCreator.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/internal/InternalTableCreator.java index bb61a479a0..a91b02d55a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/internal/InternalTableCreator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/internal/InternalTableCreator.java @@ -19,7 +19,6 @@ package org.apache.amoro.server.table.internal; import org.apache.amoro.server.table.TableMetadata; -import org.apache.amoro.server.table.TableServiceOld; import java.io.Closeable; @@ -28,7 +27,7 @@ public interface InternalTableCreator extends Closeable { /** * Do all things about create an internal table, and prepare the {@link TableMetadata} for {@link - * TableServiceOld#createTable(java.lang.String, TableMetadata)} + * InternalTableManager#createTable(java.lang.String, TableMetadata)} */ TableMetadata create(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java index 5728453c2a..f1feab8f57 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java @@ -31,7 +31,6 @@ import org.apache.amoro.server.dashboard.model.LogInfo; import org.apache.amoro.server.dashboard.model.SqlResult; import org.apache.amoro.server.dashboard.utils.AmsUtil; -import org.apache.amoro.server.table.TableServiceOld; import org.apache.amoro.server.terminal.kyuubi.KyuubiTerminalSessionFactory; import org.apache.amoro.server.terminal.local.LocalSessionFactory; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; @@ -81,8 +80,7 @@ public class TerminalManager { new LinkedBlockingQueue<>(), r -> new Thread(null, r, "terminal-execute-" + threadPoolCount.incrementAndGet())); - public TerminalManager( - Configurations conf, CatalogManager catalogManager) { + public TerminalManager(Configurations conf, CatalogManager catalogManager) { this.serviceConfig = conf; this.catalogManager = catalogManager; this.resultLimits = conf.getInteger(AmoroManagementConf.TERMINAL_RESULT_LIMIT); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java new file mode 100644 index 0000000000..de383962e5 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSManagerTestBase.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server; + +import org.apache.amoro.config.Configurations; +import org.apache.amoro.resource.ResourceGroup; +import org.apache.amoro.server.catalog.DefaultCatalogManager; +import org.apache.amoro.server.manager.EventsManager; +import org.apache.amoro.server.manager.MetricManager; +import org.apache.amoro.server.table.DefaultTableManager; +import org.apache.amoro.server.table.DerbyPersistence; +import org.apache.amoro.server.table.TableManager; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; + +public abstract class AMSManagerTestBase { + + @ClassRule public static DerbyPersistence DERBY = new DerbyPersistence(); + + protected static DefaultCatalogManager CATALOG_MANAGER = null; + protected static DefaultTableManager TABLE_MANAGER = null; + + @BeforeClass + public static void initTableService() { + try { + Configurations configurations = new Configurations(); + CATALOG_MANAGER = new DefaultCatalogManager(configurations); + TABLE_MANAGER = new DefaultTableManager(configurations, CATALOG_MANAGER); + } catch (Throwable throwable) { + Assert.fail(throwable.getMessage()); + } + } + + @AfterClass + public static void disposeTableService() { + MetricManager.dispose(); + EventsManager.dispose(); + } + + protected TableManager tableManager() { + return TABLE_MANAGER; + } + + protected static ResourceGroup defaultResourceGroup() { + return new ResourceGroup.Builder("default", "local").build(); + } +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TableServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java similarity index 73% rename from amoro-ams/src/test/java/org/apache/amoro/server/table/TableServiceTestBase.java rename to amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java index 956e71b925..7c9c574f18 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TableServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java @@ -16,26 +16,19 @@ * limitations under the License. */ -package org.apache.amoro.server.table; +package org.apache.amoro.server; import org.apache.amoro.config.Configurations; import org.apache.amoro.resource.ResourceGroup; -import org.apache.amoro.server.AmoroManagementConf; -import org.apache.amoro.server.DefaultOptimizingService; -import org.apache.amoro.server.catalog.DefaultCatalogManager; import org.apache.amoro.server.manager.EventsManager; import org.apache.amoro.server.manager.MetricManager; +import org.apache.amoro.server.table.DefaultTableService; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.ClassRule; -public abstract class TableServiceTestBase { - - @ClassRule public static DerbyPersistence DERBY = new DerbyPersistence(); - - protected static DefaultCatalogManager CATALOG_MANAGER = null; - private static DefaultTableServiceOld TABLE_SERVICE = null; +public abstract class AMSServiceTestBase extends AMSManagerTestBase { + private static DefaultTableService TABLE_SERVICE = null; private static DefaultOptimizingService OPTIMIZING_SERVICE = null; @BeforeClass @@ -43,10 +36,10 @@ public static void initTableService() { try { Configurations configurations = new Configurations(); configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, 800L); - CATALOG_MANAGER = new DefaultCatalogManager(configurations); - TABLE_SERVICE = new DefaultTableServiceOld(new Configurations(), CATALOG_MANAGER); + TABLE_SERVICE = new DefaultTableService(new Configurations(), CATALOG_MANAGER); OPTIMIZING_SERVICE = - new DefaultOptimizingService(configurations, CATALOG_MANAGER, TABLE_SERVICE); + new DefaultOptimizingService( + configurations, CATALOG_MANAGER, TABLE_MANAGER, TABLE_SERVICE); TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler()); TABLE_SERVICE.initialize(); try { @@ -65,7 +58,7 @@ public static void disposeTableService() { EventsManager.dispose(); } - protected DefaultTableServiceOld tableService() { + protected DefaultTableService tableService() { return TABLE_SERVICE; } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java index f38b7a5049..b64c9db86d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java @@ -33,7 +33,7 @@ import org.apache.amoro.server.catalog.ServerCatalog; import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.resource.ResourceContainers; -import org.apache.amoro.server.table.DefaultTableServiceOld; +import org.apache.amoro.server.table.DefaultTableService; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.guava32.com.google.common.io.MoreFiles; import org.apache.amoro.shade.guava32.com.google.common.io.RecursiveDeleteOption; @@ -69,7 +69,6 @@ public class AmsEnvironment { private final AmoroServiceContainer serviceContainer; private Configurations serviceConfig; private DefaultCatalogManager catalogManager; - private DefaultTableServiceOld tableService; private final AtomicBoolean amsExit; private int tableServiceBindPort; private int optimizingServiceBindPort; @@ -139,14 +138,14 @@ public void start() throws Exception { testHMS.start(); startAms(); - DynFields.UnboundField amsTableServiceField = + DynFields.UnboundField amsTableServiceField = DynFields.builder().hiddenImpl(AmoroServiceContainer.class, "tableService").build(); DynFields.UnboundField amsCatalogManagerField = DynFields.builder().hiddenImpl(AmoroServiceContainer.class, "catalogManager").build(); catalogManager = amsCatalogManagerField.bind(serviceContainer).get(); - tableService = amsTableServiceField.bind(serviceContainer).get(); - DynFields.UnboundField> tableServiceField = - DynFields.builder().hiddenImpl(DefaultTableServiceOld.class, "initialized").build(); + DefaultTableService tableService = amsTableServiceField.bind(serviceContainer).get(); + DynFields.UnboundField> tableServiceInitializedField = + DynFields.builder().hiddenImpl(DefaultTableService.class, "initialized").build(); boolean tableServiceIsStart = false; long startTime = System.currentTimeMillis(); while (!tableServiceIsStart) { @@ -154,7 +153,7 @@ public void start() throws Exception { throw new RuntimeException("table service not start yet after 10s"); } try { - tableServiceField.bind(tableService).get().get(); + tableServiceInitializedField.bind(tableService).get().get(); tableServiceIsStart = true; } catch (RuntimeException e) { LOG.info("table service not start yet"); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java index ccae9257ee..fe798dc787 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java @@ -28,7 +28,6 @@ import org.apache.amoro.server.table.TableMetadata; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.table.TableService; -import org.apache.amoro.server.table.TableServiceOld; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.table.PrimaryKeySpec; import org.apache.amoro.table.TableIdentifier; diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 4ea6cdd574..9f17b8a4ce 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -46,8 +46,8 @@ import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.table.AMSTableTestBase; +import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableServiceOld; import org.apache.amoro.server.table.executor.TableRuntimeRefreshExecutor; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; @@ -358,7 +358,10 @@ public void testReloadFailedTask() { assertTaskStatus(TaskRuntime.Status.PLANNED); } - /** Test the logic for {@link TableServiceOld#getTableRuntimes}. */ + /** + * Test the logic for {@link TableManager#queryTableRuntimeMetas(String, String, String, List, + * int, int)}. + */ @Test public void testGetRuntimes() { String catalog = "catalog"; @@ -556,15 +559,16 @@ public void testGetRuntimes() { // 2 test and assert the result // 2.1 only optimize group filter set Pair, Integer> res = - tableService() - .getTableRuntimes(optimizerGroup1, null, null, Collections.emptyList(), 10, 0); + tableManager() + .queryTableRuntimeMetas(optimizerGroup1, null, null, Collections.emptyList(), 10, 0); Integer expectedTotalinGroup1 = 14; Assert.assertEquals(expectedTotalinGroup1, res.getRight()); Assert.assertEquals(10, res.getLeft().size()); // 2.2 set optimize group and db filter res = - tableService().getTableRuntimes(optimizerGroup1, db1, null, Collections.emptyList(), 5, 0); + tableManager() + .queryTableRuntimeMetas(optimizerGroup1, db1, null, Collections.emptyList(), 5, 0); // there are 8 tables in db1 in optimizerGroup1 Integer expectedTotalGroup1Db1 = 8; Assert.assertEquals(expectedTotalGroup1Db1, res.getRight()); @@ -574,13 +578,15 @@ public void testGetRuntimes() { // there are 3 tables with suffix "-InOtherGroup" in opGroup2 String fuzzyDbName = "InOtherGroup"; res = - tableService().getTableRuntimes(opGroup2, null, fuzzyDbName, Collections.emptyList(), 2, 0); + tableManager() + .queryTableRuntimeMetas(opGroup2, null, fuzzyDbName, Collections.emptyList(), 2, 0); Integer expectedTotalWithFuzzyDbName = 3; Assert.assertEquals(expectedTotalWithFuzzyDbName, res.getRight()); Assert.assertEquals(2, res.getLeft().size()); res = - tableService().getTableRuntimes(opGroup2, null, fuzzyDbName, Collections.emptyList(), 5, 0); + tableManager() + .queryTableRuntimeMetas(opGroup2, null, fuzzyDbName, Collections.emptyList(), 5, 0); Assert.assertEquals(expectedTotalWithFuzzyDbName, res.getRight()); // there are only 3 tables with the suffix in opGroup2 Assert.assertEquals(3, res.getLeft().size()); @@ -588,7 +594,7 @@ public void testGetRuntimes() { // 2.4 set optimize group and status filter, with only one status List statusCode = new ArrayList<>(); statusCode.add(OptimizingStatus.MAJOR_OPTIMIZING.getCode()); - res = tableService().getTableRuntimes(optimizerGroup1, null, null, statusCode, 10, 0); + res = tableManager().queryTableRuntimeMetas(optimizerGroup1, null, null, statusCode, 10, 0); Integer expectedTotalInGroup1WithMajorStatus = 2; Assert.assertEquals(expectedTotalInGroup1WithMajorStatus, res.getRight()); Assert.assertEquals(2, res.getLeft().size()); @@ -597,7 +603,7 @@ public void testGetRuntimes() { statusCode.clear(); statusCode.add(OptimizingStatus.MINOR_OPTIMIZING.getCode()); statusCode.add(OptimizingStatus.MAJOR_OPTIMIZING.getCode()); - res = tableService().getTableRuntimes(optimizerGroup1, null, null, statusCode, 3, 0); + res = tableManager().queryTableRuntimeMetas(optimizerGroup1, null, null, statusCode, 3, 0); Integer expectedTotalInGroup1WithMinorMajorStatus = 4; Assert.assertEquals(expectedTotalInGroup1WithMinorMajorStatus, res.getRight()); Assert.assertEquals(3, res.getLeft().size()); @@ -607,7 +613,8 @@ public void testGetRuntimes() { statusCode.add(OptimizingStatus.PENDING.getCode()); statusCode.add(OptimizingStatus.FULL_OPTIMIZING.getCode()); String tableFilter = "pending"; - res = tableService().getTableRuntimes(optimizerGroup1, db1, tableFilter, statusCode, 10, 0); + res = + tableManager().queryTableRuntimeMetas(optimizerGroup1, db1, tableFilter, statusCode, 10, 0); Integer expectedTotalInGroup1InDb1WithTableFilterAndStatus = 2; Assert.assertEquals(expectedTotalInGroup1InDb1WithTableFilterAndStatus, res.getRight()); Assert.assertEquals(2, res.getLeft().size()); @@ -618,7 +625,8 @@ public void testGetRuntimes() { statusCode.add(OptimizingStatus.FULL_OPTIMIZING.getCode()); String wrongTableFilter2 = "noTableWithName"; res = - tableService().getTableRuntimes(optimizerGroup1, db1, wrongTableFilter2, statusCode, 10, 0); + tableManager() + .queryTableRuntimeMetas(optimizerGroup1, db1, wrongTableFilter2, statusCode, 10, 0); Assert.assertEquals(0, (int) res.getRight()); Assert.assertTrue(res.getLeft().isEmpty()); } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TableCatalogTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TableCatalogTestBase.java index a50d911f3a..e7c8cd90fd 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TableCatalogTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TableCatalogTestBase.java @@ -21,7 +21,7 @@ import org.apache.amoro.AmoroCatalog; import org.apache.amoro.formats.AmoroCatalogTestHelper; import org.apache.amoro.hive.TestHMS; -import org.apache.amoro.server.table.TableServiceTestBase; +import org.apache.amoro.server.AMSServiceTestBase; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -30,7 +30,7 @@ import java.io.IOException; -public class TableCatalogTestBase extends TableServiceTestBase { +public class TableCatalogTestBase extends AMSServiceTestBase { @Rule public TemporaryFolder temp = new TemporaryFolder(); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java index eeeba78c3f..84ebcd3860 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java @@ -108,7 +108,7 @@ protected static ResourceGroup testResourceGroup() { protected OptimizingQueue buildOptimizingGroupService(TableRuntime tableRuntime) { return new OptimizingQueue( - tableService(), + CATALOG_MANAGER, testResourceGroup(), quotaProvider, planExecutor, @@ -118,7 +118,7 @@ protected OptimizingQueue buildOptimizingGroupService(TableRuntime tableRuntime) private OptimizingQueue buildOptimizingGroupService() { return new OptimizingQueue( - tableService(), + CATALOG_MANAGER, testResourceGroup(), quotaProvider, planExecutor, diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java index 2b13018f16..55fe01c239 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java @@ -32,6 +32,7 @@ import org.apache.amoro.mixed.CatalogLoader; import org.apache.amoro.mixed.MixedFormatCatalog; import org.apache.amoro.properties.CatalogMetaProperties; +import org.apache.amoro.server.AMSServiceTestBase; import org.apache.amoro.server.catalog.InternalCatalog; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.table.MixedTable; @@ -51,7 +52,7 @@ import java.io.IOException; import java.util.List; -public class AMSTableTestBase extends TableServiceTestBase { +public class AMSTableTestBase extends AMSServiceTestBase { @ClassRule public static TestHMS TEST_HMS = new TestHMS(); @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -176,7 +177,7 @@ protected void createTable() { tableTestHelper.primaryKeySpec(), tableTestHelper.partitionSpec()); TableMetadata tableMetadata = tableMetadata(); - tableService().createTable(catalogMeta.getCatalogName(), tableMetadata); + tableManager().createTable(catalogMeta.getCatalogName(), tableMetadata); } else { if (catalogTestHelper.tableFormat().equals(TableFormat.ICEBERG)) { createIcebergTable(); @@ -190,7 +191,7 @@ protected void createTable() { tableService().exploreExternalCatalog(); } - serverTableIdentifier = tableService().listManagedTables().get(0); + serverTableIdentifier = tableManager().listManagedTables().get(0); } private void createMixedHiveTable() { @@ -239,7 +240,7 @@ private void createIcebergTable() { protected void dropTable() { if (externalCatalog == null) { mixedTables.dropTableByMeta(tableMeta, true); - tableService().dropTableMetadata(tableMeta.getTableIdentifier(), true); + tableManager().dropTableMetadata(tableMeta.getTableIdentifier(), true); } else { String database = tableTestHelper.id().getDatabase(); String table = tableTestHelper.id().getTableName(); @@ -266,7 +267,11 @@ protected CatalogMeta catalogMeta() { protected TableMetadata tableMetadata() { return new TableMetadata( - ServerTableIdentifier.of(tableMeta.getTableIdentifier(), catalogTestHelper.tableFormat()), + ServerTableIdentifier.of( + tableMeta.getTableIdentifier().getCatalog(), + tableMeta.getTableIdentifier().getDatabase(), + tableMeta.getTableIdentifier().getTableName(), + catalogTestHelper.tableFormat()), tableMeta, catalogMeta); } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java index 48362f65d0..95be73d409 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java @@ -28,6 +28,7 @@ import org.apache.amoro.hive.TestHMS; import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.properties.CatalogMetaProperties; +import org.apache.amoro.server.AMSServiceTestBase; import org.apache.amoro.server.catalog.InternalCatalog; import org.junit.Assert; import org.junit.Assume; @@ -39,7 +40,7 @@ import java.util.List; @RunWith(Parameterized.class) -public class TestCatalogService extends TableServiceTestBase { +public class TestCatalogService extends AMSServiceTestBase { @ClassRule public static TestHMS TEST_HMS = new TestHMS(); private final CatalogTestHelper catalogTestHelper; diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java similarity index 85% rename from amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java rename to amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java index b20e8f6f23..2396d0048e 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java @@ -52,7 +52,7 @@ import java.util.stream.Collectors; @RunWith(Parameterized.class) -public class TestTableService extends AMSTableTestBase { +public class TestTableManager extends AMSTableTestBase { @Parameterized.Parameters(name = "{0}, {1}") public static Object[] parameters() { @@ -68,7 +68,7 @@ public static Object[] parameters() { }; } - public TestTableService(CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { + public TestTableManager(CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { super(catalogTestHelper, tableTestHelper); } @@ -122,7 +122,7 @@ public void testCreateAndDropTable() { if (catalogTestHelper().isInternalCatalog()) { Assert.assertThrows( AlreadyExistsException.class, - () -> tableService().createTable(TEST_CATALOG_NAME, tableMetadata())); + () -> tableManager().createTable(TEST_CATALOG_NAME, tableMetadata())); } // test create table with wrong catalog name @@ -132,7 +132,7 @@ public void testCreateAndDropTable() { TableMetadata copyMetadata = new TableMetadata(serverTableIdentifier(), tableMeta(), catalogMeta()); copyMetadata.getTableIdentifier().setCatalog("unknown"); - tableService().createTable("unknown", copyMetadata); + tableManager().createTable("unknown", copyMetadata); }); // test create table in not existed catalog @@ -142,7 +142,7 @@ public void testCreateAndDropTable() { TableMetadata copyMetadata = new TableMetadata(serverTableIdentifier(), tableMeta(), catalogMeta()); copyMetadata.getTableIdentifier().setCatalog("unknown"); - tableService().createTable("unknown", copyMetadata); + tableManager().createTable("unknown", copyMetadata); }); if (catalogTestHelper().tableFormat().equals(TableFormat.MIXED_ICEBERG)) { @@ -153,13 +153,13 @@ public void testCreateAndDropTable() { TableMetadata copyMetadata = new TableMetadata(serverTableIdentifier(), tableMeta(), catalogMeta()); copyMetadata.getTableIdentifier().setDatabase("unknown"); - tableService().createTable("unknown", copyMetadata); + tableManager().createTable("unknown", copyMetadata); }); } // test drop table dropTable(); - Assert.assertEquals(0, tableService().listManagedTables().size()); + Assert.assertEquals(0, tableManager().listManagedTables().size()); Assert.assertEquals(0, serverCatalog.listTables(TEST_DB_NAME).size()); Assert.assertEquals(0, serverCatalog.listTables().size()); if (catalogTestHelper().isInternalCatalog()) { @@ -171,7 +171,7 @@ public void testCreateAndDropTable() { // test drop not existed table Assert.assertThrows( ObjectNotExistsException.class, - () -> tableService().dropTableMetadata(tableMeta().getTableIdentifier(), true)); + () -> tableManager().dropTableMetadata(tableMeta().getTableIdentifier(), true)); dropDatabase(); } @@ -180,7 +180,8 @@ public void testCreateAndDropTable() { public void testBlockAndRelease() { createDatabase(); createTable(); - TableIdentifier tableIdentifier = serverTableIdentifier().getIdentifier(); + TableIdentifier tableIdentifier = + serverTableIdentifier().getIdentifier().buildTableIdentifier(); List operations = new ArrayList<>(); operations.add(BlockableOperation.BATCH_WRITE); @@ -190,13 +191,13 @@ public void testBlockAndRelease() { assertNotBlocked(BlockableOperation.OPTIMIZE); assertNotBlocked(BlockableOperation.BATCH_WRITE); - Blocker block = tableService().block(tableIdentifier, operations, getProperties()); + Blocker block = tableManager().block(tableIdentifier, operations, getProperties()); assertBlocker(block, operations); assertBlockerCnt(1); assertBlocked(BlockableOperation.OPTIMIZE); assertBlocked(BlockableOperation.BATCH_WRITE); - tableService().releaseBlocker(tableIdentifier, block.getBlockerId()); + tableManager().releaseBlocker(tableIdentifier, block.getBlockerId()); assertBlockerCnt(0); assertNotBlocked(BlockableOperation.OPTIMIZE); assertNotBlocked(BlockableOperation.BATCH_WRITE); @@ -209,7 +210,8 @@ public void testBlockAndRelease() { public void testBlockConflict() { createDatabase(); createTable(); - TableIdentifier tableIdentifier = serverTableIdentifier().getIdentifier(); + TableIdentifier tableIdentifier = + serverTableIdentifier().getIdentifier().buildTableIdentifier(); List operations = new ArrayList<>(); operations.add(BlockableOperation.BATCH_WRITE); @@ -219,19 +221,19 @@ public void testBlockConflict() { assertNotBlocked(BlockableOperation.OPTIMIZE); assertNotBlocked(BlockableOperation.BATCH_WRITE); - Blocker block = tableService().block(tableIdentifier, operations, getProperties()); + Blocker block = tableManager().block(tableIdentifier, operations, getProperties()); Assert.assertThrows( "should be conflict", BlockerConflictException.class, - () -> tableService().block(tableIdentifier, operations, getProperties())); + () -> tableManager().block(tableIdentifier, operations, getProperties())); assertBlocker(block, operations); assertBlockerCnt(1); assertBlocked(BlockableOperation.OPTIMIZE); assertBlocked(BlockableOperation.BATCH_WRITE); - tableService().releaseBlocker(tableIdentifier, block.getBlockerId()); + tableManager().releaseBlocker(tableIdentifier, block.getBlockerId()); assertBlockerCnt(0); assertNotBlocked(BlockableOperation.OPTIMIZE); assertNotBlocked(BlockableOperation.BATCH_WRITE); @@ -244,7 +246,8 @@ public void testBlockConflict() { public void testRenewBlocker() throws InterruptedException { createDatabase(); createTable(); - TableIdentifier tableIdentifier = serverTableIdentifier().getIdentifier(); + TableIdentifier tableIdentifier = + serverTableIdentifier().getIdentifier().buildTableIdentifier(); List operations = new ArrayList<>(); operations.add(BlockableOperation.BATCH_WRITE); @@ -254,10 +257,10 @@ public void testRenewBlocker() throws InterruptedException { assertNotBlocked(BlockableOperation.OPTIMIZE); assertNotBlocked(BlockableOperation.BATCH_WRITE); - Blocker block = tableService().block(tableIdentifier, operations, getProperties()); + Blocker block = tableManager().block(tableIdentifier, operations, getProperties()); Thread.sleep(1); - tableService().renewBlocker(tableIdentifier, block.getBlockerId()); + tableManager().renewBlocker(tableIdentifier, block.getBlockerId()); assertBlockerCnt(1); assertBlocked(BlockableOperation.OPTIMIZE); assertBlocked(BlockableOperation.BATCH_WRITE); @@ -266,9 +269,9 @@ public void testRenewBlocker() throws InterruptedException { assertBlockerCnt(1); assertBlocked(BlockableOperation.OPTIMIZE); assertBlocked(BlockableOperation.BATCH_WRITE); - assertBlockerRenewed(tableService().getBlockers(tableIdentifier).get(0)); + assertBlockerRenewed(tableManager().getBlockers(tableIdentifier).get(0)); - tableService().releaseBlocker(tableIdentifier, block.getBlockerId()); + tableManager().releaseBlocker(tableIdentifier, block.getBlockerId()); assertBlockerCnt(0); assertNotBlocked(BlockableOperation.OPTIMIZE); assertNotBlocked(BlockableOperation.BATCH_WRITE); @@ -281,22 +284,23 @@ public void testRenewBlocker() throws InterruptedException { public void testAutoIncrementBlockerId() { createDatabase(); createTable(); - TableIdentifier tableIdentifier = serverTableIdentifier().getIdentifier(); + TableIdentifier tableIdentifier = + serverTableIdentifier().getIdentifier().buildTableIdentifier(); List operations = new ArrayList<>(); operations.add(BlockableOperation.BATCH_WRITE); operations.add(BlockableOperation.OPTIMIZE); - Blocker block = tableService().block(tableIdentifier, operations, getProperties()); + Blocker block = tableManager().block(tableIdentifier, operations, getProperties()); - tableService().releaseBlocker(tableIdentifier, block.getBlockerId()); + tableManager().releaseBlocker(tableIdentifier, block.getBlockerId()); - Blocker block2 = tableService().block(tableIdentifier, operations, getProperties()); + Blocker block2 = tableManager().block(tableIdentifier, operations, getProperties()); Assert.assertEquals( Long.parseLong(block2.getBlockerId()) - Long.parseLong(block.getBlockerId()), 1); - tableService().releaseBlocker(tableIdentifier, block2.getBlockerId()); + tableManager().releaseBlocker(tableIdentifier, block2.getBlockerId()); dropTable(); dropDatabase(); @@ -336,7 +340,8 @@ private void assertBlocked(BlockableOperation operation) { } private boolean isBlocked(BlockableOperation operation) { - return tableService().getBlockers(serverTableIdentifier().getIdentifier()).stream() + return tableManager() + .getBlockers(serverTableIdentifier().getIdentifier().buildTableIdentifier()).stream() .anyMatch(blocker -> blocker.getOperations().contains(operation)); } @@ -346,7 +351,8 @@ private boolean isTableRuntimeBlocked(BlockableOperation operation) { private void assertBlockerCnt(int i) { List blockers; - blockers = tableService().getBlockers(serverTableIdentifier().getIdentifier()); + blockers = + tableManager().getBlockers(serverTableIdentifier().getIdentifier().buildTableIdentifier()); Assert.assertEquals(i, blockers.size()); } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java index 2a94d2e6e2..230bd59e1b 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java @@ -46,7 +46,7 @@ @RunWith(Parameterized.class) public class TestTableRuntimeHandler extends AMSTableTestBase { - private DefaultTableServiceOld tableService; + private DefaultTableService tableService; @Parameterized.Parameters(name = "{0}, {1}") public static Object[] parameters() { @@ -67,7 +67,7 @@ public TestTableRuntimeHandler( @Test public void testInitialize() throws Exception { - tableService = new DefaultTableServiceOld(new Configurations(), CATALOG_MANAGER); + tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER); TestHandler handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); @@ -76,7 +76,7 @@ public void testInitialize() throws Exception { createDatabase(); } createTable(); - ServerTableIdentifier createTableId = tableService().listManagedTables().get(0); + ServerTableIdentifier createTableId = tableManager().listManagedTables().get(0); Assert.assertEquals(1, handler.getAddedTables().size()); validateMixedTable(handler.getAddedTables().get(0).first()); validateTableRuntime(handler.getAddedTables().get(0).second()); @@ -86,7 +86,7 @@ public void testInitialize() throws Exception { Assert.assertTrue(handler.isDisposed()); // initialize with a history table - tableService = new DefaultTableServiceOld(new Configurations(), CATALOG_MANAGER); + tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER); handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); @@ -122,7 +122,7 @@ public void testInitialize() throws Exception { tableService = null; } - protected DefaultTableServiceOld tableService() { + protected DefaultTableService tableService() { if (tableService != null) { return tableService; } else { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java index 341c86c10d..afba8fab6b 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java @@ -21,11 +21,11 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.api.BlockableOperation; +import org.apache.amoro.server.AMSServiceTestBase; import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; -import org.apache.amoro.server.table.TableManagerOld; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableServiceTestBase; +import org.apache.amoro.server.table.TableService; import org.apache.amoro.server.table.blocker.TableBlocker; import org.junit.Assert; import org.junit.Before; @@ -35,27 +35,27 @@ import java.util.Collections; import java.util.List; -public class TestBlockerExpiringExecutor extends TableServiceTestBase { +public class TestBlockerExpiringExecutor extends AMSServiceTestBase { private final ServerTableIdentifier tableIdentifier = ServerTableIdentifier.of( 0L, "test_catalog", "test_db", "test_table_blocker", TableFormat.MIXED_ICEBERG); private final Persistency persistency = new Persistency(); private TableRuntime tableRuntime; - private TableManagerOld tableManager; + private TableService tableService; @Before public void mock() { tableRuntime = Mockito.mock(TableRuntime.class); - tableManager = Mockito.mock(TableManagerOld.class); + tableService = Mockito.mock(TableService.class); Mockito.when(tableRuntime.getTableIdentifier()).thenReturn(tableIdentifier); } @Test public void testExpireBlocker() { - BlockerExpiringExecutor blockerExpiringExecutor = new BlockerExpiringExecutor(tableManager); + BlockerExpiringExecutor blockerExpiringExecutor = new BlockerExpiringExecutor(tableService); TableBlocker tableBlocker = new TableBlocker(); - tableBlocker.setTableIdentifier(tableIdentifier.getIdentifier()); + tableBlocker.setTableIdentifier(tableIdentifier.getIdentifier().buildTableIdentifier()); tableBlocker.setExpirationTime(System.currentTimeMillis() - 10); tableBlocker.setCreateTime(System.currentTimeMillis() - 20); tableBlocker.setOperations(Collections.singletonList(BlockableOperation.OPTIMIZE.name())); @@ -63,7 +63,7 @@ public void testExpireBlocker() { persistency.insertTableBlocker(tableBlocker); TableBlocker tableBlocker2 = new TableBlocker(); - tableBlocker2.setTableIdentifier(tableIdentifier.getIdentifier()); + tableBlocker2.setTableIdentifier(tableIdentifier.getIdentifier().buildTableIdentifier()); tableBlocker2.setExpirationTime(System.currentTimeMillis() + 100000); tableBlocker2.setCreateTime(System.currentTimeMillis() - 20); tableBlocker2.setOperations(Collections.singletonList(BlockableOperation.BATCH_WRITE.name())); diff --git a/amoro-common/src/main/java/org/apache/amoro/ServerTableIdentifier.java b/amoro-common/src/main/java/org/apache/amoro/ServerTableIdentifier.java index 6d669e3fdb..1cc401266b 100644 --- a/amoro-common/src/main/java/org/apache/amoro/ServerTableIdentifier.java +++ b/amoro-common/src/main/java/org/apache/amoro/ServerTableIdentifier.java @@ -18,7 +18,7 @@ package org.apache.amoro; -import org.apache.amoro.api.TableIdentifier; +import org.apache.amoro.table.TableIdentifier; import java.util.Objects; @@ -138,6 +138,6 @@ public static ServerTableIdentifier of( } public TableIdentifier getIdentifier() { - return new TableIdentifier(catalog, database, tableName); + return TableIdentifier.of(catalog, database, tableName); } }