This repository has been archived by the owner on Apr 1, 2024. It is now read-only.
forked from apache/pulsar
-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[improve][pip] PIP-312: Use StateStoreProvider to manage state in Pul…
…sar Functions endpoints (apache#21438)
- Loading branch information
1 parent
403faa4
commit cbdb19c
Showing
1 changed file
with
150 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
# PIP-312: Use StateStoreProvider to manage state in Pulsar Functions endpoints | ||
|
||
# Background knowledge | ||
|
||
States are key-value pairs, where a key is a string and its value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. | ||
Keys are scoped to an individual function and shared between instances of that function. | ||
|
||
Pulsar Functions use `StateStoreProvider` to initialize a `StateStore` to manage state, so it can support multiple state storage backend, such as: | ||
- `BKStateStoreProviderImpl`: use Apache BookKeeper as the backend | ||
- `PulsarMetadataStateStoreProviderImpl`: use Pulsar Metadata as the backend | ||
|
||
Users can also implement their own `StateStoreProvider` to support other state storage backend. | ||
|
||
The Broker also exposes two endpoints to put and query a state key of a function: | ||
- GET /{tenant}/{namespace}/{functionName}/state/{key} | ||
- POST /{tenant}/{namespace}/{functionName}/state/{key} | ||
|
||
Although Pulsar Function supports multiple state storage backend, these two endpoints are still using BookKeeper's `StorageAdminClient` directly to put and query state, | ||
this makes the Pulsar Functions' state store highly coupled with Apache BookKeeper. | ||
|
||
See: [code](https://github.com/apache/pulsar/blob/1a66b640c3cd86bfca75dc9ab37bfdb37427a13f/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java#L1152-L1297) | ||
|
||
# Motivation | ||
|
||
This proposal aims to decouple Pulsar Functions' state store from Apache BookKeeper, so it can support other state storage backend. | ||
|
||
# Goals | ||
|
||
## In Scope | ||
|
||
- Pulsar Functions can use other state storage backend other than Apache BookKeeper. | ||
|
||
## Out of Scope | ||
|
||
None | ||
|
||
# High Level Design | ||
|
||
- Replace the `StorageAdminClient` in `ComponentImpl` with `StateStoreProvider` to manage state. | ||
- Add a `cleanup` method to the `StateStoreProvider` interface | ||
|
||
# Detailed Design | ||
|
||
## Design & Implementation Details | ||
|
||
1. In the `ComponentImpl#getFunctionState` and `ComponentImpl#queryState` methods, replace the `StorageAdminClient` with `StateStoreProvider`: | ||
|
||
```java | ||
String tableNs = getStateNamespace(tenant, namespace); | ||
String tableName = functionName; | ||
|
||
String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl(); | ||
|
||
if (storageClient.get() == null) { | ||
storageClient.compareAndSet(null, StorageClientBuilder.newBuilder() | ||
.withSettings(StorageClientSettings.newBuilder() | ||
.serviceUri(stateStorageServiceUrl) | ||
.clientName("functions-admin") | ||
.build()) | ||
.withNamespace(tableNs) | ||
.build()); | ||
} | ||
... | ||
``` | ||
|
||
Replaced to: | ||
|
||
```java | ||
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, name); | ||
``` | ||
|
||
2. Add a `cleanup` method to the `StateStoreProvider` interface: | ||
|
||
```java | ||
default void cleanUp(String tenant, String namespace, String name) throws Exception; | ||
``` | ||
|
||
Because when delete a function, the related state store should also be deleted. | ||
Currently, it's also using BookKeeper's `StorageAdminClient` to delete the state store table: | ||
|
||
```java | ||
deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName); | ||
|
||
|
||
private void deleteStatestoreTableAsync(String namespace, String table) { | ||
StorageAdminClient adminClient = worker().getStateStoreAdminClient(); | ||
if (adminClient != null) { | ||
adminClient.deleteStream(namespace, table).whenComplete((res, throwable) -> { | ||
if ((throwable == null && res) | ||
|| ((throwable instanceof NamespaceNotFoundException | ||
|| throwable instanceof StreamNotFoundException))) { | ||
log.info("{}/{} table deleted successfully", namespace, table); | ||
} else { | ||
if (throwable != null) { | ||
log.error("{}/{} table deletion failed {} but moving on", namespace, table, throwable); | ||
} else { | ||
log.error("{}/{} table deletion failed but moving on", namespace, table); | ||
} | ||
} | ||
}); | ||
} | ||
} | ||
``` | ||
|
||
So this proposal will add a `cleanup` method to the `StateStoreProvider` and call it after a function is deleted: | ||
|
||
```java | ||
worker().getStateStoreProvider().cleanUp(tenant, namespace, hashName); | ||
``` | ||
|
||
3. Add a new `init` method to `StateStoreProvider` interface: | ||
|
||
The current `init` method requires a `FunctionDetails` parameter, but we cannot get the `FunctionDetails` in the `ComponentImpl` class, | ||
and this parameter is not used either in `BKStateStoreProviderImpl` or in `PulsarMetadataStateStoreProviderImpl`, | ||
but for backward compatibility, instead of updating the `init` method, this proposal will add a new `init` method without `FunctionDetails` parameter: | ||
|
||
```java | ||
default void init(Map<String, Object> config) throws Exception {} | ||
``` | ||
|
||
## Public-facing Changes | ||
|
||
None | ||
|
||
# Monitoring | ||
|
||
# Security Considerations | ||
|
||
# Backward & Forward Compatibility | ||
|
||
## Revert | ||
|
||
- Nothing needs to be done if users use the Apache BookKeeper as the state storage backend. | ||
- If users use another state storage backend, they need to change it back to BookKeeper. | ||
|
||
## Upgrade | ||
|
||
Nothing needs to be done. | ||
|
||
# Alternatives | ||
|
||
# General Notes | ||
|
||
# Links | ||
|
||
<!-- | ||
Updated afterwards | ||
--> | ||
* Mailing List discussion thread: https://lists.apache.org/thread/0rz29wotonmdck76pdscwbqo19t3rbds | ||
* Mailing List voting thread: https://lists.apache.org/thread/t8vmyxovrrb5xl8jvrp1om50l6nprdjt |