Skip to content

Commit

Permalink
Add extensions for bulk IRawKeyValueStore copying (#365)
Browse files Browse the repository at this point in the history
Adds extension methods for copying keys from one `IRawKeyValueStore` to another, either by specifying a list of keys to copy or a key prefix.
  • Loading branch information
wazzamatazz authored Nov 21, 2023
1 parent 73e0714 commit 101bfc1
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/DataCore.Adapter.Abstractions/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ override sealed DataCore.Adapter.Services.KeyValueStore<TOptions>.GetCompression
override sealed DataCore.Adapter.Services.KeyValueStore<TOptions>.GetSerializer() -> DataCore.Adapter.Services.IKeyValueStoreSerializer!
static DataCore.Adapter.AdapterExtensions.CreateExtendedAdapterDescriptorBuilder(this DataCore.Adapter.IAdapter! adapter) -> DataCore.Adapter.Common.AdapterDescriptorBuilder!
static DataCore.Adapter.Services.JsonKeyValueStoreSerializer.Default.get -> DataCore.Adapter.Services.IKeyValueStoreSerializer!
static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyFromAsync(this DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.KVKey? keyPrefix = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<int>!
static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyToAsync(this DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.KVKey? keyPrefix = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<int>!
static DataCore.Adapter.Services.KeyValueStoreExtensions.CopyFromAsync(this DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.IRawKeyValueStore! source, System.Collections.Generic.IEnumerable<DataCore.Adapter.Services.KVKey>! keys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<int>!
static DataCore.Adapter.Services.KeyValueStoreExtensions.CopyToAsync(this DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.IRawKeyValueStore! destination, System.Collections.Generic.IEnumerable<DataCore.Adapter.Services.KVKey>! keys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<int>!
static DataCore.Adapter.Services.KeyValueStoreExtensions.GetKeysAsStringsAsync(this DataCore.Adapter.Services.IKeyValueStore! store) -> System.Collections.Generic.IAsyncEnumerable<string!>!
static DataCore.Adapter.Services.KeyValueStoreExtensions.GetKeysAsStringsAsync(this DataCore.Adapter.Services.IKeyValueStore! store, DataCore.Adapter.Services.KVKey? prefix) -> System.Collections.Generic.IAsyncEnumerable<string!>!
static DataCore.Adapter.Services.KVKey.implicit operator string?(DataCore.Adapter.Services.KVKey value) -> string?
Expand Down
171 changes: 171 additions & 0 deletions src/DataCore.Adapter.Abstractions/Services/KeyValueStoreExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization.Metadata;
using System.Threading;
using System.Threading.Tasks;

namespace DataCore.Adapter.Services {

/// <summary>
/// Extensions for <see cref="IKeyValueStore"/>.
/// </summary>
Expand Down Expand Up @@ -303,5 +305,174 @@ public static async ValueTask WriteJsonAsync<TValue>(this IKeyValueStore store,
return await store.ReadAsync<TValue>(key).ConfigureAwait(false);
}


/// <summary>
/// Copies all keys and values matching the specified prefix to another <see cref="IRawKeyValueStore"/>.
/// </summary>
/// <param name="source">
/// The source <see cref="IRawKeyValueStore"/>.
/// </param>
/// <param name="destination">
/// The destination <see cref="IRawKeyValueStore"/>.
/// </param>
/// <param name="keyPrefix">
/// The filter to apply to keys read from the source store.
/// </param>
/// <param name="cancellationToken">
/// The cancellation token for the operation.
/// </param>
/// <returns>
/// The number of keys copied.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> is <see langword="null"/>.
/// </exception>
/// <exception cref="ArgumentNullException">
/// <paramref name="destination"/> is <see langword="null"/>.
/// </exception>
public static async Task<int> BulkCopyToAsync(this IRawKeyValueStore source, IRawKeyValueStore destination, KVKey? keyPrefix = null, CancellationToken cancellationToken = default) {
if (source == null) {
throw new ArgumentNullException(nameof(source));
}
if (destination == null) {
throw new ArgumentNullException(nameof(destination));
}

var count = 0;

await foreach (var key in source.GetKeysAsync(keyPrefix).ConfigureAwait(false)) {
cancellationToken.ThrowIfCancellationRequested();

var value = await source.ReadRawAsync(key).ConfigureAwait(false);
if (value == null) {
continue;
}

await destination.WriteRawAsync(key, value).ConfigureAwait(false);
count++;
}

return count;
}


/// <summary>
/// Copies all keys and values matching the specified prefix from another <see cref="IRawKeyValueStore"/>.
/// </summary>
/// <param name="destination">
/// The destination <see cref="IRawKeyValueStore"/>.
/// </param>
/// <param name="source">
/// The source <see cref="IRawKeyValueStore"/>.
/// </param>
/// <param name="keyPrefix">
/// The filter to apply to keys read from the source store.
/// </param>
/// <param name="cancellationToken">
/// The cancellation token for the operation.
/// </param>
/// <returns>
/// The number of keys copied.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> is <see langword="null"/>.
/// </exception>
/// <exception cref="ArgumentNullException">
/// <paramref name="destination"/> is <see langword="null"/>.
/// </exception>
public static async Task<int> BulkCopyFromAsync(this IRawKeyValueStore destination, IRawKeyValueStore source, KVKey? keyPrefix = null, CancellationToken cancellationToken = default) {
return await source.BulkCopyToAsync(destination, keyPrefix, cancellationToken).ConfigureAwait(false);
}


/// <summary>
/// Copies the specified keys and values to another <see cref="IRawKeyValueStore"/>.
/// </summary>
/// <param name="source">
/// The source <see cref="IRawKeyValueStore"/>.
/// </param>
/// <param name="destination">
/// The destination <see cref="IRawKeyValueStore"/>.
/// </param>
/// <param name="keys">
/// The keys to copy.
/// </param>
/// <param name="cancellationToken">
/// The cancellation token for the operation.
/// </param>
/// <returns>
/// The number of keys copied.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> is <see langword="null"/>.
/// </exception>
/// <exception cref="ArgumentNullException">
/// <paramref name="destination"/> is <see langword="null"/>.
/// </exception>
/// <exception cref="ArgumentNullException">
/// <paramref name="keys"/> is <see langword="null"/>.
/// </exception>
public static async Task<int> CopyToAsync(this IRawKeyValueStore source, IRawKeyValueStore destination, IEnumerable<KVKey> keys, CancellationToken cancellationToken = default) {
if (source == null) {
throw new ArgumentNullException(nameof(source));
}
if (destination == null) {
throw new ArgumentNullException(nameof(destination));
}

var count = 0;

foreach (var key in keys) {
cancellationToken.ThrowIfCancellationRequested();

if (key.Length == 0) {
continue;
}

var value = await source.ReadRawAsync(key).ConfigureAwait(false);
if (value == null) {
continue;
}

await destination.WriteRawAsync(key, value).ConfigureAwait(false);
count++;
}

return count;
}


/// <summary>
/// Copies the specified keys and values from another <see cref="IRawKeyValueStore"/>.
/// </summary>
/// <param name="destination">
/// The destination <see cref="IRawKeyValueStore"/>.
/// </param>
/// <param name="source">
/// The source <see cref="IRawKeyValueStore"/>.
/// </param>
/// <param name="keys">
/// The keys to copy.
/// </param>
/// <param name="cancellationToken">
/// The cancellation token for the operation.
/// </param>
/// <returns>
/// The number of keys copied.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> is <see langword="null"/>.
/// </exception>
/// <exception cref="ArgumentNullException">
/// <paramref name="destination"/> is <see langword="null"/>.
/// </exception>
/// <exception cref="ArgumentNullException">
/// <paramref name="keys"/> is <see langword="null"/>.
/// </exception>
public static async Task<int> CopyFromAsync(this IRawKeyValueStore destination, IRawKeyValueStore source, IEnumerable<KVKey> keys, CancellationToken cancellationToken = default) {
return await source.CopyToAsync(destination, keys, cancellationToken).ConfigureAwait(false);
}

}

}
122 changes: 122 additions & 0 deletions test/DataCore.Adapter.Tests/KeyValueStoreTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,128 @@ public async Task RawWriteShouldThrowException() {
}
}


[TestMethod]
public async Task ShouldCopyAllKeysToAnotherStore() {
var now = DateTime.UtcNow;

var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore;
var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore;

if (store1 == null || store2 == null) {
Assert.Inconclusive("Source or destination store does not support raw writes");
}

await store1.WriteAsync(TestContext.TestName, now);
var count = await store1.BulkCopyToAsync(store2);
Assert.AreEqual(1, count);

var readResult = await store2.ReadAsync<DateTime>(TestContext.TestName);
Assert.AreEqual(now, readResult);
}


[TestMethod]
public async Task ShouldCopyAllKeysFromAnotherStore() {
var now = DateTime.UtcNow;

var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore;
var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore;

if (store1 == null || store2 == null) {
Assert.Inconclusive("Source or destination store does not support raw writes");
}

await store2.WriteAsync(TestContext.TestName, now);
var count = await store1.BulkCopyFromAsync(store2);
Assert.AreEqual(1, count);

var readResult = await store1.ReadAsync<DateTime>(TestContext.TestName);
Assert.AreEqual(now, readResult);
}


[TestMethod]
public async Task ShouldCopyFilteredKeysToAnotherStore() {
var now = DateTime.UtcNow;

var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore;
var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore;

if (store1 == null || store2 == null) {
Assert.Inconclusive("Source or destination store does not support raw writes");
}

await store1.WriteAsync(TestContext.TestName + ":1:Value", now);
await store1.WriteAsync(TestContext.TestName + ":2:Value", now);
var count = await store1.BulkCopyToAsync(store2, TestContext.TestName + ":1");
Assert.AreEqual(1, count);

var readResult = await store2.ReadAsync<DateTime>(TestContext.TestName + ":1:Value");
Assert.AreEqual(now, readResult);
}


[TestMethod]
public async Task ShouldCopyFilteredKeysFromAnotherStore() {
var now = DateTime.UtcNow;

var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore;
var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore;

if (store1 == null || store2 == null) {
Assert.Inconclusive("Source or destination store does not support raw writes");
}

await store2.WriteAsync(TestContext.TestName + ":1:Value", now);
await store2.WriteAsync(TestContext.TestName + ":2:Value", now);
var count = await store1.BulkCopyFromAsync(store2, TestContext.TestName + ":1");
Assert.AreEqual(1, count);

var readResult = await store1.ReadAsync<DateTime>(TestContext.TestName + ":1:Value");
Assert.AreEqual(now, readResult);
}


[TestMethod]
public async Task ShouldCopySpecifiedKeysToAnotherStore() {
var now = DateTime.UtcNow;

var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore;
var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore;

if (store1 == null || store2 == null) {
Assert.Inconclusive("Source or destination store does not support raw writes");
}

await store1.WriteAsync(TestContext.TestName, now);
var count = await store1.CopyToAsync(store2, new KVKey[] { TestContext.TestName });
Assert.AreEqual(1, count);

var readResult = await store2.ReadAsync<DateTime>(TestContext.TestName);
Assert.AreEqual(now, readResult);
}


[TestMethod]
public async Task ShouldCopySpecifiedKeysFromAnotherStore() {
var now = DateTime.UtcNow;

var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore;
var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore;

if (store1 == null || store2 == null) {
Assert.Inconclusive("Source or destination store does not support raw writes");
}

await store2.WriteAsync(TestContext.TestName, now);
var count = await store1.CopyFromAsync(store2, new KVKey[] { TestContext.TestName });
Assert.AreEqual(1, count);

var readResult = await store1.ReadAsync<DateTime>(TestContext.TestName);
Assert.AreEqual(now, readResult);
}

}

}

0 comments on commit 101bfc1

Please sign in to comment.