Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[feat] [broker] PIP-188 Fix cluster migration state store into local …
Browse files Browse the repository at this point in the history
…metadatastore (apache#21359)

Co-authored-by: Rajan Dhabalia <[email protected]>
  • Loading branch information
rdhabalia and Rajan Dhabalia authored Oct 25, 2023
1 parent 3ebb855 commit 04d1225
Show file tree
Hide file tree
Showing 18 changed files with 380 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import lombok.Getter;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
Expand All @@ -39,10 +40,19 @@ public class ClusterResources extends BaseResources<ClusterData> {

@Getter
private FailureDomainResources failureDomainResources;

public ClusterResources(MetadataStore store, int operationTimeoutSec) {
super(store, ClusterData.class, operationTimeoutSec);
this.failureDomainResources = new FailureDomainResources(store, FailureDomainImpl.class, operationTimeoutSec);
@Getter
private ClusterPoliciesResources clusterPoliciesResources;

public ClusterResources(MetadataStore localStore, MetadataStore configurationStore, int operationTimeoutSec) {
super(configurationStore, ClusterData.class, operationTimeoutSec);
this.failureDomainResources = new FailureDomainResources(configurationStore, FailureDomainImpl.class,
operationTimeoutSec);
if (localStore != null) {
this.clusterPoliciesResources = new ClusterPoliciesResources(localStore, ClusterPoliciesImpl.class,
operationTimeoutSec);
} else {
this.clusterPoliciesResources = null;
}
}

public CompletableFuture<Set<String>> listAsync() {
Expand Down Expand Up @@ -216,4 +226,26 @@ public void registerListener(Consumer<Notification> listener) {
});
}
}

public static class ClusterPoliciesResources extends BaseResources<ClusterPoliciesImpl> {
public static final String LOCAL_POLICIES_PATH = "policies";

public ClusterPoliciesResources(MetadataStore store, Class<ClusterPoliciesImpl> clazz,
int operationTimeoutSec) {
super(store, clazz, operationTimeoutSec);
}

public Optional<ClusterPoliciesImpl> getClusterPolicies(String clusterName) throws MetadataStoreException {
return get(joinPath(BASE_CLUSTERS_PATH, clusterName, LOCAL_POLICIES_PATH));
}

public CompletableFuture<Optional<ClusterPoliciesImpl>> getClusterPoliciesAsync(String clusterName) {
return getAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, LOCAL_POLICIES_PATH));
}

public CompletableFuture<Void> setPoliciesWithCreateAsync(String clusterName,
Function<Optional<ClusterPoliciesImpl>, ClusterPoliciesImpl> createFunction) {
return setWithCreateAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, LOCAL_POLICIES_PATH), createFunction);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura
int operationTimeoutSec) {
if (configurationMetadataStore != null) {
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
clusterResources = new ClusterResources(configurationMetadataStore, operationTimeoutSec);
clusterResources = new ClusterResources(localMetadataStore, configurationMetadataStore,
operationTimeoutSec);
namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.ClusterPolicies;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
Expand Down Expand Up @@ -247,6 +249,41 @@ public void updateCluster(
});
}

@GET
@Path("/{cluster}/migrate")
@ApiOperation(
value = "Get the cluster migration configuration for the specified cluster.",
response = ClusterDataImpl.class,
notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Return the cluster data.", response = ClusterDataImpl.class),
@ApiResponse(code = 403, message = "Don't have admin permission."),
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public ClusterPolicies getClusterMigration(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) {
validateSuperUserAccess();

try {
return clusterResources().getClusterPoliciesResources().getClusterPolicies(cluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
} catch (Exception e) {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e);
if (e instanceof RestException) {
throw (RestException) e;
} else {
throw new RestException(e);
}
}
}

@POST
@Path("/{cluster}/migrate")
@ApiOperation(
Expand Down Expand Up @@ -286,8 +323,9 @@ public void updateClusterMigration(
}
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> clusterResources().updateClusterAsync(cluster, old -> {
ClusterDataImpl data = (ClusterDataImpl) old;
.thenCompose(__ -> clusterResources().getClusterPoliciesResources().setPoliciesWithCreateAsync(cluster,
old -> {
ClusterPoliciesImpl data = old.orElse(new ClusterPoliciesImpl());
data.setMigrated(isMigrated);
data.setMigratedClusterUrl(clusterUrl);
return data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
Expand Down Expand Up @@ -1358,14 +1358,29 @@ public static CompletableFuture<Boolean> isClusterMigrationEnabled(PulsarService
}

public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync(PulsarService pulsar,
String topic) {
return pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName())
String topic) {
CompletableFuture<Optional<ClusterUrl>> result = new CompletableFuture<>();
pulsar.getPulsarResources().getClusterResources().getClusterPoliciesResources()
.getClusterPoliciesAsync(pulsar.getConfig().getClusterName())
.thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
((clusterData, isNamespaceMigrationEnabled)
-> ((clusterData.isPresent() && clusterData.get().isMigrated())
|| isNamespaceMigrationEnabled)
? Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
: Optional.empty()));
((clusterData, isNamespaceMigrationEnabled) -> {
Optional<ClusterUrl> url = ((clusterData.isPresent() && clusterData.get().isMigrated())
|| isNamespaceMigrationEnabled)
? Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
: Optional.empty();
return url;
}))
.thenAccept(res -> {
// cluster policies future is completed by metadata-store thread and continuing further
// processing in the same metadata store can cause deadlock while creating topic as
// create topic path may have blocking call on metadata-store. so, complete future on a
// separate thread to avoid deadlock.
pulsar.getExecutor().execute(() -> result.complete(res));
}).exceptionally(ex -> {
pulsar.getExecutor().execute(() -> result.completeExceptionally(ex.getCause()));
return null;
});
return result;
}

private static CompletableFuture<Boolean> isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
Expand Down
Loading

0 comments on commit 04d1225

Please sign in to comment.