Skip to content

Commit

Permalink
Auth Expiration Resubscribe (#853)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jan 15, 2024
1 parent a85c856 commit 3e69680
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions src/NATS.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,7 @@ private void processConnectInit(Srv s)

internal bool connect(Srv s, out Exception exToThrow)
{
bool authAuthError = false;
url = s.Url;
try
{
Expand Down Expand Up @@ -1199,6 +1200,7 @@ internal bool connect(Srv s, out Exception exToThrow)
string message = ex.Message.ToLower();
if (!NATSException.IsAuthenticationOrAuthorizationError(message, true))
{
authAuthError = true;
throw;
}

Expand All @@ -1218,7 +1220,7 @@ internal bool connect(Srv s, out Exception exToThrow)
catch (Exception ex)
{
exToThrow = ex;
close(ConnState.DISCONNECTED, false, ex);
close(ConnState.DISCONNECTED, false, ex, authAuthError);
lock (mu)
{
url = null;
Expand Down Expand Up @@ -2450,13 +2452,13 @@ public Exception LastError
// sets the connection's lastError.
internal void processErr(MemoryStream errorStream)
{
string s = getNormalizedError(errorStream);
string normalizedErrorText = getNormalizedError(errorStream);

if (IC.STALE_CONNECTION.Equals(s))
if (IC.STALE_CONNECTION.Equals(normalizedErrorText))
{
processOpError(new NATSStaleConnectionException());
}
else if (IC.AUTH_TIMEOUT.Equals(s))
else if (IC.AUTH_TIMEOUT.Equals(normalizedErrorText))
{
// Protect against a timing issue where an authoriztion error
// is handled before the connection close from the server.
Expand All @@ -2465,7 +2467,7 @@ internal void processErr(MemoryStream errorStream)
}
else
{
NATSException ex = new NATSException("Error from processErr(): " + s);
NATSException ex = new NATSException("Error from processErr(): " + normalizedErrorText);
bool invokeDelegates = false;

lock (mu)
Expand All @@ -2478,9 +2480,10 @@ internal void processErr(MemoryStream errorStream)
}
}

close(ConnState.CLOSED, invokeDelegates, ex);
bool authAuthError = NATSException.IsAuthenticationOrAuthorizationError(normalizedErrorText);
close(ConnState.CLOSED, invokeDelegates, ex, authAuthError);

if (NATSException.IsAuthenticationOrAuthorizationError(s))
if (authAuthError)
{
processReconnect();
}
Expand Down Expand Up @@ -4701,7 +4704,7 @@ private void clearPendingRequestCalls()
// desired status. Also controls whether user defined callbacks
// will be triggered. The lock should not be held entering this
// function. This function will handle the locking manually.
private void close(ConnState closeState, bool invokeDelegates, Exception error = null)
private void close(ConnState closeState, bool invokeDelegates, Exception error, bool authAuthError)
{
lock (mu)
{
Expand Down Expand Up @@ -4730,15 +4733,17 @@ private void close(ConnState closeState, bool invokeDelegates, Exception error =

stopPingTimer();

// Close sync subscriber channels and release any
// pending NextMsg() calls.
foreach (Subscription s in subs.Values)
if (!authAuthError)
{
s.close();
// Close sync subscriber channels and release any
// pending NextMsg() calls.
foreach (Subscription s in subs.Values)
{
s.close();
}
subs.Clear();
}

subs.Clear();

// perform appropriate callback is needed for a
// disconnect;
if (invokeDelegates && conn.isSetup())
Expand Down Expand Up @@ -4808,7 +4813,7 @@ private void close(ConnState closeState, bool invokeDelegates, Exception error =
/// <seealso cref="State"/>
public void Close()
{
close(ConnState.CLOSED, true, lastEx);
close(ConnState.CLOSED, true, lastEx, false);
callbackScheduler.ScheduleStop();
disableSubChannelPooling();
}
Expand Down

0 comments on commit 3e69680

Please sign in to comment.