[Java]Add LakeSoulLocalJavaWriter (#550)
* merge local_writer into merge_main

MR-title: add LakeSoulLocalJavaWriter
Created-by: hw_syl_zenghua
Author-id: 7155563
MR-id: 7272355
Commit-by: zenghua
Merged-by: hw_syl_zenghua
Description: merge "local_writer" into "merge_main"
add LakeSoulLocalJavaWriter

Signed-off-by: zenghua <[email protected]>

See merge request: 42b369588d84469d95d7b738fc58da8e/LakeSoul/for-nanhang!3

* fix pom

Signed-off-by: zenghua <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Co-authored-by: hw_syl_zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
3 people authored Oct 25, 2024
1 parent 199df38 commit de9c8fa
Showing 37 changed files with 2,584 additions and 71 deletions.
@@ -29,12 +29,18 @@ public abstract class DBConfig {
     public static String LAKESOUL_PARTITION_DESC_KV_DELIM = "=";
 
     public static class TableInfoProperty {
-        public static String HASH_BUCKET_NUM = "hashBucketNum";
+        public static final String HASH_BUCKET_NUM = "hashBucketNum";
 
-        public static String DROPPED_COLUMN = "droppedColumn";
+        public static final String DROPPED_COLUMN = "droppedColumn";
 
-        public static String DROPPED_COLUMN_SPLITTER = ",";
+        public static final String DROPPED_COLUMN_SPLITTER = ",";
 
-        public static String LAST_TABLE_SCHEMA_CHANGE_TIME = "last_schema_change_time";
+        public static final String LAST_TABLE_SCHEMA_CHANGE_TIME = "last_schema_change_time";
+
+        public static final String USE_CDC = "use_cdc";
+
+        public static final String CDC_CHANGE_COLUMN = "lakesoul_cdc_change_column";
+
+        public static final String CDC_CHANGE_COLUMN_DEFAULT = "rowKinds";
     }
 }
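
Note: the three new keys describe CDC-enabled tables. As a rough illustration (property values are invented, and a plain Map stands in for the JSON properties object used by the metadata layer), a CDC table's properties might be assembled like this:

import com.dmetasoul.lakesoul.meta.DBConfig;

import java.util.HashMap;
import java.util.Map;

public class CdcTablePropertiesExample {
    public static void main(String[] args) {
        // Hypothetical property set for a CDC-enabled table; values are illustrative only.
        Map<String, String> properties = new HashMap<>();
        properties.put(DBConfig.TableInfoProperty.HASH_BUCKET_NUM, "4");
        properties.put(DBConfig.TableInfoProperty.USE_CDC, "true");
        // CDC_CHANGE_COLUMN names the column carrying row kinds; the default name is "rowKinds".
        properties.put(DBConfig.TableInfoProperty.CDC_CHANGE_COLUMN,
                DBConfig.TableInfoProperty.CDC_CHANGE_COLUMN_DEFAULT);
        System.out.println(properties);
    }
}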
@@ -100,6 +100,12 @@ public TableInfo getTableInfoByNameAndNamespace(String tableName, String namespa
 
     public void createNewTable(String tableId, String namespace, String tableName, String tablePath, String tableSchema,
                                JSONObject properties, String partitions) {
+        properties.put(DBConfig.TableInfoProperty.LAST_TABLE_SCHEMA_CHANGE_TIME, String.valueOf(System.currentTimeMillis()));
+        createTable(tableId, namespace, tableName, tablePath, tableSchema, properties.toJSONString(), partitions);
+    }
+
+    public void createTable(String tableId, String namespace, String tableName, String tablePath, String tableSchema,
+                            String properties, String partitions) {
 
         TableInfo.Builder tableInfo = TableInfo.newBuilder();
         tableInfo.setTableId(tableId);
@@ -108,8 +114,7 @@ public void createNewTable(String tableId, String namespace, String tableName, S
         tableInfo.setTablePath(tablePath);
         tableInfo.setTableSchema(tableSchema);
         tableInfo.setPartitions(partitions);
-        properties.put(DBConfig.TableInfoProperty.LAST_TABLE_SCHEMA_CHANGE_TIME, String.valueOf(System.currentTimeMillis()));
-        tableInfo.setProperties(properties.toJSONString());
+        tableInfo.setProperties(properties);
 
         String domain = getNameSpaceDomain(namespace);
 
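For orientation, a hedged sketch of the new two-step flow (the owning class is not shown in this hunk; DBManager and the fastjson JSONObject are assumptions, and the schema/partition strings are placeholders): createNewTable stamps LAST_TABLE_SCHEMA_CHANGE_TIME and serializes the JSON properties, then delegates to the new createTable overload, which takes properties as a plain string.

import com.alibaba.fastjson.JSONObject;          // assumed JSON type, matching the JSONObject/toJSONString usage above
import com.dmetasoul.lakesoul.meta.DBConfig;
import com.dmetasoul.lakesoul.meta.DBManager;    // assumed owner of createNewTable/createTable

import java.util.UUID;

public class CreateTableExample {
    public static void main(String[] args) {
        DBManager dbManager = new DBManager();

        JSONObject properties = new JSONObject();
        properties.put(DBConfig.TableInfoProperty.HASH_BUCKET_NUM, "4");

        String tableId = "table_" + UUID.randomUUID();
        String tableSchema = "{...}";   // Arrow schema JSON; omitted here
        String partitions = "";         // partition spec string; format not shown in this diff

        // Adds LAST_TABLE_SCHEMA_CHANGE_TIME to properties and calls createTable(..., properties.toJSONString(), ...).
        dbManager.createNewTable(tableId, "default", "demo_table",
                "file:///tmp/lakesoul/demo_table", tableSchema, properties, partitions);
    }
}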
@@ -230,14 +230,14 @@ public Map<String, List<PendingFileRecoverable>> closeForCommitWithRecoverableMa
         if (this.batch.getRowCount() > 0) {
             this.nativeWriter.write(this.batch);
         }
-        HashMap<String, List<String>> partitionDescAndFilesMap = this.nativeWriter.flush();
-        for (Map.Entry<String, List<String>> entry : partitionDescAndFilesMap.entrySet()) {
+        HashMap<String, List<NativeIOWriter.FlushResult>> partitionDescAndFilesMap = this.nativeWriter.flush();
+        for (Map.Entry<String, List<NativeIOWriter.FlushResult>> entry : partitionDescAndFilesMap.entrySet()) {
             String key = isDynamicBucket ? entry.getKey() : bucketID;
             recoverableMap.put(
                     key,
                     entry.getValue()
                             .stream()
-                            .map(path -> new NativeParquetWriter.NativeWriterPendingFileRecoverable(path,
+                            .map(result -> new NativeParquetWriter.NativeWriterPendingFileRecoverable(result.getFilePath(),
                                     creationTime))
                             .collect(Collectors.toList())
             );
@@ -114,13 +114,13 @@ public Map<String, List<PendingFileRecoverable>> closeForCommitWithRecoverableMa
         long timer = System.currentTimeMillis();
         Map<String, List<PendingFileRecoverable>> recoverableMap = new HashMap<>();
 
-        HashMap<String, List<String>> partitionDescAndFilesMap = this.nativeWriter.flush();
-        for (Map.Entry<String, List<String>> entry : partitionDescAndFilesMap.entrySet()) {
+        HashMap<String, List<NativeIOWriter.FlushResult>> partitionDescAndFilesMap = this.nativeWriter.flush();
+        for (Map.Entry<String, List<NativeIOWriter.FlushResult>> entry : partitionDescAndFilesMap.entrySet()) {
             recoverableMap.put(
                     entry.getKey(),
                     entry.getValue()
                             .stream()
-                            .map(path -> new NativeParquetWriter.NativeWriterPendingFileRecoverable(path, creationTime))
+                            .map(result -> new NativeParquetWriter.NativeWriterPendingFileRecoverable(result.getFilePath(), creationTime))
                             .collect(Collectors.toList())
             );
         }
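Both Flink writers now receive FlushResult objects instead of bare file paths. A small hedged sketch of consuming the new return type outside Flink (the class and method names below are invented for illustration):

import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter;

import java.util.List;
import java.util.Map;

public class FlushResultInspector {
    // Iterate the per-partition results returned by NativeIOWriter.flush() and log the metadata
    // carried by each FlushResult: file path, file size, and the columns present in the file.
    public static void printFlushResults(Map<String, List<NativeIOWriter.FlushResult>> partitionDescAndFilesMap) {
        for (Map.Entry<String, List<NativeIOWriter.FlushResult>> entry : partitionDescAndFilesMap.entrySet()) {
            for (NativeIOWriter.FlushResult result : entry.getValue()) {
                System.out.printf("partition=%s file=%s size=%d cols=%s%n",
                        entry.getKey(), result.getFilePath(), result.getFileSize(), result.getFileExistCols());
            }
        }
    }
}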
@@ -417,9 +417,9 @@ abstract class DDLTestBase extends QueryTest with SQLTestUtils {
 
   test("Call Statement") {
     withTable("lakesoul_test") {
-      val call = spark.sessionState.sqlParser.parsePlan("CALL cat.system.func(c1 => 'name=name1', c2 => map('2',3), c3 => true,c4 => TIMESTAMP '2013-01-01',c5=>3L,c6=>1.0D,c7=>ARRAY(1,3))")
-      val s = call.asInstanceOf[CallStatement]
-      assert(s.args.length == 7)
+      val call = spark.sessionState.sqlParser.parsePlan("CALL cat.system.func(c1 => 'name=name1', c2 => map('2',3), c3 => true,c4 => TIMESTAMP '2013-01-01',c5=>3L,c6=>1.0D,c7=>ARRAY(1,3))")
+      val s = call.asInstanceOf[CallStatement]
+      assert(s.args.length == 7)
     }
   }
 
@@ -485,4 +485,8 @@ abstract class DDLTestBase extends QueryTest with SQLTestUtils {
     }
   }
 }
+
+  //  test("read test table") {
+  //    sql("select * from test_local_java_table where range=2").show(1000)
+  //  }
 }
4 changes: 0 additions & 4 deletions native-io/lakesoul-io-java/pom.xml
@@ -53,10 +53,6 @@ SPDX-License-Identifier: Apache-2.0
                 <groupId>io.netty</groupId>
                 <artifactId>*</artifactId>
             </exclusion>
-            <exclusion>
-                <groupId>org.slf4j</groupId>
-                <artifactId>*</artifactId>
-            </exclusion>
         </exclusions>
     </dependency>
     <dependency>
@@ -0,0 +1,32 @@
+package com.dmetasoul.lakesoul.lakesoul;
+
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LakeSoulArrowUtils {
+    public static Schema cdcColumnAlignment(Schema schema, String cdcColumn) {
+        if (cdcColumn != null) {
+            // set cdc column as the last field
+            Field cdcField = null;
+            List<Field> fields = new ArrayList<>(schema.getFields().size() + 1);
+            for (Field field : schema.getFields()) {
+                if (!field.getName().equals(cdcColumn)) {
+                    fields.add(field);
+                } else {
+                    cdcField = field;
+                }
+            }
+            if (cdcField != null) {
+                fields.add(cdcField);
+            } else {
+                throw new RuntimeException(String.format("Invalid Schema of %s, CDC Column [%s] not found", schema, cdcColumn));
+//                fields.add(new Field(cdcColumn, FieldType.notNullable(new ArrowType.Utf8()), null));
+            }
+            return new Schema(fields);
+        }
+        return schema;
+    }
+}
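
A short, hedged usage sketch of the helper above (field names and types are invented): cdcColumnAlignment moves the declared CDC column to the last position and throws if it is missing; a null cdcColumn returns the schema unchanged.

import com.dmetasoul.lakesoul.lakesoul.LakeSoulArrowUtils;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import java.util.Arrays;

public class CdcAlignmentExample {
    public static void main(String[] args) {
        // Example schema where the CDC column is declared in the middle.
        Schema schema = new Schema(Arrays.asList(
                new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null),
                new Field("rowKinds", FieldType.notNullable(new ArrowType.Utf8()), null),
                new Field("name", FieldType.nullable(new ArrowType.Utf8()), null)));

        // "rowKinds" is moved to the last position.
        Schema aligned = LakeSoulArrowUtils.cdcColumnAlignment(schema, "rowKinds");
        System.out.println(aligned.getFields());
    }
}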
@@ -4,25 +4,29 @@
 
 package com.dmetasoul.lakesoul.lakesoul.io;
 
-import jnr.ffi.Memory;
+import com.dmetasoul.lakesoul.lakesoul.LakeSoulArrowUtils;
+import com.dmetasoul.lakesoul.meta.DBConfig;
+import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.TableInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import jnr.ffi.Pointer;
 import jnr.ffi.Runtime;
 import org.apache.arrow.c.ArrowArray;
 import org.apache.arrow.c.ArrowSchema;
 import org.apache.arrow.c.Data;
 import com.dmetasoul.lakesoul.lakesoul.io.jnr.LibLakeSoulIO;
+import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static com.dmetasoul.lakesoul.meta.DBConfig.TableInfoProperty.HASH_BUCKET_NUM;
+
 public class NativeIOWriter extends NativeIOBase implements AutoCloseable {
 
@@ -33,6 +37,34 @@ public NativeIOWriter(Schema schema) {
         setSchema(schema);
     }
 
+    public NativeIOWriter(TableInfo tableInfo) {
+        super("NativeWriter");
+
+        String cdcColumn;
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            Map<String, String> properties = mapper.readValue(tableInfo.getProperties(), Map.class);
+            setHashBucketNum(Integer.parseInt(properties.get(HASH_BUCKET_NUM)));
+            cdcColumn = properties.get(DBConfig.TableInfoProperty.CDC_CHANGE_COLUMN);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+        try {
+            Schema schema = Schema.fromJSON(tableInfo.getTableSchema());
+            setSchema(LakeSoulArrowUtils.cdcColumnAlignment(schema, cdcColumn));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
+        setPrimaryKeys(partitionKeys.primaryKeys);
+        setRangePartitions(partitionKeys.rangeKeys);
+        useDynamicPartition(true);
+
+        withPrefix(tableInfo.getTablePath());
+    }
+
 
     public void setAuxSortColumns(Iterable<String> auxSortColumns) {
         for (String col : auxSortColumns) {
@@ -100,7 +132,6 @@ public int writeIpc(byte[] encodedBatch) throws IOException {
     }
 
     public void write(VectorSchemaRoot batch) throws IOException {
-        System.out.println("writing batch: " + batch.getRowCount());
         ArrowArray array = ArrowArray.allocateNew(allocator);
         ArrowSchema schema = ArrowSchema.allocateNew(allocator);
         Data.exportVectorSchemaRoot(allocator, batch, provider, array, schema);
@@ -112,7 +143,39 @@ public void write(VectorSchemaRoot batch) throws IOException {
         }
     }
 
-    public HashMap<String, List<String>> flush() throws IOException {
+    public static class FlushResult {
+        final String filePath;
+        final Long fileSize;
+
+        final String fileExistCols;
+
+        FlushResult(String filePath, Long fileSize, String fileExistCols) {
+            this.filePath = filePath;
+            this.fileSize = fileSize;
+            this.fileExistCols = fileExistCols;
+        }
+
+        public Long getFileSize() {
+            return fileSize;
+        }
+
+        public String getFilePath() {
+            return filePath;
+        }
+
+        public String getFileExistCols() {
+            return fileExistCols;
+        }
+    }
+
+    public static FlushResult decodeFlushResult(String encoded) {
+        String[] fields = encoded.split("\u0003");
+
+        Preconditions.checkArgument(fields.length == 3);
+        return new FlushResult(fields[0], Long.parseLong(fields[1]), fields[2]);
+    }
+
+    public HashMap<String, List<FlushResult>> flush() throws IOException {
         AtomicReference<String> errMsg = new AtomicReference<>();
         AtomicReference<Integer> lenResult = new AtomicReference<>();
         IntegerCallback nativeIntegerCallback = new IntegerCallback((len, err) -> {
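
As a quick hedged illustration of the decoder above (the encoded string is fabricated; real values come from the native flush call), fields are joined by the \u0003 control character in the order file path, file size, existing columns:

import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter;

public class DecodeFlushResultExample {
    public static void main(String[] args) {
        // Fabricated encoded string: file path \u0003 file size \u0003 existing columns.
        String encoded = "file:///tmp/lakesoul/demo/part-00000.parquet" + "\u0003" + "10240" + "\u0003" + "id,name,rowKinds";
        NativeIOWriter.FlushResult result = NativeIOWriter.decodeFlushResult(encoded);
        System.out.println(result.getFilePath() + " / " + result.getFileSize() + " / " + result.getFileExistCols());
    }
}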
@@ -158,11 +221,12 @@ public HashMap<String, List<String>> flush() throws IOException {
             if (partitionNum != splits.length - 1) {
                 throw new IOException("Dynamic Partitions Result [" + decodedResult + "] encode error: partition number mismatch " + partitionNum + "!=" + (splits.length - 1));
             }
-            HashMap<String, List<String>> partitionDescAndFilesMap = new HashMap<>();
+            HashMap<String, List<FlushResult>> partitionDescAndFilesMap = new HashMap<>();
             for (int i = 1; i < splits.length; i++) {
                 String[] partitionDescAndFiles = splits[i].split("\u0002");
                 List<String> list = new ArrayList<>(Arrays.asList(partitionDescAndFiles).subList(1, partitionDescAndFiles.length));
-                partitionDescAndFilesMap.put(partitionDescAndFiles[0], list);
+                List<FlushResult> result = list.stream().map(NativeIOWriter::decodeFlushResult).collect(Collectors.toList());
+                partitionDescAndFilesMap.put(partitionDescAndFiles[0], result);
 
             }
             return partitionDescAndFilesMap;
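Putting the pieces together, a hedged end-to-end sketch of local writing (the TableInfo contents and the initializeWriter() call are assumptions based on the existing native writers, not shown in this diff): construct the writer from table metadata, write an Arrow batch, then flush and inspect the per-partition results.

import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import org.apache.arrow.vector.VectorSchemaRoot;

import java.util.List;
import java.util.Map;

public class LocalWriteExample {
    public static void writeBatch(TableInfo tableInfo, VectorSchemaRoot batch) throws Exception {
        // The TableInfo-based constructor reads the hash bucket number and CDC column from the
        // table properties, aligns the CDC column last, and enables dynamic partitioning.
        try (NativeIOWriter writer = new NativeIOWriter(tableInfo)) {
            writer.initializeWriter();   // assumed to be required, as with the existing native writers
            writer.write(batch);

            // flush() now reports FlushResult objects per partition description.
            Map<String, List<NativeIOWriter.FlushResult>> flushed = writer.flush();
            flushed.forEach((partition, files) ->
                    files.forEach(f -> System.out.println(partition + " -> " + f.getFilePath())));
        }
    }
}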