diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 92f0dff2b9..1dd763f324 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -52,8 +52,8 @@ import org.apache.amoro.shade.thrift.org.apache.thrift.TProcessor; import org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TBinaryProtocol; import org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolFactory; +import org.apache.amoro.shade.thrift.org.apache.thrift.server.THsHaServer; import org.apache.amoro.shade.thrift.org.apache.thrift.server.TServer; -import org.apache.amoro.shade.thrift.org.apache.thrift.server.TThreadedSelectorServer; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory; @@ -374,15 +374,13 @@ private TServer createThriftServer( TTransportFactory transportFactory = new TFramedTransport.Factory(); TMultiplexedProcessor multiplexedProcessor = new TMultiplexedProcessor(); multiplexedProcessor.registerProcessor(processorName, processor); - TThreadedSelectorServer.Args args = - new TThreadedSelectorServer.Args(serverTransport) + THsHaServer.Args args = + new THsHaServer.Args(serverTransport) .processor(multiplexedProcessor) .transportFactory(transportFactory) .protocolFactory(protocolFactory) .inputProtocolFactory(inputProtoFactory) - .executorService(executorService) - .selectorThreads(selectorThreads) - .acceptQueueSizePerThread(queueSizePerSelector); + .executorService(executorService); LOG.info( "The number of selector threads for the {} thrift server is: {}", processorName, @@ -391,7 +389,7 @@ private TServer createThriftServer( "The size of per-selector queue for the {} thrift server is: {}", processorName, queueSizePerSelector); - return new TThreadedSelectorServer(args); + return new THsHaServer(args); } private ThreadFactory getThriftThreadFactory(String processorName) { diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java index ad41de77ed..bfc4a70b83 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java @@ -18,6 +18,9 @@ package org.apache.amoro.server.catalog; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; import org.apache.amoro.AmoroTable; import org.apache.amoro.TableFormat; import org.apache.amoro.api.CatalogMeta; @@ -40,6 +43,7 @@ import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.requests.CreateTableRequest; @@ -50,10 +54,13 @@ public class InternalIcebergCatalogImpl extends InternalCatalog { final int httpPort; final String exposedHost; + final Cache, FileIO> fileIOCloser; + protected InternalIcebergCatalogImpl(CatalogMeta metadata, Configurations serverConfiguration) { super(metadata); this.httpPort = serverConfiguration.getInteger(AmoroManagementConf.HTTP_SERVER_PORT); this.exposedHost = serverConfiguration.getString(AmoroManagementConf.SERVER_EXPOSE_HOST); + this.fileIOCloser = newFileIOCloser(); } @Override @@ -96,12 +103,14 @@ public AmoroTable loadTable(String database, String tableName) { .toString()); org.apache.amoro.table.TableIdentifier tableIdentifier = org.apache.amoro.table.TableIdentifier.of(name(), database, tableName); - - return IcebergTable.newIcebergTable( - tableIdentifier, - table, - CatalogUtil.buildMetaStore(getMetadata()), - getMetadata().getCatalogProperties()); + AmoroTable amoroTable = + IcebergTable.newIcebergTable( + tableIdentifier, + table, + CatalogUtil.buildMetaStore(getMetadata()), + getMetadata().getCatalogProperties()); + fileIOCloser.put(amoroTable, ops.io()); + return amoroTable; } protected AuthenticatedFileIO fileIO(CatalogMeta catalogMeta) { @@ -144,4 +153,17 @@ public InternalTableHandler newTableHandler(String database, String table //noinspection unchecked return (InternalTableHandler) new InternalIcebergHandler(getMetadata(), metadata); } + + private Cache, FileIO> newFileIOCloser() { + return Caffeine.newBuilder() + .weakKeys() + .removalListener( + (RemovalListener, FileIO>) + (tbl, fileIO, cause) -> { + if (null != fileIO) { + fileIO.close(); + } + }) + .build(); + } } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java index 48731d882c..45d2cfb339 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java @@ -142,9 +142,10 @@ tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties()), new BasicUnkeyedTable( tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties()); } - - return new org.apache.amoro.formats.mixed.MixedTable( - mixedIcebergTable, TableFormat.MIXED_ICEBERG); + AmoroTable amoroTable = + new org.apache.amoro.formats.mixed.MixedTable(mixedIcebergTable, TableFormat.MIXED_ICEBERG); + fileIOCloser.put(amoroTable, fileIO); + return amoroTable; } protected TableFormat format() {