Skip to content

Commit

Permalink
TEZ-4479: Eagerly Init/Load FileSystem In Tez Task Containers
Browse files Browse the repository at this point in the history
  • Loading branch information
shameersss1 committed Mar 27, 2023
1 parent 6bd6f9c commit af4d9a2
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,14 @@ public TezConfiguration(boolean loadDefaults) {

public static final int TASK_HEARTBEAT_TIMEOUT_CHECK_MS_DEFAULT = 30 * 1000;

/**
* String value. Comma-separated list of FileSystem paths that need to be eagerly initialized.
* For example s3://bucket/,file://,hdfs://localhost:8020/
*/
@ConfigurationScope(Scope.VERTEX)
@ConfigurationProperty
public static final String TEZ_TASK_EAGER_INIT_FS_PATHS = TEZ_TASK_PREFIX + "eager.init.fs.paths";

/**
* Whether to scale down memory requested by each component if the total
* exceeds the available JVM memory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -40,6 +41,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
Expand Down Expand Up @@ -130,6 +132,7 @@ public class TezChild {
private final HadoopShim hadoopShim;
private final TezExecutors sharedExecutor;
private ThreadLocalMap mdcContext;
private static ExecutorService eagerInitFsPool;

public TezChild(Configuration conf, String host, int port, String containerIdentifier,
String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
Expand Down Expand Up @@ -503,6 +506,33 @@ public static TezChild newTezChild(Configuration conf, String host, int port, St
hadoopShim);
}

/**
 * Starts background initialization of the FileSystems listed under
 * {@link TezConfiguration#TEZ_TASK_EAGER_INIT_FS_PATHS}.
 *
 * <p>Each configured path is resolved on a daemon thread taken from a lazily
 * created, shared cached thread pool. This method only submits the work and
 * does not wait for it to finish. Any exception raised while resolving a path
 * is logged and not propagated to the caller.
 *
 * @param conf configuration that supplies the path list and is handed to
 *             {@code Path.getFileSystem} for each path
 */
private static void eagerInitFileSystemPaths(Configuration conf) {
  Collection<String> eagerInitPaths = conf.getTrimmedStringCollection(
      TezConfiguration.TEZ_TASK_EAGER_INIT_FS_PATHS);
  if (eagerInitPaths.isEmpty()) {
    // Nothing configured: do not create the pool at all.
    return;
  }
  if (eagerInitFsPool == null) {
    eagerInitFsPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
        .setDaemon(true)
        .setNameFormat("Eager-Init-Fs-Thread-%d")
        .build());
  }
  for (String path : eagerInitPaths) {
    eagerInitFsPool.execute(() -> {
      try {
        // nanoTime is monotonic, so the reported duration cannot be skewed
        // by wall-clock adjustments.
        long startNanos = System.nanoTime();
        // The returned FileSystem is intentionally discarded; only the lookup is needed here.
        new Path(path).getFileSystem(conf);
        long duration = (System.nanoTime() - startNanos) / 1_000_000L;
        LOG.info("Eagerly initiated FileSystem at path {} in {} ms", path, duration);
      } catch (Exception e) {
        // Deliberately not rethrown: eager init is best-effort and must not
        // block the core functionality.
        LOG.error("Unable to eager init FileSystem at the path {}", path, e);
      }
    });
  }
}

public static void main(String[] args) throws IOException, InterruptedException, TezException {
TezClassLoader.setupTezClassLoader();
final Configuration defaultConf = new Configuration();
Expand Down Expand Up @@ -530,6 +560,8 @@ public static void main(String[] args) throws IOException, InterruptedException,
DAGProtos.ConfigurationProto confProto =
TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()));
TezUtilsInternal.addUserSpecifiedTezConfiguration(defaultConf, confProto.getConfKeyValuesList());
// eagerly load configured filesystem before it is actually required
eagerInitFileSystemPaths(defaultConf);
UserGroupInformation.setConfiguration(defaultConf);
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();

Expand Down

0 comments on commit af4d9a2

Please sign in to comment.