Skip to content

Commit

Permalink
[AMORO-3329] Add MemorySize for parsing memory value and unit
Browse files Browse the repository at this point in the history
  • Loading branch information
jzjsnow committed Jan 17, 2025
1 parent 957e521 commit 9334783
Show file tree
Hide file tree
Showing 6 changed files with 743 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.amoro.shade.guava32.com.google.common.base.Suppliers;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.amoro.utils.MemorySize;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
Expand Down Expand Up @@ -228,9 +229,11 @@ protected String buildOptimizerStartupArgsString(Resource resource) {
properties, resourceFlinkConf, FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY);

resourceFlinkConf.putToOptions(
FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, jobManagerMemory + "m");
FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(jobManagerMemory).toString());
resourceFlinkConf.putToOptions(
FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY, taskManagerMemory + "m");
FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(taskManagerMemory).toString());
resourceFlinkConf.putToOptions(
FlinkConfKeys.YARN_APPLICATION_JOB_NAME,
String.join(
Expand Down Expand Up @@ -354,28 +357,8 @@ protected long parseMemorySize(String memoryStr) {
if (memoryStr == null || memoryStr.isEmpty()) {
return 0;
}
memoryStr = memoryStr.toLowerCase().trim().replaceAll("\\s", "");
Matcher matcher = Pattern.compile("(\\d+)([mg])").matcher(memoryStr);
if (matcher.matches()) {
long size = Long.parseLong(matcher.group(1));
String unit = matcher.group(2);
switch (unit) {
case "m":
return size;
case "g":
return size * 1024;
default:
LOG.error("Invalid memory size unit: {}, Please use m or g as the unit", unit);
return 0;
}
} else {
try {
return Long.parseLong(memoryStr);
} catch (NumberFormatException e) {
LOG.error("Invalid memory size format: {}", memoryStr);
return 0;
}
}

return MemorySize.parse(memoryStr).getMebiBytes();
}

private <T> T fetchCommandOutput(Process exec, Function<String, T> commandReader) {
Expand Down
8 changes: 8 additions & 0 deletions amoro-ams/src/main/resources/mysql/upgrade.sql
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,11 @@ CREATE TABLE `http_session` (
PRIMARY KEY(`session_id`, `context_path`, `virtual_host`),
KEY `idx_session_expiry` (`expiry_time`)
) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Http session store' ROW_FORMAT=DYNAMIC;

-- update resource group memory unit
update resource_group set properties = JSON_SET(properties, '$."flink-conf.jobmanager.memory.process.size"', CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"')), 'MB')) WHERE JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"')) REGEXP '^[0-9]+$';
update resource_group set properties = JSON_SET(properties, '$."flink-conf.taskmanager.memory.process.size"', CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"')), 'MB')) WHERE JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"')) REGEXP '^[0-9]+$';

-- update resource memory unit
update resource set properties = JSON_SET(properties, '$."flink-conf.jobmanager.memory.process.size"', CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"')), 'MB')) WHERE JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"')) REGEXP '^[0-9]+$';
update resource set properties = JSON_SET(properties, '$."flink-conf.taskmanager.memory.process.size"', CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"')), 'MB')) WHERE JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"')) REGEXP '^[0-9]+$';
36 changes: 35 additions & 1 deletion amoro-ams/src/main/resources/postgres/upgrade.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,38 @@ COMMENT ON COLUMN http_session.last_save_time IS 'Last save time';
COMMENT ON COLUMN http_session.expiry_time IS 'Expiry time';
COMMENT ON COLUMN http_session.max_interval IS 'Max internal';
COMMENT ON COLUMN http_session.data_store IS 'Session data store';
COMMENT ON TABLE http_session IS 'Http session store';
COMMENT ON TABLE http_session IS 'Http session store';

-- update resource group memory unit
UPDATE resource_group
SET properties = jsonb_set(
properties,
'{flink-conf,jobmanager,memory,process,size}',
(properties->>'flink-conf.jobmanager.memory.process.size') || 'MB'
)
WHERE (properties->>'flink-conf.jobmanager.memory.process.size') ~ '^[0-9]+$';

UPDATE resource_group
SET properties = jsonb_set(
properties,
'{flink-conf,taskmanager,memory,process,size}',
(properties->>'flink-conf.taskmanager.memory.process.size') || 'MB'
)
WHERE (properties->>'flink-conf.taskmanager.memory.process.size') ~ '^[0-9]+$';

-- update resource memory unit
UPDATE resource
SET properties = jsonb_set(
properties,
'{flink-conf,jobmanager,memory,process,size}',
(properties->>'flink-conf.jobmanager.memory.process.size') || 'MB'
)
WHERE (properties->>'flink-conf.jobmanager.memory.process.size') ~ '^[0-9]+$';

UPDATE resource
SET properties = jsonb_set(
properties,
'{flink-conf,taskmanager,memory,process,size}',
(properties->>'flink-conf.taskmanager.memory.process.size') || 'MB'
)
WHERE (properties->>'flink-conf.taskmanager.memory.process.size') ~ '^[0-9]+$';
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,36 @@ public TestFlinkOptimizerContainer() {

@Test
public void testParseMemorySize() {
Assert.assertEquals(100, container.parseMemorySize("100"));
Assert.assertEquals(0, container.parseMemorySize("100"));
Assert.assertEquals(0, container.parseMemorySize("100k"));
Assert.assertEquals(0, container.parseMemorySize("100kb"));
Assert.assertEquals(0, container.parseMemorySize("100 k"));
Assert.assertEquals(0, container.parseMemorySize("100 kb"));
Assert.assertEquals(0, container.parseMemorySize("100K"));
Assert.assertEquals(0, container.parseMemorySize("100KB"));
Assert.assertEquals(0, container.parseMemorySize("100 K"));
Assert.assertEquals(0, container.parseMemorySize("100 KB"));
Assert.assertEquals(100, container.parseMemorySize("100m"));
Assert.assertEquals(100, container.parseMemorySize("100mb"));
Assert.assertEquals(100, container.parseMemorySize("100 m"));
Assert.assertEquals(100, container.parseMemorySize("100 mb"));
Assert.assertEquals(100, container.parseMemorySize("100M"));
Assert.assertEquals(100, container.parseMemorySize("100MB"));
Assert.assertEquals(100, container.parseMemorySize("100 M"));
Assert.assertEquals(100, container.parseMemorySize("100 MB"));
Assert.assertEquals(102400, container.parseMemorySize("100g"));
Assert.assertEquals(102400, container.parseMemorySize("100gb"));
Assert.assertEquals(102400, container.parseMemorySize("100 g"));
Assert.assertEquals(102400, container.parseMemorySize("100 gb"));
Assert.assertEquals(102400, container.parseMemorySize("100G"));
Assert.assertEquals(102400, container.parseMemorySize("100GB"));
Assert.assertEquals(102400, container.parseMemorySize("100 G"));
Assert.assertEquals(0, container.parseMemorySize("G100G"));
Assert.assertEquals(0, container.parseMemorySize("100kb"));
Assert.assertEquals(102400, container.parseMemorySize("100 GB"));
try {
Assert.assertEquals(0, container.parseMemorySize("G100G"));
} catch (NumberFormatException e) {
Assert.assertEquals("text does not start with a number", e.getMessage());
}
}

@Test
Expand Down Expand Up @@ -110,11 +129,11 @@ public void testGetMemorySizeValue() {
FlinkOptimizerContainer.FlinkConf.buildFor(prop, Maps.newHashMap()).build();

Assert.assertEquals(
100L,
0,
container.getMemorySizeValue(
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
100L,
0,
container.getMemorySizeValue(
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));

Expand All @@ -124,7 +143,7 @@ public void testGetMemorySizeValue() {
conf = FlinkOptimizerContainer.FlinkConf.buildFor(prop, containerProperties).build();
prop.clear();
Assert.assertEquals(
200L,
0,
container.getMemorySizeValue(
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
Expand All @@ -143,7 +162,7 @@ public void testGetMemorySizeValue() {
container.getMemorySizeValue(
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
300L,
0,
container.getMemorySizeValue(
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));

Expand All @@ -157,5 +176,17 @@ public void testGetMemorySizeValue() {
0L,
container.getMemorySizeValue(
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));

prop.put(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, "400 MB");
prop.put(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY, "400 MB");
conf = FlinkOptimizerContainer.FlinkConf.buildFor(prop, Maps.newHashMap()).build();
Assert.assertEquals(
400L,
container.getMemorySizeValue(
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
400L,
container.getMemorySizeValue(
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
}
}
Loading

0 comments on commit 9334783

Please sign in to comment.