Skip to content

Commit

Permalink
complete
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyongxiang.alpha committed Jan 14, 2025
1 parent f0e752e commit 3f4c8de
Show file tree
Hide file tree
Showing 47 changed files with 367 additions and 874 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,9 @@
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.DefaultTableManager;
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.DefaultTableServiceOld;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.TableServiceOld;
import org.apache.amoro.server.table.executor.AsyncTableExecutors;
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.server.utils.ThriftServiceProxy;
Expand Down Expand Up @@ -155,10 +153,11 @@ public void startService() throws Exception {
MetricManager.getInstance();

catalogManager = new DefaultCatalogManager(serviceConfig);

tableManager = new DefaultTableManager(serviceConfig, catalogManager);

tableService = new DefaultTableService(serviceConfig, catalogManager);
optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, tableService);
optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, tableManager, tableService);

LOG.info("Setting up AMS table executors...");
AsyncTableExecutors.getInstance().setup(tableService, serviceConfig);
Expand Down Expand Up @@ -253,7 +252,12 @@ private void startThriftServer(TServer server, String threadName) {
private void initHttpService() {
DashboardServer dashboardServer =
new DashboardServer(
serviceConfig, catalogManager, tableManager, optimizingService, terminalManager);
serviceConfig,
catalogManager,
tableManager,
optimizingService,
terminalManager,
tableService);
RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableManager);

