diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index f9148ba8699fd..51e181811c228 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -86,6 +87,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null; public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS = null; public static final Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null; + public static final String EXTRA_CONFIG_PREFIX = "managedLedgerOffloadExtraConfig"; public static final String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE = "managedLedgerOffloadAutoTriggerSizeThresholdBytes"; @@ -121,8 +123,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY; @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) - private Map managedLedgerExtraConfigurations = null; - + private Map managedLedgerExtraConfigurations = new HashMap<>(); // s3 config, set by service configuration or cli @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) @@ -248,8 +249,7 @@ public static OffloadPoliciesImpl create(String driver, String region, String bu public static OffloadPoliciesImpl create(Properties properties) { OffloadPoliciesImpl data = new OffloadPoliciesImpl(); - Field[] fields = OffloadPoliciesImpl.class.getDeclaredFields(); - Arrays.stream(fields).forEach(f -> { + for (Field f : CONFIGURATION_FIELDS) { if (properties.containsKey(f.getName())) { try { f.setAccessible(true); @@ -260,14 +260,15 @@ public static OffloadPoliciesImpl create(Properties properties) { f.getName(), properties.get(f.getName())), e); } } - }); + } + Map extraConfigurations = properties.entrySet().stream() - .filter(entry -> entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig")) - .collect(Collectors.toMap( - entry -> entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""), - entry -> entry.getValue().toString())); + .filter(entry -> entry.getKey().toString().startsWith(EXTRA_CONFIG_PREFIX)) + .collect(Collectors.toMap( + entry -> entry.getKey().toString().replaceFirst(EXTRA_CONFIG_PREFIX, ""), + entry -> entry.getValue().toString())); - data.setManagedLedgerExtraConfigurations(extraConfigurations); + data.getManagedLedgerExtraConfigurations().putAll(extraConfigurations); data.compatibleWithBrokerConfigFile(properties); return data; @@ -346,66 +347,21 @@ public boolean bucketValid() { public Properties toProperties() { Properties properties = new Properties(); - setProperty(properties, "managedLedgerOffloadedReadPriority", this.getManagedLedgerOffloadedReadPriority()); - setProperty(properties, "offloadersDirectory", this.getOffloadersDirectory()); - setProperty(properties, "managedLedgerOffloadDriver", this.getManagedLedgerOffloadDriver()); - setProperty(properties, "managedLedgerOffloadMaxThreads", - this.getManagedLedgerOffloadMaxThreads()); - setProperty(properties, "managedLedgerOffloadPrefetchRounds", - this.getManagedLedgerOffloadPrefetchRounds()); - setProperty(properties, "managedLedgerOffloadThresholdInBytes", - this.getManagedLedgerOffloadThresholdInBytes()); - setProperty(properties, "managedLedgerOffloadThresholdInSeconds", - this.getManagedLedgerOffloadThresholdInSeconds()); - setProperty(properties, "managedLedgerOffloadDeletionLagInMillis", - this.getManagedLedgerOffloadDeletionLagInMillis()); - setProperty(properties, "managedLedgerOffloadExtraConfigurations", - this.getManagedLedgerExtraConfigurations()); - - if (this.isS3Driver()) { - setProperty(properties, "s3ManagedLedgerOffloadRegion", - this.getS3ManagedLedgerOffloadRegion()); - setProperty(properties, "s3ManagedLedgerOffloadBucket", - this.getS3ManagedLedgerOffloadBucket()); - setProperty(properties, "s3ManagedLedgerOffloadServiceEndpoint", - this.getS3ManagedLedgerOffloadServiceEndpoint()); - setProperty(properties, "s3ManagedLedgerOffloadMaxBlockSizeInBytes", - this.getS3ManagedLedgerOffloadMaxBlockSizeInBytes()); - setProperty(properties, "s3ManagedLedgerOffloadCredentialId", - this.getS3ManagedLedgerOffloadCredentialId()); - setProperty(properties, "s3ManagedLedgerOffloadCredentialSecret", - this.getS3ManagedLedgerOffloadCredentialSecret()); - setProperty(properties, "s3ManagedLedgerOffloadRole", - this.getS3ManagedLedgerOffloadRole()); - setProperty(properties, "s3ManagedLedgerOffloadRoleSessionName", - this.getS3ManagedLedgerOffloadRoleSessionName()); - setProperty(properties, "s3ManagedLedgerOffloadReadBufferSizeInBytes", - this.getS3ManagedLedgerOffloadReadBufferSizeInBytes()); - } else if (this.isGcsDriver()) { - setProperty(properties, "gcsManagedLedgerOffloadRegion", - this.getGcsManagedLedgerOffloadRegion()); - setProperty(properties, "gcsManagedLedgerOffloadBucket", - this.getGcsManagedLedgerOffloadBucket()); - setProperty(properties, "gcsManagedLedgerOffloadMaxBlockSizeInBytes", - this.getGcsManagedLedgerOffloadMaxBlockSizeInBytes()); - setProperty(properties, "gcsManagedLedgerOffloadReadBufferSizeInBytes", - this.getGcsManagedLedgerOffloadReadBufferSizeInBytes()); - setProperty(properties, "gcsManagedLedgerOffloadServiceAccountKeyFile", - this.getGcsManagedLedgerOffloadServiceAccountKeyFile()); - } else if (this.isFileSystemDriver()) { - setProperty(properties, "fileSystemProfilePath", this.getFileSystemProfilePath()); - setProperty(properties, "fileSystemURI", this.getFileSystemURI()); - } - - setProperty(properties, "managedLedgerOffloadBucket", this.getManagedLedgerOffloadBucket()); - setProperty(properties, "managedLedgerOffloadRegion", this.getManagedLedgerOffloadRegion()); - setProperty(properties, "managedLedgerOffloadServiceEndpoint", - this.getManagedLedgerOffloadServiceEndpoint()); - setProperty(properties, "managedLedgerOffloadMaxBlockSizeInBytes", - this.getManagedLedgerOffloadMaxBlockSizeInBytes()); - setProperty(properties, "managedLedgerOffloadReadBufferSizeInBytes", - this.getManagedLedgerOffloadReadBufferSizeInBytes()); - + for (Field f : CONFIGURATION_FIELDS) { + try { + f.setAccessible(true); + if ("managedLedgerExtraConfigurations".equals(f.getName())) { + Map extraConfig = (Map) f.get(this); + extraConfig.forEach((key, value) -> { + setProperty(properties, EXTRA_CONFIG_PREFIX + key, value); + }); + } else { + setProperty(properties, f.getName(), f.get(this)); + } + } catch (Exception e) { + throw new IllegalArgumentException("An error occurred while processing the field: " + f.getName(), e); + } + } return properties; } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java index d79d2c32ffa7f..bbede4e982044 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.common.policies.data; +import static org.apache.pulsar.common.policies.data.OffloadPoliciesImpl.EXTRA_CONFIG_PREFIX; +import static org.testng.Assert.assertEquals; import java.io.DataInputStream; import java.io.File; import java.io.IOException; @@ -26,6 +28,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import org.testng.Assert; @@ -436,8 +439,8 @@ private byte[] loadClassData(String name) throws IOException { @Test public void testCreateOffloadPoliciesWithExtraConfiguration() { Properties properties = new Properties(); - properties.put("managedLedgerOffloadExtraConfigKey1", "value1"); - properties.put("managedLedgerOffloadExtraConfigKey2", "value2"); + properties.put(EXTRA_CONFIG_PREFIX + "Key1", "value1"); + properties.put(EXTRA_CONFIG_PREFIX + "Key2", "value2"); OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties); Map extraConfigurations = policies.getManagedLedgerExtraConfigurations(); @@ -445,4 +448,28 @@ public void testCreateOffloadPoliciesWithExtraConfiguration() { Assert.assertEquals(extraConfigurations.get("Key1"), "value1"); Assert.assertEquals(extraConfigurations.get("Key2"), "value2"); } + + /** + * Test toProperties as well as create from properties. + * @throws Exception + */ + @Test + public void testToProperties() throws Exception { + // Base information convert. + OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket", + "http://test.endpoint", null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024, + 10 * 1024 * 1024L, 100L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST); + assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties())); + + // Set useless config to offload policies. Make sure convert conversion result is the same. + offloadPolicies.setFileSystemProfilePath("/test/file"); + assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties())); + + // Set extra config to offload policies. Make sure convert conversion result is the same. + Map extraConfiguration = new HashMap<>(); + extraConfiguration.put("key1", "value1"); + extraConfiguration.put("key2", "value2"); + offloadPolicies.setManagedLedgerExtraConfigurations(extraConfiguration); + assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties())); + } }