Skip to content

Commit

Permalink
fix up
Browse files Browse the repository at this point in the history
  • Loading branch information
czy006 committed Dec 10, 2024
1 parent eb0882f commit e90424b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

package org.apache.amoro.utils.map;

import org.apache.amoro.serialization.JavaSerializer;
import org.apache.amoro.serialization.ResourceSerde;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.utils.SerializationUtil;

import javax.annotation.Nullable;

Expand All @@ -42,9 +43,9 @@ public class SimpleSpillableMap<K, T> implements SimpleMap<K, T> {
private long estimatedPayloadSize = 0;
private int putCount = 0;

private final SerializationUtil.SimpleSerializer<K> keySerializer;
private final ResourceSerde<K> keySerializer;

private final SerializationUtil.SimpleSerializer<T> valueSerializer;
private final ResourceSerde<T> valueSerializer;

protected SimpleSpillableMap(
Long maxInMemorySizeInBytes,
Expand All @@ -54,17 +55,17 @@ protected SimpleSpillableMap(
this(
maxInMemorySizeInBytes,
backendBaseDir,
SerializationUtil.JavaSerializer.INSTANT,
SerializationUtil.JavaSerializer.INSTANT,
JavaSerializer.INSTANT,
JavaSerializer.INSTANT,
keySizeEstimator,
valueSizeEstimator);
}

protected SimpleSpillableMap(
Long maxInMemorySizeInBytes,
@Nullable String backendBaseDir,
SerializationUtil.SimpleSerializer<K> keySerializer,
SerializationUtil.SimpleSerializer<T> valueSerializer,
ResourceSerde<K> keySerializer,
ResourceSerde<T> valueSerializer,
SizeEstimator<K> keySizeEstimator,
SizeEstimator<T> valueSizeEstimator) {
this.memoryMap = Maps.newHashMap();
Expand Down Expand Up @@ -153,13 +154,13 @@ protected class SimpleSpilledMap<K, T> implements SimpleMap<K, T> {

private final String columnFamily = UUID.randomUUID().toString();

private final SerializationUtil.SimpleSerializer<K> keySerializer;
private final ResourceSerde<K> keySerializer;

private final SerializationUtil.SimpleSerializer<T> valueSerializer;
private final ResourceSerde<T> valueSerializer;

public SimpleSpilledMap(
SerializationUtil.SimpleSerializer<K> keySerializer,
SerializationUtil.SimpleSerializer<T> valueSerializer,
ResourceSerde<K> keySerializer,
ResourceSerde<T> valueSerializer,
@Nullable String backendBaseDir) {
rocksDB = RocksDBBackend.getOrCreateInstance(backendBaseDir);
rocksDB.addColumnFamily(columnFamily);
Expand All @@ -168,19 +169,24 @@ public SimpleSpilledMap(
}

public boolean containsKey(K key) {
return rocksDB.get(columnFamily, keySerializer.serialize(key)) != null;
return rocksDB.get(columnFamily, keySerializer.serializeResource(key)) != null;
}

public T get(K key) {
return valueSerializer.deserialize(rocksDB.get(columnFamily, keySerializer.serialize(key)));
return valueSerializer
.deserializeResource(rocksDB.get(columnFamily, keySerializer.serializeResource(key)))
.getResource();
}

public void put(K key, T value) {
rocksDB.put(columnFamily, keySerializer.serialize(key), valueSerializer.serialize(value));
rocksDB.put(
columnFamily,
keySerializer.serializeResource(key),
valueSerializer.serializeResource(value));
}

public void delete(K key) {
rocksDB.delete(columnFamily, keySerializer.serialize(key));
rocksDB.delete(columnFamily, keySerializer.serializeResource(key));
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

import static org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;

import org.apache.amoro.serialization.ResourceSerde;
import org.apache.amoro.utils.SerializationUtil;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;

public class StructLikeWrapperSerializer
implements SerializationUtil.SimpleSerializer<StructLikeWrapper> {
public class StructLikeWrapperSerializer implements ResourceSerde<StructLikeWrapper> {

protected final StructLikeWrapper structLikeWrapper;

Expand All @@ -39,19 +39,19 @@ public StructLikeWrapperSerializer(Types.StructType type) {
}

@Override
public byte[] serialize(StructLikeWrapper structLikeWrapper) {
public byte[] serializeResource(StructLikeWrapper structLikeWrapper) {
checkNotNull(structLikeWrapper);
StructLike copy = StructLikeCopy.copy(structLikeWrapper.get());
return SerializationUtil.kryoSerialize(copy);
}

@Override
public StructLikeWrapper deserialize(byte[] bytes) {
public DeserializedResource<StructLikeWrapper> deserializeResource(byte[] bytes) {
if (bytes == null) {
return null;
}
StructLikeCopy structLike = SerializationUtil.kryoDeserialize(bytes);
return structLikeWrapper.copyFor(structLike);
return new DeserializedResource<>(structLikeWrapper.copyFor(structLike), false);
}

public static class StructLikeCopy implements StructLike {
Expand Down

0 comments on commit e90424b

Please sign in to comment.