Skip to content

Commit

Permalink
v1.2.11 - add sentinel +switch-master;
Browse files Browse the repository at this point in the history
  • Loading branch information
2881099 committed Dec 18, 2023
1 parent 7af248b commit f043af5
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 31 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ string[] vals = cli.MGet("key1", "key2");
| name | \<empty\> | Connection name, use client list command to view |
| prefix | \<empty\> | The prefix of the key, all methods will have this prefix. cli.Set(prefix + "key", 111); |
| exitAutoDisposePool | true | AppDomain.CurrentDomain.ProcessExit/Console.CancelKeyPress auto disposed |
| subscribleReadbytes | false | Subscrible read bytes |
| subscribeReadbytes | false | Subscribe read bytes |

> IPv6: [fe80::b164:55b3:4b4f:7ce6%15]:6379
Expand Down
2 changes: 1 addition & 1 deletion README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ string[] vals = cli.MGet("key1", "key2");
| name | \<empty\> | 连接名,使用 CLIENT LIST 命令查看 |
| prefix | \<empty\> | `key` 前辍,所有方法都会附带此前辍,cli.Set(prefix + "key", 111); |
| exitAutoDisposePool | true | AppDomain.CurrentDomain.ProcessExit/Console.CancelKeyPress 事件自动释放 |
| subscribleReadbytes | false | Subscrible 读取内容为 byte[] |
| subscribeReadbytes | false | Subscribe 读取内容为 byte[] |

> IPv6: [fe80::b164:55b3:4b4f:7ce6%15]:6379
Expand Down
2 changes: 1 addition & 1 deletion src/FreeRedis/CommandPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class CommandPacket
internal bool? _IsIgnoreAop;
public bool IsIgnoreAop
{
get => _IsIgnoreAop ?? (_IsIgnoreAop = _command == "PING" && _input.Count == 2 && _input[1].ToString() == "CheckAvailable").Value;
get => _IsIgnoreAop ?? false;
set => _IsIgnoreAop = value;
}

Expand Down
7 changes: 4 additions & 3 deletions src/FreeRedis/ConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public string Host
public int MinPoolSize { get; set; } = 1;
public int Retry { get; set; } = 0;
public bool ExitAutoDisposePool { get; set; } = true;
public bool SubscribleReadbytes { get; set; } = false;
public bool SubscribeReadbytes { get; set; } = false;

public RemoteCertificateValidationCallback CertificateValidation;
public LocalCertificateSelectionCallback CertificateSelection;
Expand Down Expand Up @@ -63,7 +63,7 @@ public override string ToString()
if (MinPoolSize != 1) sb.Append(",min pool size=").Append(MinPoolSize);
if (Retry != 0) sb.Append(",retry=").Append(Retry);
if (ExitAutoDisposePool != true) sb.Append(",exitAutoDisposePool=false");
if (SubscribleReadbytes != false) sb.Append(",subscribleReadbytes=true");
if (SubscribeReadbytes != false) sb.Append(",subscribeReadbytes=true");
return sb.ToString();
}

Expand Down Expand Up @@ -104,7 +104,8 @@ public static ConnectionStringBuilder Parse(string connectionString)
case "minpoolsize": if (kv.Length > 1 && int.TryParse(kv[1].Trim(), out var minPoolSize) && minPoolSize >= 0) ret.MinPoolSize = minPoolSize; break;
case "retry": if (kv.Length > 1 && int.TryParse(kv[1].Trim(), out var retry) && retry > 0) ret.Retry = retry; break;
case "exitautodisposepool": if (kv.Length > 1 && new[] { "false", "0" }.Contains(kv[1].Trim())) ret.ExitAutoDisposePool = false; break;
case "subscriblereadbytes": if (kv.Length > 1 && kv[1].ToLower().Trim() == "true") ret.SubscribleReadbytes = true; break;
case "subscriblereadbytes": //history error
case "subscribereadbytes": if (kv.Length > 1 && kv[1].ToLower().Trim() == "true") ret.SubscribeReadbytes = true; break;
}
}
return ret;
Expand Down
2 changes: 1 addition & 1 deletion src/FreeRedis/FreeRedis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<AssemblyName>FreeRedis</AssemblyName>
<PackageId>FreeRedis</PackageId>
<RootNamespace>FreeRedis</RootNamespace>
<Version>1.2.10</Version>
<Version>1.2.11</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageProjectUrl>https://github.com/2881099/FreeRedis</PackageProjectUrl>
<Description>FreeRedis is .NET redis client, supports cluster, sentinel, master-slave, pipeline, transaction and connection pool.</Description>
Expand Down
38 changes: 29 additions & 9 deletions src/FreeRedis/Internal/RedisClientPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ public class RedisClientPoolPolicy : IPolicy<RedisClient>
public bool OnCheckAvailable(Object<RedisClient> obj)
{
obj.ResetValue();
return obj.Value.Ping("CheckAvailable") == "CheckAvailable";
CommandPacket cmd = "PING";
cmd.IsIgnoreAop = true;
return obj.Value.Call(cmd) as string == "PONG";
}

