Fix a deadlock in binary channel on cleanup, reduce contention due to locks on high server load #2714

Merged on Aug 24, 2024 (12 commits)
40 changes: 10 additions & 30 deletions Libraries/Opc.Ua.Server/Diagnostics/CustomNodeManager.cs
@@ -102,10 +102,6 @@ protected CustomNodeManager2(
}
}

// create the table of monitored items.
// these are items created by clients when they subscribe to data or events.
m_monitoredItems = new Dictionary<uint, IDataChangeMonitoredItem>();

// create the table of monitored nodes.
// these are created by the node manager whenever a client subscribe to an attribute of the node.
m_monitoredNodes = new Dictionary<NodeId, MonitoredNode2>();
@@ -237,14 +233,6 @@ protected List<NodeState> RootNotifiers
get { return m_rootNotifiers; }
}

/// <summary>
/// Gets the table of monitored items.
/// </summary>
protected Dictionary<uint, IDataChangeMonitoredItem> MonitoredItems
{
get { return m_monitoredItems; }
}

/// <summary>
/// Gets the table of nodes being monitored.
/// </summary>
@@ -929,7 +917,7 @@ public virtual void DeleteAddressSpace()
/// </remarks>
public virtual object GetManagerHandle(NodeId nodeId)
{
lock (Lock)
lock (m_lock)
Review comment (Contributor):

I'm not very deep into this, but there are about 10 places where a call to GetManagerHandle is protected by lock(Lock), one is protected by m_lock, and one (or two?) are not protected by any lock. Which member do we want to protect with the lock?

Reply (Contributor Author):

In this case the lock protects read access to the NodeIdDictionary. In a future improvement we could change the underlying dictionary to a ConcurrentDictionary, which would greatly reduce the number of locks that cause contention. GetManagerHandle is the first candidate to have its lock removed. We tested without the lock, but that could be unsafe if the node dictionary is modified concurrently. Lock and m_lock refer to the same lock object here.
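
A minimal sketch of the trade-off described in the reply above, not code from this PR. `NodeTable` and its string keys are hypothetical stand-ins for the node manager and its `NodeIdDictionary<NodeState>`. It contrasts a lookup that takes a lock on every read with a lookup on a `ConcurrentDictionary`, whose `TryGetValue` is safe to call while other threads add or remove entries.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for a node table; not the SDK's NodeIdDictionary.
public sealed class NodeTable
{
    private readonly object m_lock = new object();
    private readonly Dictionary<string, string> m_lockedNodes =
        new Dictionary<string, string>();
    private readonly ConcurrentDictionary<string, string> m_concurrentNodes =
        new ConcurrentDictionary<string, string>();

    // Writes go to both tables so the two lookups can be compared.
    public void Add(string nodeId, string node)
    {
        lock (m_lock)
        {
            m_lockedNodes[nodeId] = node;
        }

        m_concurrentNodes[nodeId] = node;
    }

    // Pattern discussed in the review: every read takes the lock,
    // so concurrent readers contend with each other and with writers.
    public string FindWithLock(string nodeId)
    {
        lock (m_lock)
        {
            return m_lockedNodes.TryGetValue(nodeId, out string node) ? node : null;
        }
    }

    // Possible follow-up: the read needs no lock because
    // ConcurrentDictionary.TryGetValue is thread safe.
    public string FindWithoutLock(string nodeId)
    {
        return m_concurrentNodes.TryGetValue(nodeId, out string node) ? node : null;
    }
}
```

Removing the lock only makes the single lookup safe. Any code that relies on several reads or a read followed by a write being atomic would still need its own synchronization.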

{
return GetManagerHandle(m_systemContext, nodeId, null);
}
@@ -945,20 +933,17 @@ protected virtual NodeHandle GetManagerHandle(ServerSystemContext context, NodeI
return null;
}

if (m_predefinedNodes != null)
{
NodeState node = null;
NodeState node = null;

if (m_predefinedNodes.TryGetValue(nodeId, out node))
{
NodeHandle handle = new NodeHandle();
if (m_predefinedNodes?.TryGetValue(nodeId, out node) == true)
{
NodeHandle handle = new NodeHandle();

handle.NodeId = nodeId;
handle.Node = node;
handle.Validated = true;
handle.NodeId = nodeId;
handle.Node = node;
handle.Validated = true;

return handle;
}
return handle;
}

return null;
@@ -3706,7 +3691,6 @@ protected virtual ServiceResult CreateMonitoredItem(
monitoredItem = datachangeItem;

// save the monitored item.
m_monitoredItems.Add(monitoredItemId, datachangeItem);
monitoredNode.Add(datachangeItem);

// report change.
@@ -3809,7 +3793,7 @@ public ServiceResult ValidateEventRolePermissions(IEventMonitoredItem monitoredI
eventTypeId = baseEventState.EventType?.Value;
sourceNodeId = baseEventState.SourceNode?.Value;
}

OperationContext operationContext = new OperationContext(monitoredItem);

// validate the event type id permissions as specified
@@ -4259,9 +4243,6 @@ protected virtual ServiceResult DeleteMonitoredItem(
}
}

// remove the monitored item.
m_monitoredItems.Remove(monitoredItem.Id);

// report change.
OnMonitoredItemDeleted(context, handle, datachangeItem);

@@ -4777,7 +4758,6 @@ protected NodeState AddNodeToComponentCache(ISystemContext context, NodeHandle h
private ServerSystemContext m_systemContext;
private string[] m_namespaceUris;
private ushort[] m_namespaceIndexes;
private Dictionary<uint, IDataChangeMonitoredItem> m_monitoredItems;
Review comment (Contributor):

I have to admit I do not fully understand this change. It seems m_monitoredItems was not used at all, is that correct? Do we know that people who have extended or derived from CustomNodeManager2 did not use it?

I'm also wondering: the method CreateMonitoredItems is implemented in several node managers (CustomNodeManager2, CoreNodeManager and SampleNodeManager), each with a slightly different implementation. Is this intentional?

Reply (Contributor Author):

I think the idea is not to enforce a specific implementation, including how monitored item handles are implemented in a node manager.

private Dictionary<NodeId, MonitoredNode2> m_monitoredNodes;
private Dictionary<NodeId, CacheEntry> m_componentCache;
private NodeIdDictionary<NodeState> m_predefinedNodes;
151 changes: 72 additions & 79 deletions Libraries/Opc.Ua.Server/Session/SessionManager.cs
@@ -33,6 +33,7 @@
using System.Security.Cryptography.X509Certificates;
using System.Globalization;
using System.Threading.Tasks;
using System.Collections.Concurrent;

namespace Opc.Ua.Server
{
@@ -62,7 +63,7 @@
m_maxHistoryContinuationPoints = configuration.ServerConfiguration.MaxHistoryContinuationPoints;
m_minNonceLength = configuration.SecurityConfiguration.NonceLength;

m_sessions = new Dictionary<NodeId, Session>();
m_sessions = new ConcurrentDictionary<NodeId, Session>(Environment.ProcessorCount, m_maxSessionCount);
m_lastSessionId = BitConverter.ToInt64(Utils.Nonce.CreateNonce(sizeof(long)), 0);

// create a event to signal shutdown.
@@ -87,17 +88,13 @@
{
if (disposing)
{
List<Session> sessions = null;

lock (m_lock)
{
sessions = new List<Session>(m_sessions.Values);
m_sessions.Clear();
}
// create snapshot of all sessions
var sessions = m_sessions.ToArray();
m_sessions.Clear();

foreach (Session session in sessions)
foreach (var sessionKeyValue in sessions)
{
Utils.SilentDispose(session);
Utils.SilentDispose(sessionKeyValue.Value);

[Codecov: added line #L97 was not covered by tests]
}

m_shutdownEvent.Set();
@@ -127,18 +124,16 @@
/// </summary>
public virtual void Shutdown()
{
lock (m_lock)
{
// stop the monitoring thread.
m_shutdownEvent.Set();
// stop the monitoring thread.
m_shutdownEvent.Set();

// dispose of session objects.
foreach (Session session in m_sessions.Values)
{
session.Dispose();
}
// dispose of session objects using a snapshot.
var sessions = m_sessions.ToArray();
m_sessions.Clear();

m_sessions.Clear();
foreach (var sessionKeyValue in sessions)
{
Utils.SilentDispose(sessionKeyValue.Value);
}
}

@@ -176,9 +171,11 @@
// check for same Nonce in another session
if (clientNonce != null)
{
foreach (Session sessionIterator in m_sessions.Values)
// iterate over key/value pairs in the dictionary with a thread safe iterator
foreach (var sessionKeyValueIterator in m_sessions)
{
if (Utils.CompareNonce(sessionIterator.ClientNonce, clientNonce))
byte[] sessionClientNonce = sessionKeyValueIterator.Value?.ClientNonce;
if (Utils.CompareNonce(sessionClientNonce, clientNonce))
{
throw new ServiceResultException(StatusCodes.BadNonceInvalid);
}
@@ -191,7 +188,7 @@
{
if (context.ChannelContext.EndpointDescription.SecurityMode != MessageSecurityMode.None)
{
authenticationToken = Utils.IncrementIdentifier(ref m_lastSessionId);
authenticationToken = new NodeId(Utils.IncrementIdentifier(ref m_lastSessionId));
}
}

@@ -243,7 +240,10 @@
sessionId = session.Id;

// save session.
m_sessions.Add(authenticationToken, session);
if (!m_sessions.TryAdd(authenticationToken, session))
{
throw new ServiceResultException(StatusCodes.BadTooManySessions);

[Codecov: added line #L245 was not covered by tests]
}
}

// raise session related event.
@@ -272,6 +272,12 @@
UserIdentityToken newIdentity = null;
UserTokenPolicy userTokenPolicy = null;

// fast path no lock
if (!m_sessions.TryGetValue(authenticationToken, out _))
{
throw new ServiceResultException(StatusCodes.BadSessionIdInvalid);

[Codecov: added line #L278 was not covered by tests]
}

lock (m_lock)
{
// find session.
@@ -306,7 +312,6 @@
out newIdentity,
out userTokenPolicy);
}

IUserIdentity identity = null;
IUserIdentity effectiveIdentity = null;
ServiceResult error = null;
@@ -393,22 +398,23 @@
/// </remarks>
public virtual void CloseSession(NodeId sessionId)
{
// find the session.
Session session = null;

lock (m_lock)
// thread safe search for the session.
foreach (KeyValuePair<NodeId, Session> current in m_sessions)
{
foreach (KeyValuePair<NodeId, Session> current in m_sessions)
if (current.Value.Id == sessionId)
{
if (current.Value.Id == sessionId)
if (!m_sessions.TryRemove(current.Key, out session))
{
session = current.Value;
m_sessions.Remove(current.Key);
break;
// found but was already removed
return;

[Codecov: added line #L411 was not covered by tests]
}
break;
}
}

// close the session if removed.
if (session != null)
{
// raise session related event.
@@ -423,7 +429,6 @@
m_server.ServerDiagnostics.CurrentSessionCount--;
}
}

}

/// <summary>
@@ -442,44 +447,41 @@

try
{
lock (m_lock)
// check for create session request.
if (requestType == RequestType.CreateSession || requestType == RequestType.ActivateSession)
{
// check for create session request.
if (requestType == RequestType.CreateSession || requestType == RequestType.ActivateSession)
{
return new OperationContext(requestHeader, requestType);
}
return new OperationContext(requestHeader, requestType);
}

// find session.
if (!m_sessions.TryGetValue(requestHeader.AuthenticationToken, out session))
{
EventHandler<ValidateSessionLessRequestEventArgs> handler = m_validateSessionLessRequest;

// find session.
if (!m_sessions.TryGetValue(requestHeader.AuthenticationToken, out session))
if (handler != null)
{
EventHandler<ValidateSessionLessRequestEventArgs> handler = m_validateSessionLessRequest;
var args = new ValidateSessionLessRequestEventArgs(requestHeader.AuthenticationToken, requestType);
handler(this, args);

[Codecov: added lines #L463 - L464 were not covered by tests]

if (handler != null)
if (ServiceResult.IsBad(args.Error))
{
var args = new ValidateSessionLessRequestEventArgs(requestHeader.AuthenticationToken, requestType);
handler(this, args);

if (ServiceResult.IsBad(args.Error))
{
throw new ServiceResultException(args.Error);
}

return new OperationContext(requestHeader, requestType, args.Identity);
throw new ServiceResultException(args.Error);

[Codecov: added line #L468 was not covered by tests]
}

throw new ServiceResultException(StatusCodes.BadSessionIdInvalid);
return new OperationContext(requestHeader, requestType, args.Identity);

[Codecov: added line #L471 was not covered by tests]
}

// validate request header.
session.ValidateRequest(requestHeader, requestType);
throw new ServiceResultException(StatusCodes.BadSessionIdInvalid);
}

// validate request header.
session.ValidateRequest(requestHeader, requestType);

// validate user has permissions for additional info
session.ValidateDiagnosticInfo(requestHeader);
// validate user has permissions for additional info
session.ValidateDiagnosticInfo(requestHeader);

// return context.
return new OperationContext(requestHeader, requestType, session);
}
// return context.
return new OperationContext(requestHeader, requestType, session);
}
catch (Exception e)
{
@@ -584,17 +586,10 @@

do
{
Session[] sessions = null;

lock (m_lock)
{
sessions = new Session[m_sessions.Count];
m_sessions.Values.CopyTo(sessions, 0);
}

for (int ii = 0; ii < sessions.Length; ii++)
// enumerator is thread safe
foreach (var sessionKeyValue in m_sessions)
{
var session = sessions[ii];
Session session = sessionKeyValue.Value;
if (session.HasExpired)
{
// update diagnostics.
@@ -604,9 +599,9 @@
}

// raise audit event for session closed because of timeout
m_server.ReportAuditCloseSessionEvent(null, sessions[ii], "Session/Timeout");
m_server.ReportAuditCloseSessionEvent(null, session, "Session/Timeout");

m_server.CloseSession(null, sessions[ii].Id, false);
m_server.CloseSession(null, session.Id, false);
}
// if a session had no activity for the last m_minSessionTimeout milliseconds, send a keep alive event.
else if (session.ClientLastContactTime.AddMilliseconds(m_minSessionTimeout) < DateTime.UtcNow)
@@ -634,7 +629,7 @@
#region Private Fields
private readonly object m_lock = new object();
private IServerInternal m_server;
private Dictionary<NodeId, Session> m_sessions;
private ConcurrentDictionary<NodeId, Session> m_sessions;
private long m_lastSessionId;
private ManualResetEvent m_shutdownEvent;

@@ -790,14 +785,12 @@
/// <inheritdoc/>
public Session GetSession(NodeId authenticationToken)
{

Session session = null;
lock (m_lock)
// find session.
if (m_sessions.TryGetValue(authenticationToken, out Session session))
{
// find session.
m_sessions.TryGetValue(authenticationToken, out session);
return session;
}
return session;
return null;

[Codecov: added line #L793 was not covered by tests]
}
#endregion
}