Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43902: [Java] Support for Long memory addresses #43903

Merged
merged 19 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
vibhatha marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,44 @@ public interface AllocationReservation extends AutoCloseable {
* @param nBytes the number of bytes to add
* @return true if the addition is possible, false otherwise
* @throws IllegalStateException if called after buffer() is used to allocate the reservation
* @deprecated use {@link #add(long)} instead
*/
@Deprecated(forRemoval = true)
boolean add(int nBytes);

/**
* Add to the current reservation.
*
* <p>Adding may fail if the allocator is not allowed to consume any more space.
*
* @param nBytes the number of bytes to add
* @return true if the addition is possible, false otherwise
* @throws IllegalStateException if called after buffer() is used to allocate the reservation
*/
boolean add(long nBytes);

/**
* Requests a reservation of additional space.
*
* <p>The implementation of the allocator's inner class provides this.
*
* @param nBytes the amount to reserve
* @return true if the reservation can be satisfied, false otherwise
* @deprecated use {@link #reserve(long)} instead
*/
@Deprecated(forRemoval = true)
boolean reserve(int nBytes);

/**
* Requests a reservation of additional space.
*
* <p>The implementation of the allocator's inner class provides this.
*
* @param nBytes the amount to reserve
* @return true if the reservation can be satisfied, false otherwise
*/
boolean reserve(long nBytes);

/**
* Allocate a buffer whose size is the total of all the add()s made.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.arrow.memory.util.AssertionUtil;
import org.apache.arrow.memory.util.CommonUtil;
import org.apache.arrow.memory.util.HistoricalLog;
import org.apache.arrow.memory.util.LargeMemoryUtil;
import org.apache.arrow.util.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyFor;
Expand Down Expand Up @@ -860,7 +861,7 @@ RoundingPolicy getRoundingPolicy() {
public class Reservation implements AllocationReservation {

private final @Nullable HistoricalLog historicalLog;
private int nBytes = 0;
private long nBytes = 0;
private boolean used = false;
private boolean closed = false;

Expand Down Expand Up @@ -888,8 +889,14 @@ public Reservation() {
}
}

@Deprecated(forRemoval = true)
@Override
public boolean add(final int nBytes) {
return add((long) nBytes);
}

@Override
public boolean add(final long nBytes) {
assertOpen();

Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes);
Expand All @@ -906,7 +913,7 @@ public boolean add(final int nBytes) {
// modifying this behavior so that we maintain what we reserve and what the user asked for
// and make sure to only
// round to power of two as necessary.
final int nBytesTwo = CommonUtil.nextPowerOfTwo(nBytes);
final long nBytesTwo = CommonUtil.nextPowerOfTwo(nBytes);
if (!reserve(nBytesTwo)) {
return false;
}
Expand All @@ -929,7 +936,7 @@ public ArrowBuf allocateBuffer() {

@Override
public int getSize() {
return nBytes;
return LargeMemoryUtil.checkedCastToInt(nBytes);
}

@Override
Expand Down Expand Up @@ -978,8 +985,14 @@ public void close() {
closed = true;
}

@Deprecated(forRemoval = true)
@Override
public boolean reserve(int nBytes) {
vibhatha marked this conversation as resolved.
Show resolved Hide resolved
return reserve((long) nBytes);
}

@Override
public boolean reserve(long nBytes) {
assertOpen();

final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes);
Expand All @@ -999,7 +1012,7 @@ public boolean reserve(int nBytes) {
* @param nBytes the size of the buffer requested
* @return the buffer, or null, if the request cannot be satisfied
*/
private ArrowBuf allocate(int nBytes) {
private ArrowBuf allocate(long nBytes) {
assertOpen();

boolean success = false;
Expand Down Expand Up @@ -1033,7 +1046,7 @@ private ArrowBuf allocate(int nBytes) {
*
* @param nBytes the size of the reservation
*/
private void releaseReservation(int nBytes) {
private void releaseReservation(long nBytes) {
assertOpen();

releaseBytes(nBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ public class DefaultRoundingPolicy implements RoundingPolicy {
*
* <p>It was copied from {@link io.netty.buffer.PooledByteBufAllocator}.
*/
private static final int MIN_PAGE_SIZE = 4096;
private static final long MIN_PAGE_SIZE = 4096;

private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
private static final long MAX_CHUNK_SIZE = Long.MAX_VALUE / 2;
vibhatha marked this conversation as resolved.
Show resolved Hide resolved
private static final long DEFAULT_CHUNK_SIZE;

static {
int defaultPageSize = Integer.getInteger("org.apache.memory.allocator.pageSize", 8192);
long defaultPageSize = Long.getLong("org.apache.memory.allocator.pageSize", 8192);
try {
validateAndCalculatePageShifts(defaultPageSize);
} catch (Throwable t) {
Expand All @@ -60,7 +60,7 @@ public class DefaultRoundingPolicy implements RoundingPolicy {
}
}

private static int validateAndCalculatePageShifts(int pageSize) {
private static long validateAndCalculatePageShifts(long pageSize) {
if (pageSize < MIN_PAGE_SIZE) {
throw new IllegalArgumentException(
"pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + ")");
Expand All @@ -71,17 +71,17 @@ private static int validateAndCalculatePageShifts(int pageSize) {
}

// Logarithm base 2. At this point we know that pageSize is a power of two.
return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize);
return Long.SIZE - 1 - Long.numberOfLeadingZeros(pageSize);
}

private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) {
private static long validateAndCalculateChunkSize(long pageSize, int maxOrder) {
if (maxOrder > 14) {
throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)");
}

// Ensure the resulting chunkSize does not overflow.
int chunkSize = pageSize;
for (int i = maxOrder; i > 0; i--) {
long chunkSize = pageSize;
for (long i = maxOrder; i > 0; i--) {
if (chunkSize > MAX_CHUNK_SIZE / 2) {
throw new IllegalArgumentException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.arrow.memory.rounding;

import org.apache.arrow.memory.util.LargeMemoryUtil;
import org.apache.arrow.util.Preconditions;

/** The rounding policy that each buffer size must a multiple of the segment size. */
Expand All @@ -28,16 +29,29 @@ public class SegmentRoundingPolicy implements RoundingPolicy {
* The segment size. It must be at least {@link SegmentRoundingPolicy#MIN_SEGMENT_SIZE}, and be a
* power of 2.
*/
private int segmentSize;
private long segmentSize;

/**
* Constructor for the segment rounding policy.
*
* @param segmentSize the segment size.
* @throws IllegalArgumentException if the segment size is smaller than {@link
* SegmentRoundingPolicy#MIN_SEGMENT_SIZE}, or is not a power of 2.
* @deprecated use {@link SegmentRoundingPolicy#SegmentRoundingPolicy(long)} instead.
*/
@Deprecated(forRemoval = true)
public SegmentRoundingPolicy(int segmentSize) {
vibhatha marked this conversation as resolved.
Show resolved Hide resolved
this((long) segmentSize);
}

/**
* Constructor for the segment rounding policy.
*
* @param segmentSize the segment size.
* @throws IllegalArgumentException if the segment size is smaller than {@link
* SegmentRoundingPolicy#MIN_SEGMENT_SIZE}, or is not a power of 2.
*/
public SegmentRoundingPolicy(long segmentSize) {
Preconditions.checkArgument(
segmentSize >= MIN_SEGMENT_SIZE,
"The segment size cannot be smaller than %s",
Expand All @@ -53,6 +67,10 @@ public long getRoundedSize(long requestSize) {
}

public int getSegmentSize() {
vibhatha marked this conversation as resolved.
Show resolved Hide resolved
return LargeMemoryUtil.checkedCastToInt(segmentSize);
}

public long getSegmentSizeAsLong() {
return segmentSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public void testRootAllocator_createChildDontClose() throws Exception {

@Test
public void testSegmentAllocator() {
RoundingPolicy policy = new SegmentRoundingPolicy(1024);
RoundingPolicy policy = new SegmentRoundingPolicy(1024L);
try (RootAllocator allocator =
new RootAllocator(AllocationListener.NOOP, 1024 * 1024, policy)) {
ArrowBuf buf = allocator.buffer(798);
Expand All @@ -334,7 +334,7 @@ public void testSegmentAllocator() {

@Test
public void testSegmentAllocator_childAllocator() {
RoundingPolicy policy = new SegmentRoundingPolicy(1024);
RoundingPolicy policy = new SegmentRoundingPolicy(1024L);
try (RootAllocator allocator = new RootAllocator(AllocationListener.NOOP, 1024 * 1024, policy);
BufferAllocator childAllocator = allocator.newChildAllocator("child", 0, 512 * 1024)) {

Expand All @@ -357,14 +357,14 @@ public void testSegmentAllocator_childAllocator() {
@Test
public void testSegmentAllocator_smallSegment() {
IllegalArgumentException e =
assertThrows(IllegalArgumentException.class, () -> new SegmentRoundingPolicy(128));
assertThrows(IllegalArgumentException.class, () -> new SegmentRoundingPolicy(128L));
assertEquals("The segment size cannot be smaller than 1024", e.getMessage());
}

@Test
public void testSegmentAllocator_segmentSizeNotPowerOf2() {
IllegalArgumentException e =
assertThrows(IllegalArgumentException.class, () -> new SegmentRoundingPolicy(4097));
assertThrows(IllegalArgumentException.class, () -> new SegmentRoundingPolicy(4097L));
assertEquals("The segment size must be a power of 2", e.getMessage());
}

Expand Down Expand Up @@ -957,7 +957,7 @@ public void testAllocator_unclaimedReservation() throws Exception {
try (final BufferAllocator childAllocator1 =
rootAllocator.newChildAllocator("unclaimedReservation", 0, MAX_ALLOCATION)) {
try (final AllocationReservation reservation = childAllocator1.newReservation()) {
assertTrue(reservation.add(64));
assertTrue(reservation.add(64L));
}
rootAllocator.verify();
}
Expand All @@ -972,8 +972,8 @@ public void testAllocator_claimedReservation() throws Exception {
rootAllocator.newChildAllocator("claimedReservation", 0, MAX_ALLOCATION)) {

try (final AllocationReservation reservation = childAllocator1.newReservation()) {
assertTrue(reservation.add(32));
assertTrue(reservation.add(32));
assertTrue(reservation.add(32L));
assertTrue(reservation.add(32L));

final ArrowBuf arrowBuf = reservation.allocateBuffer();
assertEquals(64, arrowBuf.capacity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class NettyArrowBuf extends AbstractByteBuf implements AutoCloseable {

private final ArrowBuf arrowBuf;
private final ArrowByteBufAllocator arrowByteBufAllocator;
private int length;
private long length;
private final long address;

/**
Expand All @@ -47,10 +47,24 @@ public class NettyArrowBuf extends AbstractByteBuf implements AutoCloseable {
* @param arrowBuf The buffer to wrap.
* @param bufferAllocator The allocator for the buffer.
* @param length The length of this buffer.
* @deprecated Use {@link #NettyArrowBuf(ArrowBuf, BufferAllocator, long)} instead.
*/
@Deprecated(forRemoval = true)
public NettyArrowBuf(
vibhatha marked this conversation as resolved.
Show resolved Hide resolved
final ArrowBuf arrowBuf, final BufferAllocator bufferAllocator, final int length) {
super(length);
this(arrowBuf, bufferAllocator, (long) length);
}

/**
* Constructs a new instance.
*
* @param arrowBuf The buffer to wrap.
* @param bufferAllocator The allocator for the buffer.
* @param length The length of this buffer.
*/
public NettyArrowBuf(
final ArrowBuf arrowBuf, final BufferAllocator bufferAllocator, final long length) {
super((int) length);
this.arrowBuf = arrowBuf;
this.arrowByteBufAllocator = new ArrowByteBufAllocator(bufferAllocator);
this.length = length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public void defaultAllocatorBenchmark() {
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void segmentRoundingPolicyBenchmark() {
final int bufferSize = 1024;
final long bufferSize = 1024L;
final int numBuffers = 1024;
final int segmentSize = 1024;
final long segmentSize = 1024L;

RoundingPolicy policy = new SegmentRoundingPolicy(segmentSize);
try (RootAllocator allocator =
Expand Down
Loading