public RedisClient OnCreate()
Expand Down Expand Up @@ -192,8 +194,10 @@ public void OnGet(Object<RedisClient> obj)
{
try
{
obj.Value.Ping("CheckAvailable");
}
CommandPacket cmd = "PING";
cmd.IsIgnoreAop = true;
obj.Value.Call(cmd);
}
catch
{
obj.ResetValue();
Expand All @@ -202,11 +206,25 @@ public void OnGet(Object<RedisClient> obj)
}
}
#if !NET40
public Task OnGetAsync(Object<RedisClient> obj)
async public Task OnGetAsync(Object<RedisClient> obj)
{
OnGet(obj); //todo
return Task.FromResult(false);
}
if (_pool.IsAvailable)
{
if (DateTime.Now.Subtract(obj.LastReturnTime).TotalSeconds > 60 || obj.Value.Adapter.GetRedisSocket(null).IsConnected == false)
{
try
{
CommandPacket cmd = "PING";
cmd.IsIgnoreAop = true;
await obj.Value.CallAsync(cmd);
}
catch
{
obj.ResetValue();
}
}
}
}
#endif

public void OnGetTimeout() { }
Expand All @@ -229,8 +247,10 @@ public static void PrevReheatConnectionPool(ObjectPool<RedisClient> pool, int mi
{
var conn = pool.Get();
initConns.Add(conn);
conn.Value.Ping("CheckAvailable");
}
CommandPacket cmd = "PING";
cmd.IsIgnoreAop = true;
conn.Value.Call(cmd);
}
catch (Exception ex)
{
initTestOk = false; //预热一次失败,后面将不进行
Expand Down
37 changes: 34 additions & 3 deletions src/FreeRedis/RedisClient/Adapter/SentinelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using FreeRedis.Internal.ObjectPool;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Net;
using System.Text;
Expand All @@ -17,6 +18,7 @@ internal class SentinelAdapter : BaseAdapter
internal readonly IdleBus<RedisClientPool> _ib;
internal readonly ConnectionStringBuilder _connectionString;
readonly LinkedList<ConnectionStringBuilder> _sentinels;
readonly RedisClient _sentinelSubscribe;
string _masterHost;
readonly bool _rw_splitting;
readonly bool _is_single;
Expand All @@ -33,6 +35,8 @@ public SentinelAdapter(RedisClient topOwner, ConnectionStringBuilder sentinelCon
csb.Host = csb.Host.ToLower();
csb.CertificateValidation = _connectionString.CertificateValidation;
csb.CertificateSelection = _connectionString.CertificateSelection;
csb.MaxPoolSize = 1;
csb.MinPoolSize = 1;
return csb;
}).GroupBy(a => a.Host, a => a).Select(a => a.First()) ?? new ConnectionStringBuilder[0]);
_rw_splitting = rw_splitting;
Expand All @@ -42,12 +46,38 @@ public SentinelAdapter(RedisClient topOwner, ConnectionStringBuilder sentinelCon

_ib = new IdleBus<RedisClientPool>(TimeSpan.FromMinutes(10));
ResetSentinel();
}
_sentinelSubscribe = new RedisClient(_sentinels.First.Value);
_sentinelSubscribe.Subscribe("+switch-master", (chan, msg) =>
{
if (chan == "+switch-master" && msg != null)
{
var args = msg?.ToString().Split(' '); //mymaster 127.0.0.1 6381 127.0.0.1 6379
if (args == null || args.Length < 5) return;
if (_connectionString.Host != args[0]) return;

var masterhostEnd = $"{args[3]}:{args[4]}";
_ib.TryRemove(_masterHost);

ConnectionStringBuilder connectionString = _connectionString.ToString();
connectionString.Host = masterhostEnd;
connectionString.MinPoolSize = _connectionString.MinPoolSize;
connectionString.MaxPoolSize = _connectionString.MaxPoolSize;
connectionString.CertificateValidation = _connectionString.CertificateValidation;
connectionString.CertificateSelection = _connectionString.CertificateSelection;
_ib.TryRegister(masterhostEnd, () => new RedisClientPool(connectionString, TopOwner));

Interlocked.Exchange(ref _masterHost, masterhostEnd);
if (!TopOwner.OnNotice(null, new NoticeEventArgs(NoticeType.Info, null, $"{_connectionString.Host.PadRight(21)} > Redis Sentinel switch to {_masterHost}", null)))
TestTrace.WriteLine($"【{_connectionString.Host}】Redis Sentinel switch to {_masterHost}", ConsoleColor.DarkGreen);
}
});
}

