diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java index 314f137893..123a9dd12c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java @@ -29,6 +29,7 @@ import org.apache.amoro.shade.guava32.com.google.common.base.Suppliers; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.amoro.utils.MemorySize; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.client.program.rest.RestClusterClient; @@ -228,9 +229,11 @@ protected String buildOptimizerStartupArgsString(Resource resource) { properties, resourceFlinkConf, FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY); resourceFlinkConf.putToOptions( - FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, jobManagerMemory + "m"); + FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, + MemorySize.ofMebiBytes(jobManagerMemory).toString()); resourceFlinkConf.putToOptions( - FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY, taskManagerMemory + "m"); + FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY, + MemorySize.ofMebiBytes(taskManagerMemory).toString()); resourceFlinkConf.putToOptions( FlinkConfKeys.YARN_APPLICATION_JOB_NAME, String.join( @@ -354,28 +357,8 @@ protected long parseMemorySize(String memoryStr) { if (memoryStr == null || memoryStr.isEmpty()) { return 0; } - memoryStr = memoryStr.toLowerCase().trim().replaceAll("\\s", ""); - Matcher matcher = Pattern.compile("(\\d+)([mg])").matcher(memoryStr); - if (matcher.matches()) { - long size = Long.parseLong(matcher.group(1)); - String unit = matcher.group(2); - switch (unit) { - case "m": - return size; - case "g": - return size * 1024; - default: - LOG.error("Invalid memory size unit: {}, Please use m or g as the unit", unit); - return 0; - } - } else { - try { - return Long.parseLong(memoryStr); - } catch (NumberFormatException e) { - LOG.error("Invalid memory size format: {}", memoryStr); - return 0; - } - } + + return MemorySize.parse(memoryStr).getMebiBytes(); } private T fetchCommandOutput(Process exec, Function commandReader) { diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql b/amoro-ams/src/main/resources/mysql/upgrade.sql index f632c01c3d..774a9b0b56 100644 --- a/amoro-ams/src/main/resources/mysql/upgrade.sql +++ b/amoro-ams/src/main/resources/mysql/upgrade.sql @@ -67,3 +67,11 @@ CREATE TABLE `http_session` ( PRIMARY KEY(`session_id`, `context_path`, `virtual_host`), KEY `idx_session_expiry` (`expiry_time`) ) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Http session store' ROW_FORMAT=DYNAMIC; + +-- update resource group memory unit +update resource_group set properties = JSON_SET(properties, '$."flink-conf.jobmanager.memory.process.size"', CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"')), 'MB')) WHERE JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"')) REGEXP '^[0-9]+$'; +update resource_group set properties = JSON_SET(properties, '$."flink-conf.taskmanager.memory.process.size"', CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"')), 'MB')) WHERE JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"')) REGEXP '^[0-9]+$'; + +-- update resource memory unit +update resource set properties = JSON_SET(properties, '$."flink-conf.jobmanager.memory.process.size"', CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"')), 'MB')) WHERE JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"')) REGEXP '^[0-9]+$'; +update resource set properties = JSON_SET(properties, '$."flink-conf.taskmanager.memory.process.size"', CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"')), 'MB')) WHERE JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"')) REGEXP '^[0-9]+$'; diff --git a/amoro-ams/src/main/resources/postgres/upgrade.sql b/amoro-ams/src/main/resources/postgres/upgrade.sql index 225cdbd49e..5d43874221 100644 --- a/amoro-ams/src/main/resources/postgres/upgrade.sql +++ b/amoro-ams/src/main/resources/postgres/upgrade.sql @@ -84,4 +84,38 @@ COMMENT ON COLUMN http_session.last_save_time IS 'Last save time'; COMMENT ON COLUMN http_session.expiry_time IS 'Expiry time'; COMMENT ON COLUMN http_session.max_interval IS 'Max internal'; COMMENT ON COLUMN http_session.data_store IS 'Session data store'; -COMMENT ON TABLE http_session IS 'Http session store'; \ No newline at end of file +COMMENT ON TABLE http_session IS 'Http session store'; + + -- update resource group memory unit +UPDATE resource_group +SET properties = jsonb_set( + properties, + '{flink-conf,jobmanager,memory,process,size}', + (properties->>'flink-conf.jobmanager.memory.process.size') || 'MB' + ) +WHERE (properties->>'flink-conf.jobmanager.memory.process.size') ~ '^[0-9]+$'; + +UPDATE resource_group +SET properties = jsonb_set( + properties, + '{flink-conf,taskmanager,memory,process,size}', + (properties->>'flink-conf.taskmanager.memory.process.size') || 'MB' + ) +WHERE (properties->>'flink-conf.taskmanager.memory.process.size') ~ '^[0-9]+$'; + +-- update resource memory unit +UPDATE resource +SET properties = jsonb_set( + properties, + '{flink-conf,jobmanager,memory,process,size}', + (properties->>'flink-conf.jobmanager.memory.process.size') || 'MB' + ) +WHERE (properties->>'flink-conf.jobmanager.memory.process.size') ~ '^[0-9]+$'; + +UPDATE resource +SET properties = jsonb_set( + properties, + '{flink-conf,taskmanager,memory,process,size}', + (properties->>'flink-conf.taskmanager.memory.process.size') || 'MB' + ) +WHERE (properties->>'flink-conf.taskmanager.memory.process.size') ~ '^[0-9]+$'; \ No newline at end of file diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java index 0862968b49..7e0e03d4a1 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java @@ -42,17 +42,36 @@ public TestFlinkOptimizerContainer() { @Test public void testParseMemorySize() { - Assert.assertEquals(100, container.parseMemorySize("100")); + Assert.assertEquals(0, container.parseMemorySize("100")); + Assert.assertEquals(0, container.parseMemorySize("100k")); + Assert.assertEquals(0, container.parseMemorySize("100kb")); + Assert.assertEquals(0, container.parseMemorySize("100 k")); + Assert.assertEquals(0, container.parseMemorySize("100 kb")); + Assert.assertEquals(0, container.parseMemorySize("100K")); + Assert.assertEquals(0, container.parseMemorySize("100KB")); + Assert.assertEquals(0, container.parseMemorySize("100 K")); + Assert.assertEquals(0, container.parseMemorySize("100 KB")); Assert.assertEquals(100, container.parseMemorySize("100m")); + Assert.assertEquals(100, container.parseMemorySize("100mb")); Assert.assertEquals(100, container.parseMemorySize("100 m")); + Assert.assertEquals(100, container.parseMemorySize("100 mb")); Assert.assertEquals(100, container.parseMemorySize("100M")); + Assert.assertEquals(100, container.parseMemorySize("100MB")); Assert.assertEquals(100, container.parseMemorySize("100 M")); + Assert.assertEquals(100, container.parseMemorySize("100 MB")); Assert.assertEquals(102400, container.parseMemorySize("100g")); + Assert.assertEquals(102400, container.parseMemorySize("100gb")); Assert.assertEquals(102400, container.parseMemorySize("100 g")); + Assert.assertEquals(102400, container.parseMemorySize("100 gb")); Assert.assertEquals(102400, container.parseMemorySize("100G")); + Assert.assertEquals(102400, container.parseMemorySize("100GB")); Assert.assertEquals(102400, container.parseMemorySize("100 G")); - Assert.assertEquals(0, container.parseMemorySize("G100G")); - Assert.assertEquals(0, container.parseMemorySize("100kb")); + Assert.assertEquals(102400, container.parseMemorySize("100 GB")); + try { + Assert.assertEquals(0, container.parseMemorySize("G100G")); + } catch (NumberFormatException e) { + Assert.assertEquals("text does not start with a number", e.getMessage()); + } } @Test @@ -110,11 +129,11 @@ public void testGetMemorySizeValue() { FlinkOptimizerContainer.FlinkConf.buildFor(prop, Maps.newHashMap()).build(); Assert.assertEquals( - 100L, + 0, container.getMemorySizeValue( prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY)); Assert.assertEquals( - 100L, + 0, container.getMemorySizeValue( prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY)); @@ -124,7 +143,7 @@ public void testGetMemorySizeValue() { conf = FlinkOptimizerContainer.FlinkConf.buildFor(prop, containerProperties).build(); prop.clear(); Assert.assertEquals( - 200L, + 0, container.getMemorySizeValue( prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY)); Assert.assertEquals( @@ -143,7 +162,7 @@ public void testGetMemorySizeValue() { container.getMemorySizeValue( prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY)); Assert.assertEquals( - 300L, + 0, container.getMemorySizeValue( prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY)); @@ -157,5 +176,17 @@ public void testGetMemorySizeValue() { 0L, container.getMemorySizeValue( prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY)); + + prop.put(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, "400 MB"); + prop.put(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY, "400 MB"); + conf = FlinkOptimizerContainer.FlinkConf.buildFor(prop, Maps.newHashMap()).build(); + Assert.assertEquals( + 400L, + container.getMemorySizeValue( + prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY)); + Assert.assertEquals( + 400L, + container.getMemorySizeValue( + prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY)); } } diff --git a/amoro-common/src/main/java/org/apache/amoro/utils/MemorySize.java b/amoro-common/src/main/java/org/apache/amoro/utils/MemorySize.java new file mode 100644 index 0000000000..dcfb4583c5 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/utils/MemorySize.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.utils; + +import static org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkArgument; +import static org.apache.amoro.utils.MemorySize.MemoryUnit.BYTES; +import static org.apache.amoro.utils.MemorySize.MemoryUnit.GIGA_BYTES; +import static org.apache.amoro.utils.MemorySize.MemoryUnit.KILO_BYTES; +import static org.apache.amoro.utils.MemorySize.MemoryUnit.MEGA_BYTES; +import static org.apache.amoro.utils.MemorySize.MemoryUnit.TERA_BYTES; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.stream.IntStream; + +/** + * MemorySize is a representation of a number of bytes, viewable in different units. Copied from + * Apache Flink. + * + *

