Skip to content

Commit

Permalink
Full gRPC support when running on .NET Framework on Windows 11 (#387)
Browse files Browse the repository at this point in the history
* Create GrpcAdapterProxy.netfx.cs

Adds .NET Framework-only static methods to `GrpcAdapterProxy` for checking if the host OS is Windows 11 or later.

When Windows 11 is being used, gRPC clients using WinHTTP can call both client and bidirectional streaming gRPC methods.

* Use bidirectional streaming where supported

Backport bidirectional streaming from netstandard2.1 to net48 when running on Windows 11 or greater. The existing net48 implementation is still used when a different version of Windows is used.

https://learn.microsoft.com/en-us/aspnet/core/grpc/netstandard#net-framework
  • Loading branch information
wazzamatazz authored Apr 15, 2024
1 parent 4eeddae commit 486c2f7
Show file tree
Hide file tree
Showing 17 changed files with 481 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,69 @@ internal partial class EventMessagePushWithTopicsImpl : ProxyAdapterFeature, IEv
/// </param>
public EventMessagePushWithTopicsImpl(GrpcAdapterProxy proxy) : base(proxy) { }


private async IAsyncEnumerable<Adapter.Events.EventMessage> SubscribeCoreAsync(
IAdapterCallContext context,
CreateEventMessageTopicSubscriptionRequest request,
IAsyncEnumerable<EventMessageSubscriptionUpdate> channel,
[EnumeratorCancellation]
CancellationToken cancellationToken
) {
var client = CreateClient<EventsService.EventsServiceClient>();

var createSubscriptionMessage = new CreateEventTopicPushChannelMessage() {
AdapterId = AdapterId,
SubscriptionType = request.SubscriptionType == EventMessageSubscriptionType.Active
? EventSubscriptionType.Active
: EventSubscriptionType.Passive
};
createSubscriptionMessage.Topics.Add(request.Topics?.Where(x => x != null) ?? Array.Empty<string>());
if (request.Properties != null) {
foreach (var item in request.Properties) {
createSubscriptionMessage.Properties.Add(item.Key, item.Value ?? string.Empty);
}
}

using (var grpcStream = client.CreateEventTopicPushChannel(GetCallOptions(context, cancellationToken))) {

// Create the subscription.
await grpcStream.RequestStream.WriteAsync(new CreateEventTopicPushChannelRequest() {
Create = createSubscriptionMessage
}).ConfigureAwait(false);

// Stream subscription changes.
Proxy.BackgroundTaskService.QueueBackgroundWorkItem(async ct => {
await foreach (var update in channel.WithCancellation(ct).ConfigureAwait(false)) {
if (update == null) {
continue;
}

var msg = new UpdateEventTopicPushChannelMessage() {
Action = update.Action == Common.SubscriptionUpdateAction.Subscribe
? SubscriptionUpdateAction.Subscribe
: SubscriptionUpdateAction.Unsubscribe
};
msg.Topics.Add(update.Topics.Where(x => x != null));
if (msg.Topics.Count == 0) {
continue;
}

await grpcStream.RequestStream.WriteAsync(new CreateEventTopicPushChannelRequest() {
Update = msg
}).ConfigureAwait(false);
}
}, cancellationToken);

// Stream the results.
while (await grpcStream.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false)) {
if (grpcStream.ResponseStream.Current == null) {
continue;
}

yield return grpcStream.ResponseStream.Current.ToAdapterEventMessage();
}
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ IEnumerable<string> topics
[EnumeratorCancellation]
CancellationToken cancellationToken
) {
if (GrpcAdapterProxy.IsGrpcClientFullySupported()) {
// Bidirectional streaming is fully supported.
await foreach (var item in SubscribeCoreAsync(context, request, channel, cancellationToken).ConfigureAwait(false)) {
yield return item;
}
yield break;
}

// Bidirectional streaming is not supported.

var client = CreateClient<EventsService.EventsServiceClient>();

using var handler = CreateInnerHandler(context, client, request, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,78 +5,22 @@
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using DataCore.Adapter.Events;

using IntelligentPlant.BackgroundTasks;

namespace DataCore.Adapter.Grpc.Proxy.Events {

partial class EventMessagePushWithTopicsImpl {

/// <inheritdoc/>
public async IAsyncEnumerable<Adapter.Events.EventMessage> Subscribe(
public IAsyncEnumerable<Adapter.Events.EventMessage> Subscribe(
IAdapterCallContext context,
CreateEventMessageTopicSubscriptionRequest request,
IAsyncEnumerable<EventMessageSubscriptionUpdate> channel,
[EnumeratorCancellation]
CancellationToken cancellationToken
) {
var client = CreateClient<EventsService.EventsServiceClient>();

var createSubscriptionMessage = new CreateEventTopicPushChannelMessage() {
AdapterId = AdapterId,
SubscriptionType = request.SubscriptionType == EventMessageSubscriptionType.Active
? EventSubscriptionType.Active
: EventSubscriptionType.Passive
};
createSubscriptionMessage.Topics.Add(request.Topics?.Where(x => x != null) ?? Array.Empty<string>());
if (request.Properties != null) {
foreach (var item in request.Properties) {
createSubscriptionMessage.Properties.Add(item.Key, item.Value ?? string.Empty);
}
}

using (var grpcStream = client.CreateEventTopicPushChannel(GetCallOptions(context, cancellationToken))) {

// Create the subscription.
await grpcStream.RequestStream.WriteAsync(new CreateEventTopicPushChannelRequest() {
Create = createSubscriptionMessage
}).ConfigureAwait(false);

// Stream subscription changes.
Proxy.BackgroundTaskService.QueueBackgroundWorkItem(async ct => {
await foreach (var update in channel.WithCancellation(ct).ConfigureAwait(false)) {
if (update == null) {
continue;
}

var msg = new UpdateEventTopicPushChannelMessage() {
Action = update.Action == Common.SubscriptionUpdateAction.Subscribe
? SubscriptionUpdateAction.Subscribe
: SubscriptionUpdateAction.Unsubscribe
};
msg.Topics.Add(update.Topics.Where(x => x != null));
if (msg.Topics.Count == 0) {
continue;
}

await grpcStream.RequestStream.WriteAsync(new CreateEventTopicPushChannelRequest() {
Update = msg
}).ConfigureAwait(false);
}
}, cancellationToken);

// Stream the results.
while (await grpcStream.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false)) {
if (grpcStream.ResponseStream.Current == null) {
continue;
}

yield return grpcStream.ResponseStream.Current.ToAdapterEventMessage();
}
}
return SubscribeCoreAsync(context, request, channel, cancellationToken);
}

}
Expand Down
56 changes: 55 additions & 1 deletion src/DataCore.Adapter.Grpc.Proxy/Events/WriteEventMessagesImpl.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
using DataCore.Adapter.Events;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using DataCore.Adapter.Events;

using IntelligentPlant.BackgroundTasks;

namespace DataCore.Adapter.Grpc.Proxy.Events.Features {

Expand All @@ -15,5 +22,52 @@ internal partial class WriteEventMessagesImpl : ProxyAdapterFeature, IWriteEvent
/// </param>
public WriteEventMessagesImpl(GrpcAdapterProxy proxy) : base(proxy) { }


private async IAsyncEnumerable<Adapter.Events.WriteEventMessageResult> WriteEventMessagesCoreAsync(
IAdapterCallContext context,
Adapter.Events.WriteEventMessagesRequest request,
IAsyncEnumerable<Adapter.Events.WriteEventMessageItem> channel,
[EnumeratorCancellation]
CancellationToken cancellationToken
) {
var client = CreateClient<EventsService.EventsServiceClient>();

using (var grpcStream = client.WriteEventMessages(GetCallOptions(context, cancellationToken))) {

// Create the subscription.
var initMessage = new WriteEventMessageInitMessage() {
AdapterId = AdapterId
};

if (request.Properties != null) {
foreach (var prop in request.Properties) {
initMessage.Properties.Add(prop.Key, prop.Value ?? string.Empty);
}
}

await grpcStream.RequestStream.WriteAsync(new WriteEventMessageRequest() {
Init = initMessage
}).ConfigureAwait(false);

// Run a background task to stream the values to write.
Proxy.BackgroundTaskService.QueueBackgroundWorkItem(async ct => {
try {
await foreach (var item in channel.WithCancellation(ct).ConfigureAwait(false)) {
await grpcStream.RequestStream.WriteAsync(new WriteEventMessageRequest() {
Write = item.ToGrpcWriteEventMessageItem()
}).ConfigureAwait(false);
}
}
finally {
await grpcStream.RequestStream.CompleteAsync().ConfigureAwait(false);
}
}, cancellationToken);

while (await grpcStream.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false)) {
yield return grpcStream.ResponseStream.Current.ToAdapterWriteEventMessageResult();
}
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ partial class WriteEventMessagesImpl {
[EnumeratorCancellation]
CancellationToken cancellationToken
) {
if (GrpcAdapterProxy.IsGrpcClientFullySupported()) {
// Bidrectional streaming is fully supported.
await foreach (var item in WriteEventMessagesCoreAsync(context, request, channel, cancellationToken).ConfigureAwait(false)) {
yield return item;
}
yield break;
}

// Bidirectional streaming is not supported.

var client = CreateClient<EventsService.EventsServiceClient>();

var callOptions = GetCallOptions(context, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,50 +10,13 @@ namespace DataCore.Adapter.Grpc.Proxy.Events.Features {
partial class WriteEventMessagesImpl {

/// <inheritdoc/>
public async IAsyncEnumerable<Adapter.Events.WriteEventMessageResult> WriteEventMessages(
public IAsyncEnumerable<Adapter.Events.WriteEventMessageResult> WriteEventMessages(
IAdapterCallContext context,
Adapter.Events.WriteEventMessagesRequest request,
IAsyncEnumerable<Adapter.Events.WriteEventMessageItem> channel,
[EnumeratorCancellation]
CancellationToken cancellationToken
) {
var client = CreateClient<EventsService.EventsServiceClient>();

using (var grpcStream = client.WriteEventMessages(GetCallOptions(context, cancellationToken))) {

// Create the subscription.
var initMessage = new WriteEventMessageInitMessage() {
AdapterId = AdapterId
};

if (request.Properties != null) {
foreach (var prop in request.Properties) {
initMessage.Properties.Add(prop.Key, prop.Value ?? string.Empty);
}
}

await grpcStream.RequestStream.WriteAsync(new WriteEventMessageRequest() {
Init = initMessage
}).ConfigureAwait(false);

// Run a background task to stream the values to write.
Proxy.BackgroundTaskService.QueueBackgroundWorkItem(async ct => {
try {
await foreach (var item in channel.WithCancellation(ct).ConfigureAwait(false)) {
await grpcStream.RequestStream.WriteAsync(new WriteEventMessageRequest() {
Write = item.ToGrpcWriteEventMessageItem()
}).ConfigureAwait(false);
}
}
finally {
await grpcStream.RequestStream.CompleteAsync().ConfigureAwait(false);
}
}, cancellationToken);

while (await grpcStream.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false)) {
yield return grpcStream.ResponseStream.Current.ToAdapterWriteEventMessageResult();
}
}
return WriteEventMessagesCoreAsync(context, request, channel, cancellationToken);
}

}
Expand Down
Loading

0 comments on commit 486c2f7

Please sign in to comment.