From fc863f5c729e29ca4c75df8607f3df08fedc11ce Mon Sep 17 00:00:00 2001 From: Foo Bar Date: Fri, 20 Sep 2024 10:31:58 +0800 Subject: [PATCH] feat: support xrpc protocol custom protocol --- lib/resty/healthcheck.lua | 63 +++++ t/report_xrpc_status.t | 502 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 565 insertions(+) create mode 100644 t/report_xrpc_status.t diff --git a/lib/resty/healthcheck.lua b/lib/resty/healthcheck.lua index 03ad36b..77390a6 100644 --- a/lib/resty/healthcheck.lua +++ b/lib/resty/healthcheck.lua @@ -54,6 +54,7 @@ local EMPTY = setmetatable({},{ -- Counters: a 32-bit shm integer can hold up to four 8-bit counters. local CTR_SUCCESS = 0x00000001 local CTR_HTTP = 0x00000100 +local CTR_XRPC = 0x00001000 local CTR_TCP = 0x00010000 local CTR_TIMEOUT = 0x01000000 @@ -63,6 +64,7 @@ local MASK_SUCCESS = 0x000000ff local COUNTER_NAMES = { [CTR_SUCCESS] = "SUCCESS", [CTR_HTTP] = "HTTP", + [CTR_XRPC] = "XRPC", [CTR_TCP] = "TCP", [CTR_TIMEOUT] = "TIMEOUT", } @@ -659,6 +661,9 @@ function checker:report_failure(ip, port, hostname, check) if self.checks[check or "passive"].type == "tcp" then limit = checks.unhealthy.tcp_failures ctr_type = CTR_TCP + elseif self.checks[check or "passive"].type == "xrpc" then + limit = checks.unhealthy.failures + ctr_type = CTR_XRPC else limit = checks.unhealthy.http_failures ctr_type = CTR_HTTP @@ -725,6 +730,34 @@ function checker:report_http_status(ip, port, hostname, http_status, check) end + +function checker:report_xrpc_status(ip, port, hostname, status, check) + status = tonumber(status) or 0 + + if check ~= "active" then + return + end + + local checks = self.checks[check or "active"] + + local status_type, limit, ctr + if checks.healthy.statuses[status] then + status_type = "healthy" + limit = checks.healthy.successes + ctr = CTR_SUCCESS + elseif checks.unhealthy.statuses[status] + or status == 0 then + status_type = "unhealthy" + limit = checks.unhealthy.failures + ctr = CTR_XRPC + else + return + end + + return incr_counter(self, status_type, ip, port, hostname, limit, ctr) + +end + --- Report a failure on TCP level. -- If `unhealthy.tcp_failures` is set to zero in the configuration, -- this function is a no-op and returns `true`. @@ -857,6 +890,23 @@ end -- Runs a single healthcheck probe function checker:run_single_check(ip, port, hostname, hostheader) + if self.checks.active.type == "xrpc" then + local ok, status = self.checks.active.handler({ + host = ip, + port = port, + domain = hostname, + }, self.xrpc_conf) + + if not ok then + return self:report_failure(ip, port, hostname, "active") + end + + if status then + return self:report_xrpc_status(ip, port, hostname, status, "active") + end + + return self:report_success(ip, port, hostname, "active") + end local sock, err = ngx.socket.tcp() if not sock then @@ -1277,15 +1327,20 @@ local defaults = { concurrency = 10, http_path = "/", https_verify_certificate = true, + handler = NO_DEFAULT, + xrpc_conf = {}, healthy = { interval = 0, -- 0 = disabled by default http_statuses = { 200, 302 }, + statuses = { 200, 302 }, successes = 2, }, unhealthy = { interval = 0, -- 0 = disabled by default http_statuses = { 429, 404, 500, 501, 502, 503, 504, 505 }, + statuses = { 500, 501 }, + failures = 2, tcp_failures = 2, timeouts = 3, http_failures = 5, @@ -1325,6 +1380,7 @@ do http = true, tcp = true, https = true, + xrpc = true, } check_valid_type = function(var, val) assert(valid_types[val], @@ -1399,6 +1455,11 @@ function _M.new(opts) self.test_get_counter = test_get_counter end + if self.checks.active.type == "xrpc" then + assert(self.checks.active.handler, "required option 'checks.active.handler' is missing") + assert(self.checks.active.unhealthy.failures < 255, "checks.active.unhealthy.tcp_failures must be at most 254") + end + assert(self.name, "required option 'name' is missing") assert(self.shm_name, "required option 'shm_name' is missing") @@ -1420,6 +1481,8 @@ function _M.new(opts) to_set(self.checks.active.healthy, "http_statuses") to_set(self.checks.passive.unhealthy, "http_statuses") to_set(self.checks.passive.healthy, "http_statuses") + to_set(self.checks.active.healthy, "statuses") + to_set(self.checks.active.unhealthy, "statuses") -- decorate with methods and constants self.events = EVENTS diff --git a/t/report_xrpc_status.t b/t/report_xrpc_status.t new file mode 100644 index 0000000..fae56c7 --- /dev/null +++ b/t/report_xrpc_status.t @@ -0,0 +1,502 @@ +use Test::Nginx::Socket::Lua 'no_plan'; +use Cwd qw(cwd); + +workers(1); + +my $pwd = cwd(); + +our $HttpConfig = qq{ + lua_package_path "$pwd/lib/?.lua;;"; + lua_shared_dict test_shm 8m; + lua_shared_dict my_worker_events 8m; +}; + +run_tests(); + +__DATA__ + + + +=== TEST 1: report_xrpc_status() failures active +--- http_config eval +qq{ + $::HttpConfig + + server { + listen 2119; + location = /status { + return 200; + } + } +} +--- config + location = /t { + content_by_lua_block { + local we = require "resty.worker.events" + assert(we.configure{ shm = "my_worker_events", interval = 0.1 }) + local healthcheck = require("resty.healthcheck") + local checker = healthcheck.new({ + name = "testing", + shm_name = "test_shm", + type = "xrpc", + checks = { + active = { + healthy = { + interval = 999, -- we don't want active checks + statuses = { 200 }, + successes = 3, + }, + unhealthy = { + interval = 999, -- we don't want active checks + statuses = { 500 }, + failures = 2, + }, + handler = function(node, conf)end + }, + }, + }) + ngx.sleep(0.1) -- wait for initial timers to run once + local ok, err = checker:add_target("127.0.0.1", 2119, nil, true) + checker:report_xrpc_status("127.0.0.1", 2119, nil, 500, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 500, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119)) -- false + } + } +--- request +GET /t +--- response_body +false +--- error_log +checking healthy targets: nothing to do +checking unhealthy targets: nothing to do +unhealthy XRPC increment (1/2) for '(127.0.0.1:2119)' +unhealthy XRPC increment (2/2) for '(127.0.0.1:2119)' +event: target status '(127.0.0.1:2119)' from 'true' to 'false' + + + + +=== TEST 2: report_xrpc_status() successes active +--- http_config eval +qq{ + $::HttpConfig + + server { + listen 2119; + location = /status { + return 200; + } + } +} +--- config + location = /t { + content_by_lua_block { + local we = require "resty.worker.events" + assert(we.configure{ shm = "my_worker_events", interval = 0.1 }) + local healthcheck = require("resty.healthcheck") + local checker = healthcheck.new({ + name = "testing", + shm_name = "test_shm", + type = "xrpc", + checks = { + active = { + healthy = { + interval = 999, -- we don't want active checks + successes = 4, + }, + unhealthy = { + interval = 999, -- we don't want active checks + tcp_failures = 2, + failures = 3, + + }, + handler = function(node, conf)end + }, + }, + }) + ngx.sleep(0.1) -- wait for initial timers to run once + local ok, err = checker:add_target("127.0.0.1", 2119, nil, false) + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119)) -- true + } + } +--- request +GET /t +--- response_body +true +--- error_log +checking healthy targets: nothing to do +checking unhealthy targets: nothing to do +healthy SUCCESS increment (1/4) for '(127.0.0.1:2119)' +healthy SUCCESS increment (2/4) for '(127.0.0.1:2119)' +healthy SUCCESS increment (3/4) for '(127.0.0.1:2119)' +healthy SUCCESS increment (4/4) for '(127.0.0.1:2119)' +event: target status '(127.0.0.1:2119)' from 'false' to 'true' + + + +=== TEST 3: report_xrpc_status() with success is a nop when active.healthy.successes == 0 +--- http_config eval +qq{ + $::HttpConfig + + server { + listen 2119; + location = /status { + return 200; + } + } +} +--- config + location = /t { + content_by_lua_block { + local we = require "resty.worker.events" + assert(we.configure{ shm = "my_worker_events", interval = 0.1 }) + local healthcheck = require("resty.healthcheck") + local checker = healthcheck.new({ + name = "testing", + shm_name = "test_shm", + type = "xrpc", + checks = { + active = { + healthy = { + interval = 999, -- we don't want active checks + successes = 0, + }, + unhealthy = { + interval = 999, -- we don't want active checks + tcp_failures = 2, + failures = 3, + }, + handler = function(node, conf)end + } + }, + + }) + ngx.sleep(0.1) -- wait for initial timers to run once + local ok, err = checker:add_target("127.0.0.1", 2119, nil, false) + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- false + } + } +--- request +GET /t +--- response_body +false +--- error_log +checking healthy targets: nothing to do +checking unhealthy targets: nothing to do +--- no_error_log +healthy SUCCESS increment +event: target status '127.0.0.1 (127.0.0.1:2119)' from 'false' to 'true' + + + +=== TEST 4: report_xrpc_status() with success is a nop when active.unhealthy.failures == 0 +--- http_config eval +qq{ + $::HttpConfig + + server { + listen 2119; + location = /status { + return 200; + } + } +} +--- config + location = /t { + content_by_lua_block { + local we = require "resty.worker.events" + assert(we.configure{ shm = "my_worker_events", interval = 0.1 }) + local healthcheck = require("resty.healthcheck") + local checker = healthcheck.new({ + name = "testing", + shm_name = "test_shm", + type = "xrpc", + checks = { + active = { + healthy = { + interval = 999, -- we don't want active checks + successes = 4, + }, + unhealthy = { + interval = 999, -- we don't want active checks + tcp_failures = 2, + failures = 0, + }, + handler = function(node, conf)end + }, + }, + }) + ngx.sleep(0.1) -- wait for initial timers to run once + local ok, err = checker:add_target("127.0.0.1", 2119, nil, true) + checker:report_xrpc_status("127.0.0.1", 2119, nil, 500, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 500, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 500, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 500, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- true + } + } +--- request +GET /t +--- response_body +true +--- error_log +checking healthy targets: nothing to do +checking unhealthy targets: nothing to do +--- no_error_log +unhealthy XRPC increment +event: target status '(127.0.0.1:2119)' from 'true' to 'false' + + + +=== TEST 5: report_xrpc_status() with success is a nop when active.unhealthy.failures == 3 and active.health.successes == 2 +--- http_config eval +qq{ + $::HttpConfig + + server { + listen 2119; + location = /status { + return 200; + } + } +} +--- config + location = /t { + content_by_lua_block { + local we = require "resty.worker.events" + assert(we.configure{ shm = "my_worker_events", interval = 0.1 }) + local healthcheck = require("resty.healthcheck") + local checker = healthcheck.new({ + name = "testing", + shm_name = "test_shm", + type = "xrpc", + checks = { + active = { + healthy = { + interval = 999, -- we don't want active checks + successes = 2, + }, + unhealthy = { + interval = 999, -- we don't want active checks + failures = 3, + }, + handler = function(node, conf)end + }, + }, + + }) + ngx.sleep(0.1) -- wait for initial timers to run once + local ok, err = checker:add_target("127.0.0.1", 2119, nil, true) + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- true + checker:report_xrpc_status("127.0.0.1", 2119, nil, 500, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 500, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- true + checker:report_xrpc_status("127.0.0.1", 2119, nil, 500, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- false + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- true + } + } +--- request +GET /t +--- response_body +true +true +false +true +--- error_log +checking healthy targets: nothing to do +checking unhealthy targets: nothing to do +unhealthy XRPC increment (1/3) for '(127.0.0.1:2119)' +unhealthy XRPC increment (2/3) for '(127.0.0.1:2119)' +unhealthy XRPC increment (3/3) for '(127.0.0.1:2119)' +event: target status '(127.0.0.1:2119)' from 'true' to 'false' +healthy SUCCESS increment (1/2) +healthy SUCCESS increment (2/2) +event: target status '(127.0.0.1:2119)' from 'false' to 'true' + + + +=== TEST 6: report_xrpc_status() special case when active.unhealthy.failures == 3 and active.health.successes == 2 and health.statuses [200] and unhealthy.statuses [501] +--- http_config eval +qq{ + $::HttpConfig + + server { + listen 2119; + location = /status { + return 200; + } + } +} +--- config + location = /t { + content_by_lua_block { + local we = require "resty.worker.events" + assert(we.configure{ shm = "my_worker_events", interval = 0.1 }) + local healthcheck = require("resty.healthcheck") + local checker = healthcheck.new({ + name = "testing", + shm_name = "test_shm", + type = "xrpc", + checks = { + active = { + healthy = { + interval = 999, -- we don't want active checks + statuses = { 200 }, + successes = 1, + }, + unhealthy = { + interval = 999, -- we don't want active checks + statuses = { 501 }, + failures = 2, + }, + handler = function(node, conf)end + }, + }, + + }) + ngx.sleep(0.1) -- wait for initial timers to run once + local ok, err = checker:add_target("127.0.0.1", 2119, nil, true) + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- true + + checker:report_xrpc_status("127.0.0.1", 2119, nil, 500, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 500, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- true + + checker:report_xrpc_status("127.0.0.1", 2119, nil, 501, "active") + checker:report_xrpc_status("127.0.0.1", 2119, nil, 501, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- false + + checker:report_xrpc_status("127.0.0.1", 2119, nil, 201, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- false + + checker:report_xrpc_status("127.0.0.1", 2119, nil, 200, "active") + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- true + } + } +--- request +GET /t +--- response_body +true +true +false +false +true +--- error_log +checking healthy targets: nothing to do +checking unhealthy targets: nothing to do +unhealthy XRPC increment (1/2) for '(127.0.0.1:2119)' +unhealthy XRPC increment (2/2) for '(127.0.0.1:2119)' +event: target status '(127.0.0.1:2119)' from 'true' to 'false' +healthy SUCCESS increment (1/1) +event: target status '(127.0.0.1:2119)' from 'false' to 'true' + + + +=== TEST 7: start xrpc healthcheck with active.unhealthy.failures == 3 and active.health.successes == 2 +--- http_config eval +qq{ + $::HttpConfig + + lua_shared_dict request_counters 1m; + + server { + listen 2119; + + location /status { + content_by_lua_block { + local uri = ngx.var.request_uri + local counter = ngx.shared.request_counters + + counter:incr(uri, 1, 0) + + local current_count = counter:get(uri) or 0 + + if current_count < 4 or current_count > 8 then + ngx.status = 200 + ngx.say("OK") + + else + ngx.status = 500 + ngx.say("ERROR") + end + } + } + } +} +--- config + location = /t { + content_by_lua_block { + local we = require "resty.worker.events" + assert(we.configure{ shm = "my_worker_events", interval = 0.1 }) + local healthcheck = require("resty.healthcheck") + local checker = healthcheck.new({ + name = "testing", + shm_name = "test_shm", + type = "xrpc", + checks = { + active = { + healthy = { + interval = 0.5, -- we don't want active checks + successes = 2, + }, + unhealthy = { + interval = 0.5, -- we don't want active checks + failures = 3, + }, + handler = function(node, conf) + local http = require('resty.http') + local httpc = http.new() + local res, err = httpc:request_uri("http://127.0.0.1:2119/status", { + method = "GET", + path = "/status", + }) + if not res then + return false + end + return true, res.status + end + }, + }, + }) + ngx.sleep(0.1) -- wait for initial timers to run once + checker:add_target("127.0.0.1", 2119, nil, true) + ngx.sleep(0.5) + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- true + ngx.sleep(3.5) + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- false + ngx.sleep(3.5) + ngx.say(checker:get_target_status("127.0.0.1", 2119, nil)) -- true + } + } +--- request +GET /t +--- timeout: 10 +--- response_body +true +false +true +--- error_log +checking healthy targets: nothing to do +checking unhealthy targets: nothing to do +unhealthy XRPC increment (1/3) for '(127.0.0.1:2119)' +unhealthy XRPC increment (2/3) for '(127.0.0.1:2119)' +unhealthy XRPC increment (3/3) for '(127.0.0.1:2119)' +event: target status '(127.0.0.1:2119)' from 'true' to 'false' +healthy SUCCESS increment (1/2) +healthy SUCCESS increment (2/2) +event: target status '(127.0.0.1:2119)' from 'false' to 'true'