Skip to content

Commit

Permalink
enable prefetch. support s3 host style access
Browse files Browse the repository at this point in the history
Signed-off-by: chenxu <[email protected]>
  • Loading branch information
dmetasoul01 committed Dec 27, 2023
1 parent f5e075e commit 1a44831
Show file tree
Hide file tree
Showing 8 changed files with 406 additions and 294 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,13 @@ public static void setFSConfigs(Configuration conf, NativeIOBase io) {
setFSConf(conf, "fs.s3a.secret.key", "fs.s3a.secret.key", io);
setFSConf(conf, "fs.s3a.endpoint", "fs.s3a.endpoint", io);
setFSConf(conf, "fs.s3a.endpoint.region", "fs.s3a.endpoint.region", io);
setFSConf(conf, "fs.s3a.path.style.access", "fs.s3a.path.style.access", io);
// try flink's s3 credential configs
setFSConf(conf, "s3.access-key", "fs.s3a.access.key", io);
setFSConf(conf, "s3.secret-key", "fs.s3a.secret.key", io);
setFSConf(conf, "s3.endpoint", "fs.s3a.endpoint", io);
setFSConf(conf, "s3.endpoint.region", "fs.s3a.endpoint.region", io);
setFSConf(conf, "s3.path.style.access", "fs.s3a.path.style.access", io);
}

public static void setFSConf(Configuration conf, String confKey, String fsConfKey, NativeIOBase io) {
Expand All @@ -443,7 +446,6 @@ public static void setFSConf(Configuration conf, String confKey, String fsConfKe
}
}


public static Object convertStringToInternalValue(String valStr, LogicalType type) {
if (valStr == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ private LakeSoulConfig(Map<String, String> config){
this.endpoint = config.get("fs.s3a.endpoint");
this.defaultFS = config.get("fs.defaultFS");
this.user = config.get("fs.hdfs.user");
this.virtualPathStyle = Boolean.parseBoolean(config.getOrDefault("fs.s3a.path.style.access", "false"));
this.timeZone = config.getOrDefault("timezone","");
}

Expand All @@ -40,6 +41,7 @@ private LakeSoulConfig(Map<String, String> config){
private String user;
private String defaultFS;
private String timeZone;
private boolean virtualPathStyle;


public String getAccessKey() {
Expand Down Expand Up @@ -106,4 +108,11 @@ public void setTimeZone(String timeZone) {
this.timeZone = timeZone;
}

public boolean isVirtualPathStyle() {
return virtualPathStyle;
}

public void setVirtualPathStyle(boolean virtualPathStyle) {
this.virtualPathStyle = virtualPathStyle;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public LakeSoulRecordCursor(LakeSoulRecordSet recordSet) throws IOException {
LakeSoulConfig.getInstance().getBucketName(),
LakeSoulConfig.getInstance().getEndpoint(),
LakeSoulConfig.getInstance().getDefaultFS(),
LakeSoulConfig.getInstance().getUser()
LakeSoulConfig.getInstance().getUser(),
LakeSoulConfig.getInstance().isVirtualPathStyle()
);

// init reader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class NativeIOOptions(val s3Bucket: String,
val s3Endpoint: String,
val s3Region: String,
val fsUser: String,
val defaultFS: String
val defaultFS: String,
val virtual_path_style: Boolean
)

object NativeIOUtils{
Expand Down Expand Up @@ -69,11 +70,12 @@ object NativeIOUtils{
val s3aRegion = taskAttemptContext.getConfiguration.get("fs.s3a.endpoint.region")
val s3aAccessKey = taskAttemptContext.getConfiguration.get("fs.s3a.access.key")
val s3aSecretKey = taskAttemptContext.getConfiguration.get("fs.s3a.secret.key")
return new NativeIOOptions(awsS3Bucket, s3aAccessKey, s3aSecretKey, s3aEndpoint, s3aRegion, user, defaultFS)
val virtualPathStyle = taskAttemptContext.getConfiguration.getBoolean("fs.s3a.path.style.access", false)
return new NativeIOOptions(awsS3Bucket, s3aAccessKey, s3aSecretKey, s3aEndpoint, s3aRegion, user, defaultFS, virtualPathStyle)
case _ =>
}
}
new NativeIOOptions(null, null, null, null, null, user, defaultFS)
new NativeIOOptions(null, null, null, null, null, user, defaultFS, false)
}

def setNativeIOOptions(nativeIO: NativeIOBase, options: NativeIOOptions): Unit = {
Expand All @@ -84,7 +86,8 @@ object NativeIOUtils{
options.s3Bucket,
options.s3Endpoint,
options.fsUser,
options.defaultFS
options.defaultFS,
options.virtual_path_style
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,16 @@ public void setBufferSize(int bufferSize) {

public void setObjectStoreOptions(String accessKey, String accessSecret,
String region, String bucketName, String endpoint,
String user, String defaultFS) {
String user, String defaultFS,
boolean virtual_path_style) {
setObjectStoreOption("fs.s3a.access.key", accessKey);
setObjectStoreOption("fs.s3a.secret.key", accessSecret);
setObjectStoreOption("fs.s3a.endpoint.region", region);
setObjectStoreOption("fs.s3a.bucket", bucketName);
setObjectStoreOption("fs.s3a.endpoint", endpoint);
setObjectStoreOption("fs.defaultFS", defaultFS);
setObjectStoreOption("fs.hdfs.user", user);
setObjectStoreOption("fs.s3a.path.style.access", String.valueOf(virtual_path_style));
}

public void setObjectStoreOption(String key, String value) {
Expand Down
Loading

0 comments on commit 1a44831

Please sign in to comment.