Parsing

+ * + *

The size can be parsed from a text expression. If the expression is a pure number, the value + * will be interpreted as bytes. + */ +public class MemorySize implements java.io.Serializable, Comparable { + + private static final long serialVersionUID = 1L; + public static final MemorySize ZERO = new MemorySize(0L); + + public static final MemorySize MAX_VALUE = new MemorySize(Long.MAX_VALUE); + private static final List ORDERED_UNITS = + Arrays.asList(BYTES, KILO_BYTES, MEGA_BYTES, GIGA_BYTES, TERA_BYTES); + + // ------------------------------------------------------------------------ + + /** The memory size, in bytes. */ + private final long bytes; + + /** The memorized value returned by toString(). */ + private transient String stringified; + + /** The memorized value returned by toHumanReadableString(). */ + private transient String humanReadableStr; + + /** + * Constructs a new MemorySize. + * + * @param bytes The size, in bytes. Must be zero or larger. + */ + public MemorySize(long bytes) { + checkArgument(bytes >= 0, "bytes must be >= 0"); + this.bytes = bytes; + } + + public static MemorySize ofMebiBytes(long mebiBytes) { + return new MemorySize(mebiBytes << 20); + } + + // ------------------------------------------------------------------------ + + /** Gets the memory size in bytes. */ + public long getBytes() { + return bytes; + } + + /** Gets the memory size in Kibibytes (= 1024 bytes). */ + public long getKibiBytes() { + return bytes >> 10; + } + + /** Gets the memory size in Mebibytes (= 1024 Kibibytes). */ + public int getMebiBytes() { + return (int) (bytes >> 20); + } + + /** Gets the memory size in Gibibytes (= 1024 Mebibytes). */ + public long getGibiBytes() { + return bytes >> 30; + } + + /** Gets the memory size in Tebibytes (= 1024 Gibibytes). */ + public long getTebiBytes() { + return bytes >> 40; + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + return (int) (bytes ^ (bytes >>> 32)); + } + + @Override + public boolean equals(Object obj) { + return obj == this + || (obj != null + && obj.getClass() == this.getClass() + && ((MemorySize) obj).bytes == this.bytes); + } + + @Override + public String toString() { + if (stringified == null) { + stringified = formatToString(); + } + + return stringified; + } + + private String formatToString() { + MemoryUnit highestIntegerUnit = + IntStream.range(0, ORDERED_UNITS.size()) + .sequential() + .filter(idx -> bytes % ORDERED_UNITS.get(idx).getMultiplier() != 0) + .boxed() + .findFirst() + .map( + idx -> { + if (idx == 0) { + return ORDERED_UNITS.get(0); + } else { + return ORDERED_UNITS.get(idx - 1); + } + }) + .orElse(BYTES); + + return String.format( + "%d %s", bytes / highestIntegerUnit.getMultiplier(), highestIntegerUnit.getUnits()[1]); + } + + public String toHumanReadableString() { + if (humanReadableStr == null) { + humanReadableStr = formatToHumanReadableString(); + } + + return humanReadableStr; + } + + private String formatToHumanReadableString() { + MemoryUnit highestUnit = + IntStream.range(0, ORDERED_UNITS.size()) + .sequential() + .filter(idx -> bytes > ORDERED_UNITS.get(idx).getMultiplier()) + .boxed() + .max(Comparator.naturalOrder()) + .map(ORDERED_UNITS::get) + .orElse(BYTES); + + if (highestUnit == BYTES) { + return String.format("%d %s", bytes, BYTES.getUnits()[1]); + } else { + double approximate = 1.0 * bytes / highestUnit.getMultiplier(); + return String.format( + Locale.ROOT, "%.3f%s (%d bytes)", approximate, highestUnit.getUnits()[1], bytes); + } + } + + @Override + public int compareTo(MemorySize that) { + return Long.compare(this.bytes, that.bytes); + } + + // ------------------------------------------------------------------------ + // Calculations + // ------------------------------------------------------------------------ + + public MemorySize add(MemorySize that) { + return new MemorySize(Math.addExact(this.bytes, that.bytes)); + } + + public MemorySize subtract(MemorySize that) { + return new MemorySize(Math.subtractExact(this.bytes, that.bytes)); + } + + public MemorySize multiply(double multiplier) { + checkArgument(multiplier >= 0, "multiplier must be >= 0"); + + BigDecimal product = BigDecimal.valueOf(this.bytes).multiply(BigDecimal.valueOf(multiplier)); + if (product.compareTo(BigDecimal.valueOf(Long.MAX_VALUE)) > 0) { + throw new ArithmeticException("long overflow"); + } + return new MemorySize(product.longValue()); + } + + public MemorySize divide(long by) { + checkArgument(by >= 0, "divisor must be >= 0"); + return new MemorySize(bytes / by); + } + + // ------------------------------------------------------------------------ + // Parsing + // ------------------------------------------------------------------------ + + /** + * Parses the given string as as MemorySize. + * + * @param text The string to parse + * @return The parsed MemorySize + * @throws IllegalArgumentException Thrown, if the expression cannot be parsed. + */ + public static MemorySize parse(String text) throws IllegalArgumentException { + return new MemorySize(parseBytes(text)); + } + + /** + * Parses the given string with a default unit. + * + * @param text The string to parse. + * @param defaultUnit specify the default unit. + * @return The parsed MemorySize. + * @throws IllegalArgumentException Thrown, if the expression cannot be parsed. + */ + public static MemorySize parse(String text, MemoryUnit defaultUnit) + throws IllegalArgumentException { + if (!MemoryUnit.hasUnit(text)) { + return parse(text + defaultUnit.getUnits()[0]); + } + + return parse(text); + } + + /** + * Parses the given string as bytes. The supported expressions are listed under {@link + * MemorySize}. + * + * @param text The string to parse + * @return The parsed size, in bytes. + * @throws IllegalArgumentException Thrown, if the expression cannot be parsed. + */ + public static long parseBytes(String text) throws IllegalArgumentException { + checkArgument(text != null, "text can't be null"); + + final String trimmed = text.trim(); + checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string"); + + final int len = trimmed.length(); + int pos = 0; + + char current; + while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') { + pos++; + } + + final String number = trimmed.substring(0, pos); + final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US); + + if (number.isEmpty()) { + throw new NumberFormatException("text does not start with a number"); + } + + final long value; + try { + value = Long.parseLong(number); // this throws a NumberFormatException on overflow + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "The value '" + + number + + "' cannot be re represented as 64bit number (numeric overflow)."); + } + + final long multiplier = parseUnit(unit).map(MemoryUnit::getMultiplier).orElse(1L); + final long result = value * multiplier; + + // check for overflow + if (result / multiplier != value) { + throw new IllegalArgumentException( + "The value '" + + text + + "' cannot be re represented as 64bit number of bytes (numeric overflow)."); + } + + return result; + } + + private static Optional parseUnit(String unit) { + if (matchesAny(unit, BYTES)) { + return Optional.of(BYTES); + } else if (matchesAny(unit, KILO_BYTES)) { + return Optional.of(KILO_BYTES); + } else if (matchesAny(unit, MEGA_BYTES)) { + return Optional.of(MEGA_BYTES); + } else if (matchesAny(unit, GIGA_BYTES)) { + return Optional.of(GIGA_BYTES); + } else if (matchesAny(unit, TERA_BYTES)) { + return Optional.of(TERA_BYTES); + } else if (!unit.isEmpty()) { + throw new IllegalArgumentException( + "Memory size unit '" + + unit + + "' does not match any of the recognized units: " + + MemoryUnit.getAllUnits()); + } + + return Optional.empty(); + } + + private static boolean matchesAny(String str, MemoryUnit unit) { + for (String s : unit.getUnits()) { + if (s.equals(str)) { + return true; + } + } + return false; + } + + /** + * Enum which defines memory unit, mostly used to parse value from configuration file. + * + *

To make larger values more compact, the common size suffixes are supported: + * + *

+ */ + public enum MemoryUnit { + BYTES(new String[] {"b", "bytes"}, 1L), + KILO_BYTES(new String[] {"k", "kb", "kibibytes"}, 1024L), + MEGA_BYTES(new String[] {"m", "mb", "mebibytes"}, 1024L * 1024L), + GIGA_BYTES(new String[] {"g", "gb", "gibibytes"}, 1024L * 1024L * 1024L), + TERA_BYTES(new String[] {"t", "tb", "tebibytes"}, 1024L * 1024L * 1024L * 1024L); + + private final String[] units; + + private final long multiplier; + + MemoryUnit(String[] units, long multiplier) { + this.units = units; + this.multiplier = multiplier; + } + + public String[] getUnits() { + return units; + } + + public long getMultiplier() { + return multiplier; + } + + public static String getAllUnits() { + return concatenateUnits( + BYTES.getUnits(), + KILO_BYTES.getUnits(), + MEGA_BYTES.getUnits(), + GIGA_BYTES.getUnits(), + TERA_BYTES.getUnits()); + } + + public static boolean hasUnit(String text) { + checkArgument(text != null, "text can't be null"); + + final String trimmed = text.trim(); + checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string"); + + final int len = trimmed.length(); + int pos = 0; + + char current; + while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') { + pos++; + } + + final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US); + + return unit.length() > 0; + } + + private static String concatenateUnits(final String[]... allUnits) { + final StringBuilder builder = new StringBuilder(128); + + for (String[] units : allUnits) { + builder.append('('); + + for (String unit : units) { + builder.append(unit); + builder.append(" | "); + } + + builder.setLength(builder.length() - 3); + builder.append(") / "); + } + + builder.setLength(builder.length() - 3); + return builder.toString(); + } + } +} diff --git a/amoro-common/src/test/java/org/apache/amoro/utils/MemorySizeTest.java b/amoro-common/src/test/java/org/apache/amoro/utils/MemorySizeTest.java new file mode 100644 index 0000000000..7838e831cb --- /dev/null +++ b/amoro-common/src/test/java/org/apache/amoro/utils/MemorySizeTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.utils; + +import static org.apache.amoro.utils.MemorySize.MemoryUnit.MEGA_BYTES; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import org.junit.Test; + +/** Tests for the {@link MemorySize} class. */ +public class MemorySizeTest { + + @Test + public void testUnitConversion() { + final MemorySize zero = MemorySize.ZERO; + assertEquals(0, zero.getBytes()); + assertEquals(0, zero.getKibiBytes()); + assertEquals(0, zero.getMebiBytes()); + assertEquals(0, zero.getGibiBytes()); + assertEquals(0, zero.getTebiBytes()); + + final MemorySize bytes = new MemorySize(955); + assertEquals(955, bytes.getBytes()); + assertEquals(0, bytes.getKibiBytes()); + assertEquals(0, bytes.getMebiBytes()); + assertEquals(0, bytes.getGibiBytes()); + assertEquals(0, bytes.getTebiBytes()); + + final MemorySize kilos = new MemorySize(18500); + assertEquals(18500, kilos.getBytes()); + assertEquals(18, kilos.getKibiBytes()); + assertEquals(0, kilos.getMebiBytes()); + assertEquals(0, kilos.getGibiBytes()); + assertEquals(0, kilos.getTebiBytes()); + + final MemorySize megas = new MemorySize(15 * 1024 * 1024); + assertEquals(15_728_640, megas.getBytes()); + assertEquals(15_360, megas.getKibiBytes()); + assertEquals(15, megas.getMebiBytes()); + assertEquals(0, megas.getGibiBytes()); + assertEquals(0, megas.getTebiBytes()); + + final MemorySize teras = new MemorySize(2L * 1024 * 1024 * 1024 * 1024 + 10); + assertEquals(2199023255562L, teras.getBytes()); + assertEquals(2147483648L, teras.getKibiBytes()); + assertEquals(2097152, teras.getMebiBytes()); + assertEquals(2048, teras.getGibiBytes()); + assertEquals(2, teras.getTebiBytes()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalid() { + new MemorySize(-1); + } + + @Test + public void testParseBytes() { + assertEquals(1234, MemorySize.parseBytes("1234")); + assertEquals(1234, MemorySize.parseBytes("1234b")); + assertEquals(1234, MemorySize.parseBytes("1234 b")); + assertEquals(1234, MemorySize.parseBytes("1234bytes")); + assertEquals(1234, MemorySize.parseBytes("1234 bytes")); + } + + @Test + public void testParseKibiBytes() { + assertEquals(667766, MemorySize.parse("667766k").getKibiBytes()); + assertEquals(667766, MemorySize.parse("667766 k").getKibiBytes()); + assertEquals(667766, MemorySize.parse("667766kb").getKibiBytes()); + assertEquals(667766, MemorySize.parse("667766 kb").getKibiBytes()); + assertEquals(667766, MemorySize.parse("667766kibibytes").getKibiBytes()); + assertEquals(667766, MemorySize.parse("667766 kibibytes").getKibiBytes()); + } + + @Test + public void testParseMebiBytes() { + assertEquals(7657623, MemorySize.parse("7657623m").getMebiBytes()); + assertEquals(7657623, MemorySize.parse("7657623 m").getMebiBytes()); + assertEquals(7657623, MemorySize.parse("7657623mb").getMebiBytes()); + assertEquals(7657623, MemorySize.parse("7657623 mb").getMebiBytes()); + assertEquals(7657623, MemorySize.parse("7657623mebibytes").getMebiBytes()); + assertEquals(7657623, MemorySize.parse("7657623 mebibytes").getMebiBytes()); + } + + @Test + public void testParseGibiBytes() { + assertEquals(987654, MemorySize.parse("987654g").getGibiBytes()); + assertEquals(987654, MemorySize.parse("987654 g").getGibiBytes()); + assertEquals(987654, MemorySize.parse("987654gb").getGibiBytes()); + assertEquals(987654, MemorySize.parse("987654 gb").getGibiBytes()); + assertEquals(987654, MemorySize.parse("987654gibibytes").getGibiBytes()); + assertEquals(987654, MemorySize.parse("987654 gibibytes").getGibiBytes()); + } + + @Test + public void testParseTebiBytes() { + assertEquals(1234567, MemorySize.parse("1234567t").getTebiBytes()); + assertEquals(1234567, MemorySize.parse("1234567 t").getTebiBytes()); + assertEquals(1234567, MemorySize.parse("1234567tb").getTebiBytes()); + assertEquals(1234567, MemorySize.parse("1234567 tb").getTebiBytes()); + assertEquals(1234567, MemorySize.parse("1234567tebibytes").getTebiBytes()); + assertEquals(1234567, MemorySize.parse("1234567 tebibytes").getTebiBytes()); + } + + @Test + public void testUpperCase() { + assertEquals(1L, MemorySize.parse("1 B").getBytes()); + assertEquals(1L, MemorySize.parse("1 K").getKibiBytes()); + assertEquals(1L, MemorySize.parse("1 M").getMebiBytes()); + assertEquals(1L, MemorySize.parse("1 G").getGibiBytes()); + assertEquals(1L, MemorySize.parse("1 T").getTebiBytes()); + } + + @Test + public void testTrimBeforeParse() { + assertEquals(155L, MemorySize.parseBytes(" 155 ")); + assertEquals(155L, MemorySize.parseBytes(" 155 bytes ")); + } + + @Test + public void testParseInvalid() { + // null + try { + MemorySize.parseBytes(null); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + + // empty + try { + MemorySize.parseBytes(""); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + + // blank + try { + MemorySize.parseBytes(" "); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + + // no number + try { + MemorySize.parseBytes("foobar or fubar or foo bazz"); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + + // wrong unit + try { + MemorySize.parseBytes("16 gjah"); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + + // multiple numbers + try { + MemorySize.parseBytes("16 16 17 18 bytes"); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + + // negative number + try { + MemorySize.parseBytes("-100 bytes"); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + } + + @Test(expected = IllegalArgumentException.class) + public void testParseNumberOverflow() { + MemorySize.parseBytes("100000000000000000000000000000000 bytes"); + } + + @Test(expected = IllegalArgumentException.class) + public void testParseNumberTimeUnitOverflow() { + MemorySize.parseBytes("100000000000000 tb"); + } + + @Test + public void testParseWithDefaultUnit() { + assertEquals(7, MemorySize.parse("7", MEGA_BYTES).getMebiBytes()); + assertNotEquals(7, MemorySize.parse("7340032", MEGA_BYTES)); + assertEquals(7, MemorySize.parse("7m", MEGA_BYTES).getMebiBytes()); + assertEquals(7168, MemorySize.parse("7", MEGA_BYTES).getKibiBytes()); + assertEquals(7168, MemorySize.parse("7m", MEGA_BYTES).getKibiBytes()); + assertEquals(7, MemorySize.parse("7 m", MEGA_BYTES).getMebiBytes()); + assertEquals(7, MemorySize.parse("7mb", MEGA_BYTES).getMebiBytes()); + assertEquals(7, MemorySize.parse("7 mb", MEGA_BYTES).getMebiBytes()); + assertEquals(7, MemorySize.parse("7mebibytes", MEGA_BYTES).getMebiBytes()); + assertEquals(7, MemorySize.parse("7 mebibytes", MEGA_BYTES).getMebiBytes()); + } + + @Test + public void testDivideByLong() { + final MemorySize memory = new MemorySize(100L); + assertThat(memory.divide(23), is(new MemorySize(4L))); + } + + @Test(expected = IllegalArgumentException.class) + public void testDivideByNegativeLong() { + final MemorySize memory = new MemorySize(100L); + memory.divide(-23L); + } + + @Test + public void testToHumanReadableString() { + assertThat(new MemorySize(0L).toHumanReadableString(), is("0 bytes")); + assertThat(new MemorySize(1L).toHumanReadableString(), is("1 bytes")); + assertThat(new MemorySize(1024L).toHumanReadableString(), is("1024 bytes")); + assertThat(new MemorySize(1025L).toHumanReadableString(), is("1.001kb (1025 bytes)")); + assertThat(new MemorySize(1536L).toHumanReadableString(), is("1.500kb (1536 bytes)")); + assertThat(new MemorySize(1_000_000L).toHumanReadableString(), is("976.563kb (1000000 bytes)")); + assertThat( + new MemorySize(1_000_000_000L).toHumanReadableString(), is("953.674mb (1000000000 bytes)")); + assertThat( + new MemorySize(1_000_000_000_000L).toHumanReadableString(), + is("931.323gb (1000000000000 bytes)")); + assertThat( + new MemorySize(1_000_000_000_000_000L).toHumanReadableString(), + is("909.495tb (1000000000000000 bytes)")); + } +}