diff --git a/src/DataCore.Adapter.Abstractions/PublicAPI.Unshipped.txt b/src/DataCore.Adapter.Abstractions/PublicAPI.Unshipped.txt index eb31d1dc..5ba20ad8 100644 --- a/src/DataCore.Adapter.Abstractions/PublicAPI.Unshipped.txt +++ b/src/DataCore.Adapter.Abstractions/PublicAPI.Unshipped.txt @@ -101,6 +101,10 @@ override sealed DataCore.Adapter.Services.KeyValueStore.GetCompression override sealed DataCore.Adapter.Services.KeyValueStore.GetSerializer() -> DataCore.Adapter.Services.IKeyValueStoreSerializer! static DataCore.Adapter.AdapterExtensions.CreateExtendedAdapterDescriptorBuilder(this DataCore.Adapter.IAdapter! adapter) -> DataCore.Adapter.Common.AdapterDescriptorBuilder! static DataCore.Adapter.Services.JsonKeyValueStoreSerializer.Default.get -> DataCore.Adapter.Services.IKeyValueStoreSerializer! +static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyFromAsync(this DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.KVKey? keyPrefix = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyToAsync(this DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.KVKey? keyPrefix = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static DataCore.Adapter.Services.KeyValueStoreExtensions.CopyFromAsync(this DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.IRawKeyValueStore! source, System.Collections.Generic.IEnumerable! keys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static DataCore.Adapter.Services.KeyValueStoreExtensions.CopyToAsync(this DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.IRawKeyValueStore! destination, System.Collections.Generic.IEnumerable! keys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static DataCore.Adapter.Services.KeyValueStoreExtensions.GetKeysAsStringsAsync(this DataCore.Adapter.Services.IKeyValueStore! store) -> System.Collections.Generic.IAsyncEnumerable! static DataCore.Adapter.Services.KeyValueStoreExtensions.GetKeysAsStringsAsync(this DataCore.Adapter.Services.IKeyValueStore! store, DataCore.Adapter.Services.KVKey? prefix) -> System.Collections.Generic.IAsyncEnumerable! static DataCore.Adapter.Services.KVKey.implicit operator string?(DataCore.Adapter.Services.KVKey value) -> string? diff --git a/src/DataCore.Adapter.Abstractions/Services/KeyValueStoreExtensions.cs b/src/DataCore.Adapter.Abstractions/Services/KeyValueStoreExtensions.cs index 3ed973f8..7ab882c4 100644 --- a/src/DataCore.Adapter.Abstractions/Services/KeyValueStoreExtensions.cs +++ b/src/DataCore.Adapter.Abstractions/Services/KeyValueStoreExtensions.cs @@ -3,9 +3,11 @@ using System.Text; using System.Text.Json; using System.Text.Json.Serialization.Metadata; +using System.Threading; using System.Threading.Tasks; namespace DataCore.Adapter.Services { + /// /// Extensions for . /// @@ -303,5 +305,174 @@ public static async ValueTask WriteJsonAsync(this IKeyValueStore store, return await store.ReadAsync(key).ConfigureAwait(false); } + + /// + /// Copies all keys and values matching the specified prefix to another . + /// + /// + /// The source . + /// + /// + /// The destination . + /// + /// + /// The filter to apply to keys read from the source store. + /// + /// + /// The cancellation token for the operation. + /// + /// + /// The number of keys copied. + /// + /// + /// is . + /// + /// + /// is . + /// + public static async Task BulkCopyToAsync(this IRawKeyValueStore source, IRawKeyValueStore destination, KVKey? keyPrefix = null, CancellationToken cancellationToken = default) { + if (source == null) { + throw new ArgumentNullException(nameof(source)); + } + if (destination == null) { + throw new ArgumentNullException(nameof(destination)); + } + + var count = 0; + + await foreach (var key in source.GetKeysAsync(keyPrefix).ConfigureAwait(false)) { + cancellationToken.ThrowIfCancellationRequested(); + + var value = await source.ReadRawAsync(key).ConfigureAwait(false); + if (value == null) { + continue; + } + + await destination.WriteRawAsync(key, value).ConfigureAwait(false); + count++; + } + + return count; + } + + + /// + /// Copies all keys and values matching the specified prefix from another . + /// + /// + /// The destination . + /// + /// + /// The source . + /// + /// + /// The filter to apply to keys read from the source store. + /// + /// + /// The cancellation token for the operation. + /// + /// + /// The number of keys copied. + /// + /// + /// is . + /// + /// + /// is . + /// + public static async Task BulkCopyFromAsync(this IRawKeyValueStore destination, IRawKeyValueStore source, KVKey? keyPrefix = null, CancellationToken cancellationToken = default) { + return await source.BulkCopyToAsync(destination, keyPrefix, cancellationToken).ConfigureAwait(false); + } + + + /// + /// Copies the specified keys and values to another . + /// + /// + /// The source . + /// + /// + /// The destination . + /// + /// + /// The keys to copy. + /// + /// + /// The cancellation token for the operation. + /// + /// + /// The number of keys copied. + /// + /// + /// is . + /// + /// + /// is . + /// + /// + /// is . + /// + public static async Task CopyToAsync(this IRawKeyValueStore source, IRawKeyValueStore destination, IEnumerable keys, CancellationToken cancellationToken = default) { + if (source == null) { + throw new ArgumentNullException(nameof(source)); + } + if (destination == null) { + throw new ArgumentNullException(nameof(destination)); + } + + var count = 0; + + foreach (var key in keys) { + cancellationToken.ThrowIfCancellationRequested(); + + if (key.Length == 0) { + continue; + } + + var value = await source.ReadRawAsync(key).ConfigureAwait(false); + if (value == null) { + continue; + } + + await destination.WriteRawAsync(key, value).ConfigureAwait(false); + count++; + } + + return count; + } + + + /// + /// Copies the specified keys and values from another . + /// + /// + /// The destination . + /// + /// + /// The source . + /// + /// + /// The keys to copy. + /// + /// + /// The cancellation token for the operation. + /// + /// + /// The number of keys copied. + /// + /// + /// is . + /// + /// + /// is . + /// + /// + /// is . + /// + public static async Task CopyFromAsync(this IRawKeyValueStore destination, IRawKeyValueStore source, IEnumerable keys, CancellationToken cancellationToken = default) { + return await source.CopyToAsync(destination, keys, cancellationToken).ConfigureAwait(false); + } + } + } diff --git a/test/DataCore.Adapter.Tests/KeyValueStoreTests.cs b/test/DataCore.Adapter.Tests/KeyValueStoreTests.cs index fe9c235c..9bb3434a 100644 --- a/test/DataCore.Adapter.Tests/KeyValueStoreTests.cs +++ b/test/DataCore.Adapter.Tests/KeyValueStoreTests.cs @@ -336,6 +336,128 @@ public async Task RawWriteShouldThrowException() { } } + + [TestMethod] + public async Task ShouldCopyAllKeysToAnotherStore() { + var now = DateTime.UtcNow; + + var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore; + var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore; + + if (store1 == null || store2 == null) { + Assert.Inconclusive("Source or destination store does not support raw writes"); + } + + await store1.WriteAsync(TestContext.TestName, now); + var count = await store1.BulkCopyToAsync(store2); + Assert.AreEqual(1, count); + + var readResult = await store2.ReadAsync(TestContext.TestName); + Assert.AreEqual(now, readResult); + } + + + [TestMethod] + public async Task ShouldCopyAllKeysFromAnotherStore() { + var now = DateTime.UtcNow; + + var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore; + var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore; + + if (store1 == null || store2 == null) { + Assert.Inconclusive("Source or destination store does not support raw writes"); + } + + await store2.WriteAsync(TestContext.TestName, now); + var count = await store1.BulkCopyFromAsync(store2); + Assert.AreEqual(1, count); + + var readResult = await store1.ReadAsync(TestContext.TestName); + Assert.AreEqual(now, readResult); + } + + + [TestMethod] + public async Task ShouldCopyFilteredKeysToAnotherStore() { + var now = DateTime.UtcNow; + + var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore; + var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore; + + if (store1 == null || store2 == null) { + Assert.Inconclusive("Source or destination store does not support raw writes"); + } + + await store1.WriteAsync(TestContext.TestName + ":1:Value", now); + await store1.WriteAsync(TestContext.TestName + ":2:Value", now); + var count = await store1.BulkCopyToAsync(store2, TestContext.TestName + ":1"); + Assert.AreEqual(1, count); + + var readResult = await store2.ReadAsync(TestContext.TestName + ":1:Value"); + Assert.AreEqual(now, readResult); + } + + + [TestMethod] + public async Task ShouldCopyFilteredKeysFromAnotherStore() { + var now = DateTime.UtcNow; + + var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore; + var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore; + + if (store1 == null || store2 == null) { + Assert.Inconclusive("Source or destination store does not support raw writes"); + } + + await store2.WriteAsync(TestContext.TestName + ":1:Value", now); + await store2.WriteAsync(TestContext.TestName + ":2:Value", now); + var count = await store1.BulkCopyFromAsync(store2, TestContext.TestName + ":1"); + Assert.AreEqual(1, count); + + var readResult = await store1.ReadAsync(TestContext.TestName + ":1:Value"); + Assert.AreEqual(now, readResult); + } + + + [TestMethod] + public async Task ShouldCopySpecifiedKeysToAnotherStore() { + var now = DateTime.UtcNow; + + var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore; + var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore; + + if (store1 == null || store2 == null) { + Assert.Inconclusive("Source or destination store does not support raw writes"); + } + + await store1.WriteAsync(TestContext.TestName, now); + var count = await store1.CopyToAsync(store2, new KVKey[] { TestContext.TestName }); + Assert.AreEqual(1, count); + + var readResult = await store2.ReadAsync(TestContext.TestName); + Assert.AreEqual(now, readResult); + } + + + [TestMethod] + public async Task ShouldCopySpecifiedKeysFromAnotherStore() { + var now = DateTime.UtcNow; + + var store1 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: true) as IRawKeyValueStore; + var store2 = CreateStore(CompressionLevel.NoCompression, enableRawWrites: false) as IRawKeyValueStore; + + if (store1 == null || store2 == null) { + Assert.Inconclusive("Source or destination store does not support raw writes"); + } + + await store2.WriteAsync(TestContext.TestName, now); + var count = await store1.CopyFromAsync(store2, new KVKey[] { TestContext.TestName }); + Assert.AreEqual(1, count); + + var readResult = await store1.ReadAsync(TestContext.TestName); + Assert.AreEqual(now, readResult); + } + } }