httpServer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,10 @@
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.OptimizerThread;
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.DefaultTableServiceOld;
import org.apache.amoro.server.table.MaintainedTableManager;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.TableServiceOld;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -145,7 +143,7 @@ private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
List<TableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
tableService,
catalogManager,
group,
this,
planExecutor,
Expand Down Expand Up @@ -322,7 +320,7 @@ public void createResourceGroup(ResourceGroup resourceGroup) {
doAs(ResourceMapper.class, mapper -> mapper.insertResourceGroup(resourceGroup));
OptimizingQueue optimizingQueue =
new OptimizingQueue(
tableService,
catalogManager,
resourceGroup,
this,
planExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.manager.EventsManager;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.table.TableServiceOld;
import org.apache.amoro.server.table.internal.InternalTableCreator;
import org.apache.amoro.server.table.internal.InternalTableHandler;
import org.apache.amoro.server.table.internal.InternalTableManager;
Expand Down Expand Up @@ -348,7 +347,8 @@ public void deleteTable(Context ctx) {
Boolean.parseBoolean(
Optional.ofNullable(ctx.req.getParameter("purgeRequested")).orElse("false"));
org.apache.amoro.server.table.TableMetadata tableMetadata = handler.tableMetadata();
tableManager.dropTableMetadata(tableMetadata.getTableIdentifier().getIdentifier(), purge);
tableManager.dropTableMetadata(
tableMetadata.getTableIdentifier().getIdentifier().buildTableIdentifier(), purge);
handler.dropTable(purge);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.table.MaintainedTableManager;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableMetadata;
import org.apache.amoro.server.table.TableServiceOld;
import org.apache.amoro.server.utils.InternalTableUtil;
import org.apache.amoro.shade.thrift.org.apache.thrift.TException;

Expand Down Expand Up @@ -91,7 +89,10 @@ public void createTableMeta(TableMeta tableMeta) {
}
ServerTableIdentifier identifier =
ServerTableIdentifier.of(
tableMeta.getTableIdentifier(), TableFormat.valueOf(tableMeta.getFormat()));
tableMeta.getTableIdentifier().getCatalog(),
tableMeta.getTableIdentifier().getDatabase(),
tableMeta.getTableIdentifier().getTableName(),
TableFormat.valueOf(tableMeta.getFormat()));
InternalCatalog catalog = catalogManager.getInternalCatalog(identifier.getCatalog());
CatalogMeta catalogMeta = catalog.getMetadata();
TableMetadata tableMetadata = new TableMetadata(identifier, tableMeta, catalogMeta);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.amoro.server.catalog;

import org.apache.amoro.AmoroTable;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.table.TableIdentifier;

import java.util.List;

Expand Down Expand Up @@ -90,4 +92,12 @@ public interface CatalogManager {
* @param catalogMeta The CatalogMeta object representing the updated catalog information.
*/
void updateCatalog(CatalogMeta catalogMeta);

/**
* load a table via server catalog.
*
* @param identifier managed table identifier
* @return managed table.
*/
AmoroTable<?> loadTable(TableIdentifier identifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.amoro.server.catalog;

import org.apache.amoro.AmoroTable;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.exception.AlreadyExistsException;
Expand All @@ -31,6 +32,7 @@
import org.apache.amoro.shade.guava32.com.google.common.cache.CacheLoader;
import org.apache.amoro.shade.guava32.com.google.common.cache.LoadingCache;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.TableIdentifier;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -195,6 +197,12 @@ public void updateCatalog(CatalogMeta catalogMeta) {
LOG.info("Update catalog metadata: {}", catalogMeta.getCatalogName());
}

@Override
public AmoroTable<?> loadTable(TableIdentifier identifier) {
ServerCatalog serverCatalog = getServerCatalog(identifier.getCatalog());
return serverCatalog.loadTable(identifier.getDatabase(), identifier.getTableName());
}

private void validateCatalogUpdate(CatalogMeta oldMeta, CatalogMeta newMeta) {
if (!oldMeta.getCatalogType().equals(newMeta.getCatalogType())) {
throw new IllegalMetadataException("Cannot update catalog type");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableIDWithFormat;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.exception.AlreadyExistsException;
import org.apache.amoro.exception.IllegalMetadataException;
import org.apache.amoro.exception.ObjectNotExistsException;
Expand All @@ -32,6 +31,7 @@
import org.apache.amoro.server.table.TableMetadata;
import org.apache.amoro.server.table.internal.InternalTableCreator;
import org.apache.amoro.server.table.internal.InternalTableHandler;
import org.apache.amoro.table.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;

import java.util.List;
Expand Down Expand Up @@ -101,11 +101,7 @@ public List<TableIDWithFormat> listTables(String database) {
TableMetaMapper.class,
mapper -> mapper.selectTableIdentifiersByDb(getMetadata().getCatalogName(), database))
.stream()
.map(
sid ->
TableIDWithFormat.of(
org.apache.amoro.table.TableIdentifier.of(sid.getIdentifier()),
sid.getFormat()))
.map(sid -> TableIDWithFormat.of(sid.getIdentifier(), sid.getFormat()))
.collect(Collectors.toList());
}

Expand All @@ -115,11 +111,7 @@ public List<TableIDWithFormat> listTables() {
TableMetaMapper.class,
mapper -> mapper.selectTableIdentifiersByCatalog(getMetadata().getCatalogName()))
.stream()
.map(
sid ->
TableIDWithFormat.of(
org.apache.amoro.table.TableIdentifier.of(sid.getIdentifier()),
sid.getFormat()))
.map(sid -> TableIDWithFormat.of(sid.getIdentifier(), sid.getFormat()))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.utils.ParamSignatureCalculator;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableServiceOld;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -101,19 +101,24 @@ public DashboardServer(
CatalogManager catalogManager,
TableManager tableManager,
DefaultOptimizingService optimizerManager,
TerminalManager terminalManager) {
TerminalManager terminalManager,
TableService tableService) {
PlatformFileManager platformFileManager = new PlatformFileManager();
this.catalogController = new CatalogController(catalogManager, platformFileManager);
this.healthCheckController = new HealthCheckController();
this.loginController = new LoginController(serviceConfig);
this.optimizerGroupController = new OptimizerGroupController(tableService, optimizerManager);
// TODO: remove table service from OptimizerGroupController
this.optimizerGroupController =
new OptimizerGroupController(tableManager, tableService, optimizerManager);
this.optimizerController = new OptimizerController(optimizerManager);
this.platformFileInfoController = new PlatformFileInfoController(platformFileManager);
this.settingController = new SettingController(serviceConfig, optimizerManager);
ServerTableDescriptor tableDescriptor =
new ServerTableDescriptor(catalogManager, tableManager, serviceConfig);
// TODO: remove table service from TableController
this.tableController =
new TableController(catalogManager, tableService, tableDescriptor, serviceConfig);
new TableController(
catalogManager, tableManager, tableService, tableDescriptor, serviceConfig);
this.terminalController = new TerminalController(terminalManager);
this.versionController = new VersionController();
this.overviewController = new OverviewController();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableServiceOld;
import org.apache.amoro.table.descriptor.AmoroSnapshotsOfTable;
import org.apache.amoro.table.descriptor.ConsumerInfo;
import org.apache.amoro.table.descriptor.DDLInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableServiceOld;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -58,11 +59,15 @@ public class OptimizerGroupController {
private static final Logger LOG = LoggerFactory.getLogger(OptimizerGroupController.class);

private static final String ALL_GROUP = "all";
private final TableServiceOld tableService;
private final TableManager tableManager;
private final TableService tableService;
private final DefaultOptimizingService optimizerManager;

public OptimizerGroupController(
TableServiceOld tableService, DefaultOptimizingService optimizerManager) {
TableManager tableManager,
TableService tableService,
DefaultOptimizingService optimizerManager) {
this.tableManager = tableManager;
this.tableService = tableService;
this.optimizerManager = optimizerManager;
}
Expand Down Expand Up @@ -102,7 +107,7 @@ public void getOptimizerTables(Context ctx) {
statusCodes = null;
}
Pair<List<TableRuntimeMeta>, Integer> tableRuntimeBeans =
tableService.getTableRuntimes(
tableManager.queryTableRuntimeMetas(
optimizerGroupUsedInDbFilter,
dbFilterStr,
tableFilterStr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.CommonUtil;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableServiceOld;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Function;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -101,7 +102,8 @@ public class TableController {
private static final long UPGRADE_INFO_EXPIRE_INTERVAL = 60 * 60 * 1000;

private final CatalogManager catalogManager;
private final TableServiceOld tableService;
private final TableManager tableManager;
private final TableService tableService;
private final ServerTableDescriptor tableDescriptor;
private final Configurations serviceConfig;
private final ConcurrentHashMap<TableIdentifier, UpgradeRunningInfo> upgradeRunningInfo =
Expand All @@ -110,10 +112,12 @@ public class TableController {

public TableController(
CatalogManager catalogManager,
TableServiceOld tableService,
TableManager tableManager,
TableService tableService,
ServerTableDescriptor tableDescriptor,
Configurations serviceConfig) {
this.catalogManager = catalogManager;
this.tableManager = tableManager;
this.tableService = tableService;
this.tableDescriptor = tableDescriptor;
this.serviceConfig = serviceConfig;
Expand Down Expand Up @@ -150,7 +154,7 @@ public void getTableDetail(Context ctx) {
TableSummary tableSummary = serverTableMeta.getTableSummary();
Optional<ServerTableIdentifier> serverTableIdentifier =
Optional.ofNullable(
tableService.getServerTableIdentifier(
tableManager.getServerTableIdentifier(
TableIdentifier.of(catalog, database, tableName).buildTableIdentifier()));
if (serverTableIdentifier.isPresent()) {
TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get().getId());
Expand Down Expand Up @@ -679,7 +683,7 @@ public void cancelOptimizingProcess(Context ctx) {
Preconditions.checkState(catalogManager.catalogExist(catalog), "invalid catalog!");

ServerTableIdentifier serverTableIdentifier =
tableService.getServerTableIdentifier(
tableManager.getServerTableIdentifier(
TableIdentifier.of(catalog, db, table).buildTableIdentifier());
TableRuntime tableRuntime =
serverTableIdentifier != null
Expand Down
Loading

0 comments on commit 3f4c8de

Please sign in to comment.