Skip to content

Commit

Permalink
Merge branch 'master' into feature/data_expire_by_partition_info
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoujinsong authored Jan 14, 2025
2 parents c1ebdd2 + c72f121 commit 6f537bd
Show file tree
Hide file tree
Showing 65 changed files with 836 additions and 563 deletions.
16 changes: 8 additions & 8 deletions amoro-ams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,6 @@
<artifactId>derby</artifactId>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
Expand Down Expand Up @@ -403,6 +397,13 @@
<version>${pagehelper.version}</version>
</dependency>

<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql-connector-j.version}</version>
<scope>${mysql-j-dependency-scope}</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
Expand Down Expand Up @@ -451,8 +452,7 @@
<version>1.19.6</version>
<scope>test</scope>
</dependency>



</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public class AmoroManagementConf {
.defaultValue("admin")
.withDescription("The administrator password");

public static final ConfigOption<Duration> CATALOG_META_CACHE_EXPIRATION_INTERVAL =
ConfigOptions.key("catalog-meta-cache.expiration-interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription("TTL for catalog metadata.");

public static final ConfigOption<Integer> TABLE_MANIFEST_IO_THREAD_COUNT =
ConfigOptions.key("table-manifest-io.thread-count")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.amoro.config.ConfigHelpers;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.exception.AmoroRuntimeException;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.DefaultCatalogManager;
import org.apache.amoro.server.dashboard.DashboardServer;
import org.apache.amoro.server.dashboard.JavalinJsonMapper;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
Expand Down Expand Up @@ -92,6 +94,7 @@ public class AmoroServiceContainer {

private final HighAvailabilityContainer haContainer;
private DataSource dataSource;
private CatalogManager catalogManager;
private DefaultTableService tableService;
private DefaultOptimizingService optimizingService;
private TerminalManager terminalManager;
Expand Down Expand Up @@ -146,8 +149,9 @@ public void startService() throws Exception {
EventsManager.getInstance();
MetricManager.getInstance();

tableService = new DefaultTableService(serviceConfig);
optimizingService = new DefaultOptimizingService(serviceConfig, tableService);
catalogManager = new DefaultCatalogManager(serviceConfig);
tableService = new DefaultTableService(serviceConfig, catalogManager);
optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, tableService);

LOG.info("Setting up AMS table executors...");
AsyncTableExecutors.getInstance().setup(tableService, serviceConfig);
Expand All @@ -164,7 +168,7 @@ public void startService() throws Exception {
addHandlerChain(AsyncTableExecutors.getInstance().getTagsAutoCreatingExecutor());
tableService.initialize();
LOG.info("AMS table service have been initialized");
terminalManager = new TerminalManager(serviceConfig, tableService);
terminalManager = new TerminalManager(serviceConfig, catalogManager, tableService);

initThriftService();
startThriftService();
Expand Down Expand Up @@ -240,8 +244,9 @@ private void startThriftServer(TServer server, String threadName) {

private void initHttpService() {
DashboardServer dashboardServer =
new DashboardServer(serviceConfig, tableService, optimizingService, terminalManager);
RestCatalogService restCatalogService = new RestCatalogService(tableService);
new DashboardServer(
serviceConfig, catalogManager, tableService, optimizingService, terminalManager);
RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableService);

httpServer =
Javalin.create(
Expand Down Expand Up @@ -333,7 +338,7 @@ private void initThriftService() throws TTransportException {
new AmoroTableMetastore.Processor<>(
ThriftServiceProxy.createProxy(
AmoroTableMetastore.Iface.class,
new TableManagementService(tableService),
new TableManagementService(catalogManager, tableService),
AmoroRuntimeException::normalizeCompatibly));
tableManagementServer =
createThriftServer(
Expand Down Expand Up @@ -536,6 +541,11 @@ public TableService getTableService() {
return this.tableService;
}

@VisibleForTesting
public CatalogManager getCatalogManager() {
return this.catalogManager;
}

@VisibleForTesting
public OptimizerManager getOptimizingService() {
return this.optimizingService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.optimizing.OptimizingQueue;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
Expand Down Expand Up @@ -97,17 +98,22 @@ public class DefaultOptimizingService extends StatedPersistentBase
private final Map<String, OptimizingQueue> optimizingQueueByToken = new ConcurrentHashMap<>();
private final Map<String, OptimizerInstance> authOptimizers = new ConcurrentHashMap<>();
private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper();
private final CatalogManager catalogManager;
private final TableService tableService;
private final RuntimeHandlerChain tableHandlerChain;
private final ExecutorService planExecutor;

public DefaultOptimizingService(Configurations serviceConfig, DefaultTableService tableService) {
public DefaultOptimizingService(
Configurations serviceConfig,
CatalogManager catalogManager,
DefaultTableService tableService) {
this.optimizerTouchTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
this.taskAckTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT);
this.maxPlanningParallelism =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
this.pollingTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT);
this.tableService = tableService;
this.catalogManager = catalogManager;
this.tableHandlerChain = new TableRuntimeHandlerImpl();
this.planExecutor =
Executors.newCachedThreadPool(
Expand Down Expand Up @@ -199,7 +205,7 @@ public OptimizingTask pollTask(String authToken, int threadId) {
}

private OptimizingTask extractOptimizingTask(
TaskRuntime task, String authToken, int threadId, OptimizingQueue queue) {
TaskRuntime<?> task, String authToken, int threadId, OptimizingQueue queue) {
try {
OptimizerThread optimizerThread = getAuthenticatedOptimizer(authToken).getThread(threadId);
task.schedule(optimizerThread);
Expand Down Expand Up @@ -391,7 +397,7 @@ public void dispose() {
}

public boolean canDeleteResourceGroup(String name) {
for (CatalogMeta catalogMeta : tableService.listCatalogMetas()) {
for (CatalogMeta catalogMeta : catalogManager.listCatalogMetas()) {
if (catalogMeta.getCatalogProperties() != null
&& catalogMeta
.getCatalogProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.amoro.events.IcebergReportEvent;
import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.manager.EventsManager;
Expand Down Expand Up @@ -106,9 +107,11 @@ public class RestCatalogService extends PersistentBase {

private final JavalinJackson jsonMapper;

private final CatalogManager catalogManager;
private final TableService tableService;

public RestCatalogService(TableService tableService) {
public RestCatalogService(CatalogManager catalogManager, TableService tableService) {
this.catalogManager = catalogManager;
this.tableService = tableService;
ObjectMapper objectMapper = jsonMapper();
this.jsonMapper = new JavalinJackson(objectMapper);
Expand Down Expand Up @@ -432,7 +435,7 @@ private void handleTable(

private InternalCatalog getCatalog(String catalog) {
Preconditions.checkNotNull(catalog, "lack required path variables: catalog");
ServerCatalog internalCatalog = tableService.getServerCatalog(catalog);
ServerCatalog internalCatalog = catalogManager.getServerCatalog(catalog);
Preconditions.checkArgument(
internalCatalog instanceof InternalCatalog, "The catalog is not an iceberg rest catalog");
Set<TableFormat> tableFormats = CatalogUtil.tableFormats(internalCatalog.getMetadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.amoro.api.TableCommitMeta;
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.api.TableMeta;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.table.TableMetadata;
Expand All @@ -42,9 +43,11 @@

public class TableManagementService implements AmoroTableMetastore.Iface {

private final CatalogManager catalogManager;
private final TableService tableService;

public TableManagementService(TableService tableService) {
public TableManagementService(CatalogManager catalogManager, TableService tableService) {
this.catalogManager = catalogManager;
this.tableService = tableService;
}

Expand All @@ -53,29 +56,29 @@ public void ping() {}

@Override
public List<CatalogMeta> getCatalogs() {
return tableService.listCatalogMetas();
return catalogManager.listCatalogMetas();
}

@Override
public CatalogMeta getCatalog(String name) {
return tableService.getCatalogMeta(name);
return catalogManager.getCatalogMeta(name);
}

@Override
public List<String> getDatabases(String catalogName) {
ServerCatalog serverCatalog = tableService.getServerCatalog(catalogName);
ServerCatalog serverCatalog = catalogManager.getServerCatalog(catalogName);
return serverCatalog.listDatabases();
}

@Override
public void createDatabase(String catalogName, String database) {
InternalCatalog serverCatalog = tableService.getInternalCatalog(catalogName);
InternalCatalog serverCatalog = catalogManager.getInternalCatalog(catalogName);
serverCatalog.createDatabase(database);
}

@Override
public void dropDatabase(String catalogName, String database) {
InternalCatalog serverCatalog = tableService.getInternalCatalog(catalogName);
InternalCatalog serverCatalog = catalogManager.getInternalCatalog(catalogName);
serverCatalog.dropDatabase(database);
}

Expand All @@ -87,15 +90,15 @@ public void createTableMeta(TableMeta tableMeta) {
ServerTableIdentifier identifier =
ServerTableIdentifier.of(
tableMeta.getTableIdentifier(), TableFormat.valueOf(tableMeta.getFormat()));
InternalCatalog catalog = tableService.getInternalCatalog(identifier.getCatalog());
InternalCatalog catalog = catalogManager.getInternalCatalog(identifier.getCatalog());
CatalogMeta catalogMeta = catalog.getMetadata();
TableMetadata tableMetadata = new TableMetadata(identifier, tableMeta, catalogMeta);
tableService.createTable(catalog.name(), tableMetadata);
}

@Override
public List<TableMeta> listTables(String catalogName, String database) {
InternalCatalog serverCatalog = tableService.getInternalCatalog(catalogName);
InternalCatalog serverCatalog = catalogManager.getInternalCatalog(catalogName);
List<TableMetadata> tableMetadataList = serverCatalog.listTableMetadataInDatabase(database);
return tableMetadataList.stream()
.map(TableMetadata::buildTableMeta)
Expand All @@ -104,7 +107,7 @@ public List<TableMeta> listTables(String catalogName, String database) {

@Override
public TableMeta getTable(TableIdentifier tableIdentifier) {
InternalCatalog serverCatalog = tableService.getInternalCatalog(tableIdentifier.getCatalog());
InternalCatalog serverCatalog = catalogManager.getInternalCatalog(tableIdentifier.getCatalog());
TableMetadata tableMetadata =
serverCatalog.loadTableMetadata(
tableIdentifier.getDatabase(), tableIdentifier.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

import java.util.List;

/** The CatalogService interface defines the operations that can be performed on catalogs. */
public interface CatalogService {
/** The CatalogManager interface defines the operations that can be performed on catalogs. */
public interface CatalogManager {
/**
* Returns a list of CatalogMeta objects.
*
Expand Down Expand Up @@ -62,6 +62,13 @@ public interface CatalogService {
*/
InternalCatalog getInternalCatalog(String catalogName);

/**
* Retrieves all ExternalCatalogs.
*
* @return a list of ExternalCatalogs
*/
List<ExternalCatalog> getExternalCatalogs();

/**
* Creates a catalog based on the provided catalog meta information. The catalog name is obtained
* from the catalog meta.
Expand Down
Loading

0 comments on commit 6f537bd

Please sign in to comment.