[Improve][Zeta] Disable hdfs filesystem cache of checkpoint #6718

Merged 1 commit on Apr 29, 2024
37 changes: 37 additions & 0 deletions docs/en/seatunnel-engine/checkpoint-storage.md
@@ -186,3 +186,40 @@ seatunnel:

```

### Enable cache

When `storage:type` is `hdfs`, the filesystem cache is disabled by default. If you want to enable it, set `disable.cache: false`.
Member: Could you share what risks will be caused if the cache is turned on?

Member: I think that in the SeaTunnel scenario, it should not be turned on at any time.

Contributor (author): You can take a look at bug #6678 that I reported for the [S3 connector]; it causes occasional task failures due to the use of the cache. You can also refer to these two issues about hadoop-aws and aws-sdk-java: https://issues.apache.org/jira/browse/HADOOP-15819 and aws/aws-sdk-java#2337. They describe in detail the problems caused by enabling the cache in a multithreaded environment.
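For reference, here is a minimal sketch of the failure mode those issues describe (not code from this PR; it assumes a reachable cluster at the hypothetical address `hdfs://namenode:8020`). Hadoop's default `FileSystem` cache hands every caller with the same scheme, authority, and user the same instance, so one task closing "its" filesystem breaks every other task still holding it:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SharedCachePitfall {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // FileSystem cache is enabled by default
        URI uri = URI.create("hdfs://namenode:8020"); // hypothetical cluster address

        // Same scheme/authority/user: both calls return the SAME cached instance.
        FileSystem taskA = FileSystem.get(uri, conf);
        FileSystem taskB = FileSystem.get(uri, conf);
        System.out.println(taskA == taskB); // true

        // One checkpoint task finishes and closes "its" filesystem ...
        taskA.close();

        // ... and another task, still running on a different thread, now fails
        // with "java.io.IOException: Filesystem closed" on its next access.
        taskB.exists(new Path("/checkpoints"));
    }
}
```

Disabling the cache by default, as this PR does, gives each checkpoint storage its own `FileSystem` instance, so closing one cannot invalidate another.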

Member: Makes sense to me.


```yaml
seatunnel:
  engine:
    checkpoint:
      interval: 6000
      timeout: 7000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          storage.type: hdfs
          disable.cache: false
          fs.defaultFS: hdfs:///

```

or

```yaml
seatunnel:
  engine:
    checkpoint:
      interval: 6000
      timeout: 7000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          storage.type: hdfs
          disable.cache: false
          fs.defaultFS: file:///
```
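As a side note (not part of the SeaTunnel docs above), the `disable.cache` option is wired to Hadoop's per-scheme cache switch, conventionally named `fs.<scheme>.impl.disable.cache`. A minimal sketch, using the local `file` scheme so it runs without a cluster, shows the difference the switch makes:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class CacheSwitchSketch {
    public static void main(String[] args) throws Exception {
        URI uri = URI.create("file:///"); // local scheme, no cluster needed

        Configuration cached = new Configuration(); // Hadoop's cache is on by default
        System.out.println(FileSystem.get(uri, cached) == FileSystem.get(uri, cached)); // true: shared instance

        Configuration uncached = new Configuration();
        uncached.setBoolean("fs.file.impl.disable.cache", true); // per-scheme cache bypass
        System.out.println(FileSystem.get(uri, uncached) == FileSystem.get(uri, uncached)); // false: independent instances
    }
}
```

With the cache bypassed, every `FileSystem.get()` call returns an independent instance, which is why leaving `disable.cache` at its default is the safer choice for concurrent checkpoint writers.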

36 changes: 36 additions & 0 deletions docs/zh/seatunnel-engine/checkpoint-storage.md
@@ -184,3 +184,39 @@ seatunnel:

```

### Enable cache

When `storage:type` is `hdfs`, the cache is disabled by default. If you want to enable it, set `disable.cache: false`.

```yaml
seatunnel:
  engine:
    checkpoint:
      interval: 6000
      timeout: 7000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          storage.type: hdfs
          disable.cache: false
          fs.defaultFS: hdfs:/// # Ensure that the directory has write permission
```

or

```yaml
seatunnel:
  engine:
    checkpoint:
      interval: 6000
      timeout: 7000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          storage.type: hdfs
          disable.cache: false
          fs.defaultFS: file:///
```

@@ -30,6 +30,11 @@ public abstract class AbstractConfiguration {

    protected static final String HDFS_IMPL_KEY = "impl";

    protected static final String COMMON_DISABLE_CACHE = "%s.disable.cache";

    protected static final String DISABLE_CACHE_DEFAULT_VALUE = "TRUE";

    protected static final String DISABLE_CACHE_KEY = "disable.cache";
    /**
     * check the configuration keys
     *
@@ -77,6 +77,10 @@ public Configuration buildConfiguration(Map<String, String> config)
        if (config.containsKey(HDFS_SITE_PATH)) {
            hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH)));
        }
        hadoopConf.setBoolean(
                String.format(COMMON_DISABLE_CACHE, HDFS_IMPL_KEY),
                Boolean.parseBoolean(
                        config.getOrDefault(DISABLE_CACHE_KEY, DISABLE_CACHE_DEFAULT_VALUE)));
        // support other hdfs optional config keys
        config.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
@@ -39,6 +39,11 @@ public Configuration buildConfiguration(Map<String, String> config) {
        hadoopConf.set(
                FS_DEFAULT_NAME_KEY,
                config.getOrDefault(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME_DEFAULT));
        hadoopConf.setBoolean(
                String.format(COMMON_DISABLE_CACHE, HDFS_LOCAL_IMPL_KEY),
                Boolean.parseBoolean(
                        config.getOrDefault(DISABLE_CACHE_KEY, DISABLE_CACHE_DEFAULT_VALUE)));

        return hadoopConf;
    }
}
@@ -43,6 +43,10 @@ public Configuration buildConfiguration(Map<String, String> config) {
        Configuration hadoopConf = new Configuration();
        hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(OSS_BUCKET_KEY));
        hadoopConf.set(OSS_IMPL_KEY, HDFS_OSS_IMPL);
        hadoopConf.setBoolean(
                String.format(COMMON_DISABLE_CACHE, OSS_IMPL_KEY),
                Boolean.parseBoolean(
                        config.getOrDefault(DISABLE_CACHE_KEY, DISABLE_CACHE_DEFAULT_VALUE)));
        setExtraConfiguration(hadoopConf, config, OSS_KEY);
        return hadoopConf;
    }
@@ -69,6 +69,10 @@ public Configuration buildConfiguration(Map<String, String> config) {
        Configuration hadoopConf = new Configuration();
        hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY));
        hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
        hadoopConf.setBoolean(
                String.format(COMMON_DISABLE_CACHE, formatKey(protocol, HDFS_IMPL_KEY)),
                Boolean.parseBoolean(
                        config.getOrDefault(DISABLE_CACHE_KEY, DISABLE_CACHE_DEFAULT_VALUE)));
        setExtraConfiguration(hadoopConf, config, FS_KEY + protocol + SPLIT_CHAR);
        return hadoopConf;
    }
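To make the new constants concrete, here is a rough sketch of how the `%s.disable.cache` template and the `TRUE` default combine in the configuration builders above (the variable names and the `fs.s3a.impl` example key are illustrative assumptions, not the PR's exact code): the cache ends up disabled unless the user explicitly sets `disable.cache: false`.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class DisableCacheDefaultSketch {
    public static void main(String[] args) {
        Map<String, String> pluginConfig = new HashMap<>(); // the user's plugin-config block
        // pluginConfig.put("disable.cache", "false");       // uncomment to opt back into caching

        String implKey = "fs.s3a.impl"; // assumed per-storage impl key for illustration
        String cacheKey = String.format("%s.disable.cache", implKey);

        Configuration hadoopConf = new Configuration();
        hadoopConf.setBoolean(
                cacheKey,
                Boolean.parseBoolean(pluginConfig.getOrDefault("disable.cache", "TRUE")));

        // Prints fs.s3a.impl.disable.cache=true unless disable.cache=false was supplied.
        System.out.println(cacheKey + "=" + hadoopConf.getBoolean(cacheKey, false));
    }
}
```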
@@ -36,7 +36,7 @@ public class HDFSFileCheckpointTest extends AbstractFileCheckPointTest {
    public static void setup() throws CheckpointStorageException {
        Map<String, String> config = new HashMap<>();
        config.put("storage.type", "hdfs");
        config.put("fs.defaultFS", "hdfs://usdp-bing");
        config.put("disable.cache", "false");
        config.put("seatunnel.hadoop.dfs.nameservices", "usdp-bing");
        config.put("seatunnel.hadoop.dfs.ha.namenodes.usdp-bing", "nn1,nn2");
        config.put("seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1", "usdp-bing-nn1:8020");
@@ -35,6 +35,7 @@ public class LocalFileCheckPointTest extends AbstractFileCheckPointTest {
    public static void setup() throws CheckpointStorageException {
        HashMap config = new HashMap();
        config.put("namespace", "/tmp/");
        config.put("disable.cache", "false");
        STORAGE = new HdfsStorage(config);
        initStorageData();
    }
@@ -35,6 +35,7 @@ public class OssFileCheckpointTest extends AbstractFileCheckPointTest {
    public static void setup() throws CheckpointStorageException {
        Map<String, String> config = new HashMap<>();
        config.put("storage.type", "oss");
        config.put("disable.cache", "false");
        config.put("fs.oss.accessKeyId", "your access key id");
        config.put("fs.oss.accessKeySecret", "your access key secret");
        config.put("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
@@ -36,6 +36,7 @@ public class S3FileCheckpointTest extends AbstractFileCheckPointTest {
    public static void setup() throws CheckpointStorageException {
        Map<String, String> config = new HashMap<>();
        config.put("storage.type", "s3");
        config.put("disable.cache", "false");
        config.put("fs.s3a.access.key", "your access key");
        config.put("fs.s3a.secret.key", "your secret key");
        config.put("s3.bucket", "s3a://calvin.test.cn");