bool isdisposed = false;
public override void Dispose()
{
foreach (var key in _ib.GetKeys())
_sentinelSubscribe.Dispose();
foreach (var key in _ib.GetKeys())
{
var pool = _ib.Get(key);
TopOwner.Unavailable?.Invoke(TopOwner, new UnavailableEventArgs(pool.Key, pool));
Expand Down Expand Up @@ -296,9 +326,10 @@ bool RecoverySentinel()
Thread.CurrentThread.Join(1000);
try
{
var oldMasterHost = _masterHost;
ResetSentinel();

if (_ib.Get(_masterHost).CheckAvailable())
if (oldMasterHost != _masterHost && _ib.Get(_masterHost).CheckAvailable())
{
if (!TopOwner.OnNotice(null, new NoticeEventArgs(NoticeType.Info, null, $"{_connectionString.Host.PadRight(21)} > Redis Sentinel switch to {_masterHost}", null)))
TestTrace.WriteLine($"【{_connectionString.Host}】Redis Sentinel switch to {_masterHost}", ConsoleColor.DarkGreen);
Expand Down
2 changes: 1 addition & 1 deletion src/FreeRedis/RedisClient/PubSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ internal IDisposable Subscribe(bool psub, bool ssub, string[] channels, Action<s
_topOwner.Adapter.Refersh(_redisSocket); //防止 IdleBus 超时回收
try { _redisSocket.Write("PING"); } catch { }
}, null, 10000, 10000);
var readCmd = "PubSubRead".SubCommand(null).FlagReadbytes(_topOwner.ConnectionString.SubscribleReadbytes);
var readCmd = "PubSubRead".SubCommand(null).FlagReadbytes(_topOwner.ConnectionString.SubscribeReadbytes);
while (_stoped == false)
{
RedisResult rt = null;
Expand Down
13 changes: 2 additions & 11 deletions src/FreeRedis/RedisSentinelClient.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
using FreeRedis.Internal;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;

namespace FreeRedis
{
public partial class RedisSentinelClient : IDisposable
public partial class RedisSentinelClient : IDisposable
{
readonly IRedisSocket _redisSocket;

Expand Down Expand Up @@ -65,26 +62,20 @@ public SentinelIsMaterDownByAddrResult IsMasterDownByAddr(string ip, int port, l
public long Reset(string pattern) => Call("SENTINEL".SubCommand("RESET").InputRaw(pattern), rt => rt.ThrowOrValue<long>());
public void Failover(string masterName) => Call("SENTINEL".SubCommand("FAILOVER").InputRaw(masterName), rt => rt.ThrowOrNothing());



public object PendingScripts() => Call("SENTINEL".SubCommand("PENDING-SCRIPTS"), rt => rt.ThrowOrValue());
public object Monitor(string name, string ip, int port, int quorum) => Call("SENTINEL".SubCommand("MONITOR").Input(name, ip, port, quorum), rt => rt.ThrowOrValue());



public void FlushConfig() => Call("SENTINEL".SubCommand("FLUSHCONFIG"), rt => rt.ThrowOrNothing());
public void Remove(string masterName) => Call("SENTINEL".SubCommand("REMOVE").InputRaw(masterName), rt => rt.ThrowOrNothing());
public string CkQuorum(string masterName) => Call("SENTINEL".SubCommand("CKQUORUM").InputRaw(masterName), rt => rt.ThrowOrValue<string>());
public void Set(string masterName, string option, string value) => Call("SENTINEL".SubCommand("SET").Input(masterName, option, value), rt => rt.ThrowOrNothing());



public object InfoCache(string masterName) => Call<object>("SENTINEL".SubCommand("INFO-CACHE").InputRaw(masterName), rt => rt.ThrowOrValue());
public void SimulateFailure(bool crashAfterElection, bool crashAfterPromotion) => Call<object>("SENTINEL"
.SubCommand("SIMULATE-FAILURE")
.InputIf(crashAfterElection, "crash-after-election")
.InputIf(crashAfterPromotion, "crash-after-promotion"), rt => rt.ThrowOrNothing());
}
}

#region Model
public class SentinelRoleResult
Expand Down

0 comments on commit f043af5

Please sign in to comment.