Commit

Merge branch 'main' into redis_valkey_docs_misc
rukai authored Nov 27, 2024
2 parents 23a43af + 71d1c7f commit 2137431
Showing 2 changed files with 60 additions and 48 deletions.
80 changes: 40 additions & 40 deletions docs/src/transforms.md
@@ -39,12 +39,12 @@ Future transforms won't be added to the public API while in alpha. But in these
| [Protect](#protect) || Alpha |
| [QueryCounter](#querycounter) || Alpha |
| [QueryTypeFilter](#querytypefilter) || Alpha |
| [RedisCache](#rediscache) || Alpha |
| [RedisClusterPortsRewrite](#redisclusterportsrewrite) || Beta |
| [RedisSinkCluster](#redissinkcluster) || Beta |
| [RedisSinkSingle](#redissinksingle) || Beta |
| [ValkeyCache](#valkeyCache) || Alpha |
| [ValkeyClusterPortsRewrite](#valkeyclusterportsrewrite) || Beta |
| [ValkeySinkCluster](#valkeysinkcluster) || Beta |
| [ValkeySinkSingle](#valkeysinksingle) || Beta |
| [Tee](#tee) || Alpha |
| [RequestThrottling](#requestthrottling) | | Alpha |
| [RequestThrottling](#requestthrottling) | | Alpha |

### CassandraSinkCluster

@@ -217,8 +217,8 @@ This transform will drop any messages it receives and return the supplied respon

```yaml
- DebugReturner
# return a Redis response
Redis: "42"
# return a Valkey response
Valkey: "42"
# To intentionally fail, use this variant
# Fail
@@ -400,7 +400,7 @@ If we have a parallelism of 3 then we would have 3 instances of the chain: C1, C
chain:
- QueryCounter:
name: "DR chain"
- RedisSinkSingle:
- ValkeySinkSingle:
remote_address: "127.0.0.1:6379"
connect_timeout_ms: 3000
```
@@ -480,45 +480,45 @@ This transform will drop messages that match the specified filter. You can eithe
# DenyList: [Write, ReadWrite, SchemaChange, PubSubMessage]
```

### RedisCache
### ValkeyCache

This transform will attempt to cache values for a given primary key in a Redis hash set. It is a primarily implemented as a read behind cache. It currently expects an SQL based AST to figure out what to cache (e.g. CQL, PGSQL) and updates to the cache and the backing datastore are performed sequentially.
This transform will attempt to cache values for a given primary key in a Valkey hash set. It is a primarily implemented as a read behind cache. It currently expects an SQL based AST to figure out what to cache (e.g. CQL, PGSQL) and updates to the cache and the backing datastore are performed sequentially.

```yaml
- RedisCache:
- ValkeyCache:
caching_schema:
test:
partition_key: [test]
range_key: [test]
chain:
# The chain can contain anything but must end in a Redis sink
- RedisSinkSingle:
# The IP address and port of the upstream redis node/service.
# The chain can contain anything but must end in a Valkey sink
- ValkeySinkSingle:
# The IP address and port of the upstream valkey node/service.
remote_address: "127.0.0.1:6379"
connect_timeout_ms: 3000
```

### RedisClusterPortsRewrite
### ValkeyClusterPortsRewrite

This transform should be used with the `RedisSinkCluster` transform. It will write over the ports of the nodes returned by `CLUSTER SLOTS` or `CLUSTER NODES` with a user supplied value (typically the port that Shotover is listening on so cluster aware Redis drivers will direct traffic through Shotover instead of the nodes themselves).
This transform should be used with the `ValkeySinkCluster` transform. It will write over the ports of the nodes returned by `CLUSTER SLOTS` or `CLUSTER NODES` with a user supplied value (typically the port that Shotover is listening on so cluster aware Valkey drivers will direct traffic through Shotover instead of the nodes themselves).

```yaml
- RedisClusterPortsRewrite:
- ValkeyClusterPortsRewrite:
# rewrite the ports returned by `CLUSTER SLOTS` and `CLUSTER NODES` to use this port.
new_port: 6380
```
### RedisSinkCluster
### ValkeySinkCluster
This transform is a full featured Redis driver that will connect to a Redis cluster and handle all discovery, sharding and routing operations.
This transform is a full featured Valkey driver that will connect to a Valkey cluster and handle all discovery, sharding and routing operations.
```yaml
- RedisSinkCluster:
# A list of IP address and ports of the upstream redis nodes/services.
- ValkeySinkCluster:
# A list of IP address and ports of the upstream valkey nodes/services.
first_contact_points: ["127.0.0.1:2220", "127.0.0.1:2221", "127.0.0.1:2222", "127.0.0.1:2223", "127.0.0.1:2224", "127.0.0.1:2225"]

# By default RedisSinkCluster will attempt to emulate a single non-clustered redis node by completely hiding the fact that redis is a cluster.
# By default ValkeySinkCluster will attempt to emulate a single non-clustered valkey node by completely hiding the fact that valkey is a cluster.
# However, when this field is provided, this cluster hiding is disabled.
# Instead other nodes in the cluster will only be accessed when performing a command that accesses a slot.
# All other commands will be passed directly to the direct_connection node.
@@ -529,7 +529,7 @@ This transform is a full featured Redis driver that will connect to a Redis clus
# When this field is not provided connection_count defaults to 1.
connection_count: 1

# Number of milliseconds to wait for a connection to be created to a destination redis instance.
# Number of milliseconds to wait for a connection to be created to a destination valkey instance.
# If the timeout is exceeded then connection to another node is attempted
# If all known nodes have resulted in connection timeouts an error will be returned to the client.
connect_timeout_ms: 3000
@@ -540,40 +540,40 @@ This transform is a full featured Redis driver that will connect to a Redis clus
# # Path to the certificate authority file, typically named ca.crt.
# certificate_authority_path: "tls/ca.crt"
# # Path to the certificate file, typically named with a .crt extension.
# certificate_path: "tls/redis.crt"
# certificate_path: "tls/valkey.crt"
# # Path to the private key file, typically named with a .key extension.
# private_key_path: "tls/redis.key"
# private_key_path: "tls/valkey.key"
# # Enable/disable verifying the hostname of the certificate provided by the destination.
# #verify_hostname: true
```

Unlike other Redis cluster drivers, this transform does support pipelining. It does however turn each command from the pipeline into a group of requests split between the master Redis node that owns them, buffering results as within different Redis nodes as needed. This is done sequentially and there is room to make this transform split requests between master nodes in a more concurrent manner.
Unlike other Valkey cluster drivers, this transform does support pipelining. It does however turn each command from the pipeline into a group of requests split between the master Valkey node that owns them, buffering results as within different Valkey nodes as needed. This is done sequentially and there is room to make this transform split requests between master nodes in a more concurrent manner.

Latency and throughput will be different from pipelining with a single Redis node, but not by much.
Latency and throughput will be different from pipelining with a single Valkey node, but not by much.

This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `RedisSinkCluster` and `chain` as the name of the chain that this transform is in.
This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `ValkeySinkCluster` and `chain` as the name of the chain that this transform is in.

#### Differences to real Redis
#### Differences to real Valkey

On an existing authenticated connection, a failed auth attempt will not "unauthenticate" the user. This behaviour matches Redis 6 but is different to Redis 5.
On an existing authenticated connection, a failed auth attempt will not "unauthenticate" the user. This behaviour matches Valkey 6 but is different to Valkey 5.

#### Completeness

_Note: Currently RedisSinkcluster does not support the following functionality:_
_Note: Currently ValkeySinkcluster does not support the following functionality:_

* _Redis Transactions_
* _Valkey Transactions_
* _Scan based operations e.g. SSCAN_

### RedisSinkSingle
### ValkeySinkSingle

This transform will take a query, serialise it into a RESP2 compatible format and send to the Redis compatible database at the defined address.
This transform will take a query, serialise it into a RESP2 compatible format and send to the Valkey compatible database at the defined address.

```yaml
- RedisSinkSingle:
# The IP address and port of the upstream redis node/service.
- ValkeySinkSingle:
# The IP address and port of the upstream valkey node/service.
remote_address: "127.0.0.1:6379"

# Number of milliseconds to wait for a connection to be created to the destination redis instance.
# Number of milliseconds to wait for a connection to be created to the destination valkey instance.
# If the timeout is exceeded then an error is returned to the client.
connect_timeout_ms: 3000

@@ -583,16 +583,16 @@ This transform will take a query, serialise it into a RESP2 compatible format an
# # Path to the certificate authority file, typically named ca.crt.
# certificate_authority_path: "tls/ca.crt"
# # Path to the certificate file, typically named with a .crt extension.
# certificate_path: "tls/redis.crt"
# certificate_path: "tls/valkey.crt"
# # Path to the private key file, typically named with a .key extension.
# private_key_path: "tls/redis.key"
# private_key_path: "tls/valkey.key"
# # Enable/disable verifying the hostname of the certificate provided by the destination.
# #verify_hostname: true
```

Note: this will just pass the query to the remote node. No cluster discovery or routing occurs with this transform.

This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `RedisSinkSingle` and `chain` as the name of the chain that this transform is in.
This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `ValkeySinkSingle` and `chain` as the name of the chain that this transform is in.

### Tee

28 changes: 20 additions & 8 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
@@ -476,15 +476,27 @@ impl CassandraSinkCluster {
// Close all other connections as they are now invalidated.
// They will be recreated as needed with the correct use statement used automatically after the handshake
for node in self.pool.nodes_mut() {
node.outbound = None;
if let Some(outbound) = &mut node.outbound {
// Flush all pending responses first, otherwise they would be lost.
// This is bad for performance but USE statements are bad practice anyway.
outbound
.recv_all_pending(responses, self.version.unwrap())
.await
.map_err(|_| anyhow!("Failed to recv_all_pending"))?;

// close the connection
node.outbound = None;

// TODO:
// Sending the use statement to these connections to keep them alive instead is possible but tricky.
// 1. The destinations need to be calculated here, at sending time, to ensure no new connections have been created in the meantime.
// 2. We need a way to filter out these extra responses from reaching the client.
// 3. But we cant use the TableRewrite abstraction since that occurs too early. See 1.
//
// I think the best way forward to achieve this would be to first perform the refactors listed in
// https://github.com/shotover/shotover-proxy/issues/1844
}
}

// Sending the use statement to these connections to keep them alive instead is possible but tricky.
// 1. The destinations need to be calculated here, at sending time, to ensure no new connections have been created in the meantime.
// 2. We need a way to filter out these extra responses from reaching the client.
// 3. But we cant use the TableRewrite abstraction since that occurs too early. See 1.
//
// It might be worth doing in the future.
} else if is_prepare_message(&mut message) {
let next_host_id = self.message_rewriter.get_destination_for_prepare(&message);
match self

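The `CassandraSinkCluster` hunk above changes the order of operations: each open connection is drained of its pending responses before it is dropped, so those responses are not lost. A minimal synchronous sketch of that ordering follows. The `Connection` and `Node` types here are hypothetical stand-ins, not shotover's own; the real code is async, and its `recv_all_pending` also takes a protocol version.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for an outbound connection that has buffered responses.
struct Connection {
    pending: VecDeque<String>,
}

impl Connection {
    /// Move every buffered response into `responses`, preserving order.
    fn recv_all_pending(&mut self, responses: &mut Vec<String>) -> Result<(), String> {
        responses.extend(self.pending.drain(..));
        Ok(())
    }
}

/// Hypothetical stand-in for a pool node with an optional open connection.
struct Node {
    outbound: Option<Connection>,
}

/// Drain, then close, every open connection in the pool.
fn close_all(nodes: &mut [Node], responses: &mut Vec<String>) -> Result<(), String> {
    for node in nodes.iter_mut() {
        if let Some(outbound) = &mut node.outbound {
            // Flush pending responses first, otherwise they would be lost.
            outbound.recv_all_pending(responses)?;
            // Only now is it safe to drop the connection.
            node.outbound = None;
        }
    }
    Ok(())
}

fn main() {
    let mut nodes = vec![
        Node {
            outbound: Some(Connection {
                pending: VecDeque::from(vec!["a".to_string(), "b".to_string()]),
            }),
        },
        Node { outbound: None },
    ];
    let mut responses = Vec::new();
    close_all(&mut nodes, &mut responses).expect("drain failed");
    println!("{:?}", responses);
}
```

Nodes without an open connection are skipped, which mirrors the `if let Some(outbound)` guard in the diff.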