From d588ecedc78b8cd0e2bd7e15f80bb635a53e4f2a Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Fri, 16 Aug 2024 12:19:10 +0200 Subject: [PATCH] improve server contention, fix a deadlock in binary channel on cleanup --- .../Opc.Ua.Server/Session/SessionManager.cs | 155 ++++++++---------- .../Stack/Tcp/TcpTransportListener.cs | 70 ++++---- .../Stack/Tcp/UaSCBinaryChannel.cs | 10 +- Stack/Opc.Ua.Core/Types/BuiltIn/NodeId.cs | 41 ++++- .../Types/BuiltIn/NodeIdDictionary.cs | 33 ++++ 5 files changed, 177 insertions(+), 132 deletions(-) diff --git a/Libraries/Opc.Ua.Server/Session/SessionManager.cs b/Libraries/Opc.Ua.Server/Session/SessionManager.cs index ff40d2718..874244209 100644 --- a/Libraries/Opc.Ua.Server/Session/SessionManager.cs +++ b/Libraries/Opc.Ua.Server/Session/SessionManager.cs @@ -33,6 +33,7 @@ using System.Security.Cryptography.X509Certificates; using System.Globalization; using System.Threading.Tasks; +using System.Collections.Concurrent; namespace Opc.Ua.Server { @@ -62,7 +63,7 @@ public SessionManager( m_maxHistoryContinuationPoints = configuration.ServerConfiguration.MaxHistoryContinuationPoints; m_minNonceLength = configuration.SecurityConfiguration.NonceLength; - m_sessions = new Dictionary(); + m_sessions = new ConcurrentDictionary(Environment.ProcessorCount, m_maxSessionCount); m_lastSessionId = BitConverter.ToInt64(Utils.Nonce.CreateNonce(sizeof(long)), 0); // create a event to signal shutdown. @@ -87,17 +88,13 @@ protected virtual void Dispose(bool disposing) { if (disposing) { - List sessions = null; - - lock (m_lock) - { - sessions = new List(m_sessions.Values); - m_sessions.Clear(); - } + // create snapshot of all sessions + var sessions = m_sessions.ToArray(); + m_sessions.Clear(); - foreach (Session session in sessions) + foreach (var sessionKeyValue in sessions) { - Utils.SilentDispose(session); + Utils.SilentDispose(sessionKeyValue.Value); } m_shutdownEvent.Set(); @@ -127,18 +124,16 @@ public virtual void Startup() /// public virtual void Shutdown() { - lock (m_lock) - { - // stop the monitoring thread. - m_shutdownEvent.Set(); + // stop the monitoring thread. + m_shutdownEvent.Set(); - // dispose of session objects. - foreach (Session session in m_sessions.Values) - { - session.Dispose(); - } + // dispose of session objects using a snapshot. + var sessions = m_sessions.ToArray(); + m_sessions.Clear(); - m_sessions.Clear(); + foreach (var sessionKeyValue in sessions) + { + Utils.SilentDispose(sessionKeyValue.Value); } } @@ -176,9 +171,11 @@ public virtual Session CreateSession( // check for same Nonce in another session if (clientNonce != null) { - foreach (Session sessionIterator in m_sessions.Values) + // iterate over key/value pairs in the dictionary with a thread safe iterator + foreach (var sessionKeyValueIterator in m_sessions) { - if (Utils.CompareNonce(sessionIterator.ClientNonce, clientNonce)) + byte[] sessionClientNonce = sessionKeyValueIterator.Value?.ClientNonce; + if (Utils.CompareNonce(sessionClientNonce, clientNonce)) { throw new ServiceResultException(StatusCodes.BadNonceInvalid); } @@ -191,7 +188,7 @@ public virtual Session CreateSession( { if (context.ChannelContext.EndpointDescription.SecurityMode != MessageSecurityMode.None) { - authenticationToken = Utils.IncrementIdentifier(ref m_lastSessionId); + authenticationToken = new NodeId(Utils.IncrementIdentifier(ref m_lastSessionId)); } } @@ -243,7 +240,10 @@ public virtual Session CreateSession( sessionId = session.Id; // save session. - m_sessions.Add(authenticationToken, session); + if (!m_sessions.TryAdd(authenticationToken, session)) + { + throw new ServiceResultException(StatusCodes.BadTooManySessions); + } } // raise session related event. @@ -272,6 +272,12 @@ public virtual bool ActivateSession( UserIdentityToken newIdentity = null; UserTokenPolicy userTokenPolicy = null; + // fast path no lock + if (!m_sessions.TryGetValue(authenticationToken, out _)) + { + throw new ServiceResultException(StatusCodes.BadSessionIdInvalid); + } + lock (m_lock) { // find session. @@ -306,7 +312,6 @@ public virtual bool ActivateSession( out newIdentity, out userTokenPolicy); } - IUserIdentity identity = null; IUserIdentity effectiveIdentity = null; ServiceResult error = null; @@ -393,23 +398,8 @@ public virtual bool ActivateSession( /// public virtual void CloseSession(NodeId sessionId) { - // find the session. - Session session = null; - - lock (m_lock) - { - foreach (KeyValuePair current in m_sessions) - { - if (current.Value.Id == sessionId) - { - session = current.Value; - m_sessions.Remove(current.Key); - break; - } - } - } - - if (session != null) + // find the session and try to remove it. + if (m_sessions.TryRemove(sessionId, out Session session) && session != null) { // raise session related event. RaiseSessionEvent(session, SessionEventReason.Closing); @@ -423,7 +413,6 @@ public virtual void CloseSession(NodeId sessionId) m_server.ServerDiagnostics.CurrentSessionCount--; } } - } /// @@ -442,44 +431,41 @@ public virtual OperationContext ValidateRequest(RequestHeader requestHeader, Req try { - lock (m_lock) + // check for create session request. + if (requestType == RequestType.CreateSession || requestType == RequestType.ActivateSession) { - // check for create session request. - if (requestType == RequestType.CreateSession || requestType == RequestType.ActivateSession) - { - return new OperationContext(requestHeader, requestType); - } + return new OperationContext(requestHeader, requestType); + } - // find session. - if (!m_sessions.TryGetValue(requestHeader.AuthenticationToken, out session)) + // find session. + if (!m_sessions.TryGetValue(requestHeader.AuthenticationToken, out session)) + { + EventHandler handler = m_validateSessionLessRequest; + + if (handler != null) { - EventHandler handler = m_validateSessionLessRequest; + var args = new ValidateSessionLessRequestEventArgs(requestHeader.AuthenticationToken, requestType); + handler(this, args); - if (handler != null) + if (ServiceResult.IsBad(args.Error)) { - var args = new ValidateSessionLessRequestEventArgs(requestHeader.AuthenticationToken, requestType); - handler(this, args); - - if (ServiceResult.IsBad(args.Error)) - { - throw new ServiceResultException(args.Error); - } - - return new OperationContext(requestHeader, requestType, args.Identity); + throw new ServiceResultException(args.Error); } - throw new ServiceResultException(StatusCodes.BadSessionIdInvalid); + return new OperationContext(requestHeader, requestType, args.Identity); } - // validate request header. - session.ValidateRequest(requestHeader, requestType); + throw new ServiceResultException(StatusCodes.BadSessionIdInvalid); + } - // validate user has permissions for additional info - session.ValidateDiagnosticInfo(requestHeader); + // validate request header. + session.ValidateRequest(requestHeader, requestType); - // return context. - return new OperationContext(requestHeader, requestType, session); - } + // validate user has permissions for additional info + session.ValidateDiagnosticInfo(requestHeader); + + // return context. + return new OperationContext(requestHeader, requestType, session); } catch (Exception e) { @@ -584,17 +570,10 @@ private void MonitorSessions(object data) do { - Session[] sessions = null; - - lock (m_lock) + // enumerator is thread safe + foreach (var sessionKeyValue in m_sessions) { - sessions = new Session[m_sessions.Count]; - m_sessions.Values.CopyTo(sessions, 0); - } - - for (int ii = 0; ii < sessions.Length; ii++) - { - var session = sessions[ii]; + Session session = sessionKeyValue.Value; if (session.HasExpired) { // update diagnostics. @@ -604,9 +583,9 @@ private void MonitorSessions(object data) } // raise audit event for session closed because of timeout - m_server.ReportAuditCloseSessionEvent(null, sessions[ii], "Session/Timeout"); + m_server.ReportAuditCloseSessionEvent(null, session, "Session/Timeout"); - m_server.CloseSession(null, sessions[ii].Id, false); + m_server.CloseSession(null, session.Id, false); } // if a session had no activity for the last m_minSessionTimeout milliseconds, send a keep alive event. else if (session.ClientLastContactTime.AddMilliseconds(m_minSessionTimeout) < DateTime.UtcNow) @@ -634,7 +613,7 @@ private void MonitorSessions(object data) #region Private Fields private readonly object m_lock = new object(); private IServerInternal m_server; - private Dictionary m_sessions; + private ConcurrentDictionary m_sessions; private long m_lastSessionId; private ManualResetEvent m_shutdownEvent; @@ -789,14 +768,12 @@ public IList GetSessions() /// public Session GetSession(NodeId authenticationToken) { - - Session session = null; - lock (m_lock) + // find session. + if (m_sessions.TryGetValue(authenticationToken, out Session session)) { - // find session. - m_sessions.TryGetValue(authenticationToken, out session); + return session; } - return session; + return null; } #endregion } diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/TcpTransportListener.cs b/Stack/Opc.Ua.Core/Stack/Tcp/TcpTransportListener.cs index 5292d2f38..e7b54136e 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/TcpTransportListener.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/TcpTransportListener.cs @@ -84,11 +84,13 @@ protected virtual void Dispose(bool disposing) if (m_channels != null) { - foreach (var channel in m_channels.Values) + var channels = m_channels.ToArray(); + m_channels.Clear(); + m_channels = null; + foreach (var channelKeyValue in channels) { - Utils.SilentDispose(channel); + Utils.SilentDispose(channelKeyValue.Value); } - m_channels = null; } } } @@ -157,7 +159,7 @@ public void Open( m_serverCertificateChain = settings.ServerCertificateChain; m_bufferManager = new BufferManager("Server", m_quotas.MaxBufferSize); - m_channels = new Dictionary(); + m_channels = new ConcurrentDictionary(); m_reverseConnectListener = settings.ReverseConnectListener; m_maxChannelCount = settings.MaxChannelCount; @@ -239,12 +241,14 @@ public bool ReconnectToExistingChannel( /// public void ChannelClosed(uint channelId) { - lock (m_lock) + if (m_channels?.TryRemove(channelId, out _) == true) { - m_channels?.Remove(channelId); + Utils.LogInfo("ChannelId {0}: closed", channelId); + } + else + { + Utils.LogInfo("ChannelId {0}: closed channel not found", channelId); } - - Utils.LogInfo("ChannelId {0}: closed", channelId); } /// @@ -291,9 +295,9 @@ private void OnReverseHelloComplete(IAsyncResult result) { channel.EndReverseConnect(result); - lock (m_lock) + if (!m_channels.TryAdd(channel.Id, channel)) { - m_channels.Add(channel.Id, channel); + throw new ServiceResultException(StatusCodes.BadInternalError); } if (m_callback != null) @@ -444,12 +448,11 @@ public async Task TransferListenerChannel( { bool accepted = false; TcpListenerChannel channel = null; - lock (m_lock) + + // remove it so it does not get cleaned up as an inactive connection. + if (m_channels?.TryRemove(channelId, out channel) != true) { - if (m_channels?.TryGetValue(channelId, out channel) != true) - { - throw ServiceResultException.Create(StatusCodes.BadTcpSecureChannelUnknown, "Could not find secure channel request."); - } + throw ServiceResultException.Create(StatusCodes.BadTcpSecureChannelUnknown, "Could not find secure channel request."); } // notify the application. @@ -460,13 +463,10 @@ public async Task TransferListenerChannel( accepted = args.Accepted; } - if (accepted) + if (!accepted) { - lock (m_lock) - { - // remove it so it does not get cleaned up as an inactive connection. - m_channels?.Remove(channelId); - } + // add back in for other connection attempt. + m_channels?.TryAdd(channelId, channel); } return accepted; @@ -578,7 +578,7 @@ private void OnAccept(object sender, SocketAsyncEventArgs e) channel.Attach(channelId, e.AcceptSocket); // save the channel for shutdown and reconnects. - m_channels.Add(channelId, channel); + m_channels.TryAdd(channelId, channel); } catch (Exception ex) { @@ -618,15 +618,20 @@ private void DetectInactiveChannels(object state = null) { List channels; - lock (m_lock) + channels = new List(); + foreach (var chEntry in m_channels) { - channels = new List(); - foreach (var chEntry in m_channels) + if (chEntry.Value.ElapsedSinceLastActiveTime > m_quotas.ChannelLifetime) { - if (chEntry.Value.ElapsedSinceLastActiveTime > m_quotas.ChannelLifetime) - { - channels.Add(chEntry.Value); - } + channels.Add(chEntry.Value); + } + } + + // foreach (var channel in channels) + { + // if (!m_channels.TryRemove(channel.Id, out _)) + { + // Utils.LogError("TCPLISTENER: Failed to remove channel {0} from active channels.", channel.Id); } } @@ -635,10 +640,7 @@ private void DetectInactiveChannels(object state = null) Utils.LogInfo("TCPLISTENER: {0} channels scheduled for IdleCleanup.", channels.Count); foreach (var channel in channels) { - lock (m_lock) - { - channel.IdleCleanup(); - } + channel.IdleCleanup(); } } } @@ -814,7 +816,7 @@ private void SetUri(Uri baseAddress, string relativeAddress) private uint m_lastChannelId; private Socket m_listeningSocket; private Socket m_listeningSocketIPv6; - private Dictionary m_channels; + private ConcurrentDictionary m_channels; private ITransportListenerCallback m_callback; private bool m_reverseConnectListener; private int m_inactivityDetectPeriod; diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs index ccd5b9845..f69fc55d9 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs @@ -163,10 +163,7 @@ public uint Id { get { - lock (m_lock) - { - return m_channelId; - } + return m_channelId; } } @@ -177,10 +174,7 @@ public string GlobalChannelId { get { - lock (m_lock) - { - return m_globalChannelId; - } + return m_globalChannelId; } } diff --git a/Stack/Opc.Ua.Core/Types/BuiltIn/NodeId.cs b/Stack/Opc.Ua.Core/Types/BuiltIn/NodeId.cs index d18669628..6866b5d71 100644 --- a/Stack/Opc.Ua.Core/Types/BuiltIn/NodeId.cs +++ b/Stack/Opc.Ua.Core/Types/BuiltIn/NodeId.cs @@ -1233,7 +1233,7 @@ public override int GetHashCode() hashCode.AddBytes((byte[])m_identifier); #else byte[] identifier = (byte[])m_identifier; - foreach (var id in identifier) + foreach (byte id in identifier) { hashCode.Add(id); } @@ -1760,4 +1760,43 @@ public virtual object Clone() }//class #endregion + #region NodeIdComparer Class + /// + /// Helper which implements a NodeId IEqualityComparer for Linq queries. + /// + public class NodeIdComparer : IEqualityComparer + { + /// + public bool Equals(NodeId x, NodeId y) + { + if (ReferenceEquals(x, y)) + { + return true; + } + + if (ReferenceEquals(x, null) || ReferenceEquals(y, null)) + { + return false; + } + + if (x == y) + { + return true; + } + + return false; + } + + /// + public int GetHashCode(NodeId nodeId) + { + if (ReferenceEquals(nodeId, null)) + { + return 0; + } + + return nodeId.GetHashCode(); + } + } + #endregion } diff --git a/Stack/Opc.Ua.Core/Types/BuiltIn/NodeIdDictionary.cs b/Stack/Opc.Ua.Core/Types/BuiltIn/NodeIdDictionary.cs index 9ff302c3b..134122d74 100644 --- a/Stack/Opc.Ua.Core/Types/BuiltIn/NodeIdDictionary.cs +++ b/Stack/Opc.Ua.Core/Types/BuiltIn/NodeIdDictionary.cs @@ -10,12 +10,44 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. */ +// define USE_LEGACY_IMPLEMENTATION to use the original implementation +// #define USE_LEGACY_IMPLEMENTATION + +// benchmarks revealed that the use of a standard Dictionary class +// with efficent hash code implementations is up to 10xfaster than +// the original implementation using multiple SortedDictionary instances + using System; using System.Collections; using System.Collections.Generic; namespace Opc.Ua { +#if !USE_LEGACY_IMPLEMENTATION + /// + /// A dictionary designed to provide efficient lookups for objects identified by a NodeId + /// + public class NodeIdDictionary : Dictionary + { + private static readonly NodeIdComparer s_comparer = new NodeIdComparer(); + + /// + /// Creates an empty dictionary with capacity. + /// + public NodeIdDictionary() : base(s_comparer) + { + } + + /// + /// Creates an empty dictionary with capacity. + /// + public NodeIdDictionary(int capacity) : base(capacity, s_comparer) + { + } + } + +#else // USE_LEGACY_IMPLEMENTATION + /// /// A dictionary designed to provide efficient lookups for objects identified by a NodeId /// @@ -1066,4 +1098,5 @@ private void CheckVersion() private ulong m_version; #endregion } +#endif }