Skip to content

Commit

Permalink
ChannelHandler adds EventLoop I/O thread for asynchronous processing
Browse files Browse the repository at this point in the history
  • Loading branch information
fanliang11 committed Jan 23, 2022
1 parent 6b92962 commit 43fbaab
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,19 @@ public override async Task StartAsync(EndPoint endPoint)
return;
_serverMessageListener = await _messageListenerFactory(endPoint);
_serverMessageListener.Received += async (sender, message) =>
{
await Task.Run(() =>
{
MessageListener.OnReceived(sender, message);
});
{
await MessageListener.OnReceived(sender, message);
};
}

public override async Task StartAsync(string ip,int port)
public override async Task StartAsync(string ip, int port)
{
if (_serverMessageListener != null)
return;
_serverMessageListener = await _messageListenerFactory(new IPEndPoint(IPAddress.Parse(ip), port));
_serverMessageListener.Received += async (sender, message) =>
{
await Task.Run(() =>
{
MessageListener.OnReceived(sender, message);
});
await MessageListener.OnReceived(sender, message);
};
}

Expand Down
12 changes: 3 additions & 9 deletions src/Surging.Core/Surging.Core.DNS/DnsServiceHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,8 @@ public override async Task StartAsync(EndPoint endPoint)
return;
_serverMessageListener = await _messageListenerFactory(endPoint);
_serverMessageListener.Received += async (sender, message) =>
{
await Task.Run(() =>
{
MessageListener.OnReceived(sender, message);
});
{
await MessageListener.OnReceived(sender, message);
};
}

Expand All @@ -58,10 +55,7 @@ public override async Task StartAsync(string ip, int port)
_serverMessageListener = await _messageListenerFactory(new IPEndPoint(IPAddress.Parse(ip),53));
_serverMessageListener.Received += async (sender, message) =>
{
await Task.Run(() =>
{
MessageListener.OnReceived(sender, message);
});
await MessageListener.OnReceived(sender, message);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public async Task StartAsync(EndPoint endPoint)
var pipeline = channel.Pipeline;
pipeline.AddLast(new LengthFieldPrepender(4));
pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder));
pipeline.AddLast(new ServerHandler(async (contenxt, message) =>
pipeline.AddLast(workerGroup, "HandlerAdapter", new TransportMessageChannelHandlerAdapter(_transportMessageDecoder));
pipeline.AddLast(workerGroup, "ServerHandler", new ServerHandler(async (contenxt, message) =>
{
var sender = new DotNettyServerMessageSender(_transportMessageEncoder, contenxt);
await OnReceived(sender, message);
Expand Down Expand Up @@ -145,12 +145,9 @@ public ServerHandler(Action<IChannelHandlerContext, TransportMessage> readAction
#region Overrides of ChannelHandlerAdapter

public override void ChannelRead(IChannelHandlerContext context, object message)
{
Task.Run(() =>
{
var transportMessage = (TransportMessage)message;
_readAction(context, transportMessage);
});
{
var transportMessage = (TransportMessage)message;
_readAction(context, transportMessage);
}

public override void ChannelReadComplete(IChannelHandlerContext context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@ public DefaultHttpServiceHost(Func<EndPoint, Task<IMessageListener>> messageList
_messageListenerFactory = messageListenerFactory;
_serverMessageListener = httpMessageListener;
_serverMessageListener.Received += async (sender, message) =>
{
await Task.Run(() =>
{
MessageListener.OnReceived(sender, message);
});
{
await MessageListener.OnReceived(sender, message);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ public HttpServiceHost(Func<EndPoint, Task<IMessageListener>> messageListenerFac
_messageListenerFactory = messageListenerFactory;
_serverMessageListener = httpMessageListener;
_serverMessageListener.Received += async (sender, message) =>
{
await Task.Run(async () =>
{
await MessageListener.OnReceived(sender, message);
});
{
await MessageListener.OnReceived(sender, message);
};
}

Expand Down
10 changes: 2 additions & 8 deletions src/Surging.Core/Surging.Core.Protocol.Http/HttpServiceHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ public override async Task StartAsync(EndPoint endPoint)
_serverMessageListener = await _messageListenerFactory(endPoint);
_serverMessageListener.Received += async (sender, message) =>
{
await Task.Run(() =>
{
MessageListener.OnReceived(sender, message);
});
await MessageListener.OnReceived(sender, message);
};
}

Expand All @@ -61,10 +58,7 @@ public override async Task StartAsync(string ip,int port)
_serverMessageListener = await _messageListenerFactory(new IPEndPoint(IPAddress.Parse(ip), AppConfig.ServerOptions.Ports.HttpPort??0));
_serverMessageListener.Received += async (sender, message) =>
{
await Task.Run(() =>
{
MessageListener.OnReceived(sender, message);
});
await MessageListener.OnReceived(sender, message);
};
}

Expand Down
12 changes: 3 additions & 9 deletions src/Surging.Core/Surging.Core.Protocol.Udp/UdpServiceHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ public override async Task StartAsync(EndPoint endPoint)
_serverMessageListener = await _messageListenerFactory(endPoint);
_serverMessageListener.Received += async (sender, message) =>
{
await Task.Run(() =>
{
MessageListener.OnReceived(sender, message);
});
await MessageListener.OnReceived(sender, message);
};
}

Expand All @@ -57,11 +54,8 @@ public override async Task StartAsync(string ip, int port)
return;
_serverMessageListener = await _messageListenerFactory(new IPEndPoint(IPAddress.Parse(ip), AppConfig.ServerOptions.Ports.UdpPort));
_serverMessageListener.Received += async (sender, message) =>
{
await Task.Run(() =>
{
MessageListener.OnReceived(sender, message);
});
{
await MessageListener.OnReceived(sender, message);
};
}

Expand Down

1 comment on commit 43fbaab

@fanliang11
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.