diff --git a/src/ered_connection.erl b/src/ered_connection.erl index ecbdaaa..2285a9f 100644 --- a/src/ered_connection.erl +++ b/src/ered_connection.erl @@ -34,14 +34,16 @@ %% If commands are queued up in the process message queue this is the max %% amount of messages that will be received and sent in one call {batch_size, non_neg_integer()} | + %% Timeout passed to gen_tcp:connect/4 or ssl:connect/4. + {connect_timeout, timeout()} | %% Options passed to gen_tcp:connect/4. {tcp_options, [gen_tcp:connect_option()]} | - %% Timeout passed to gen_tcp:connect/4. + %% Timeout passed to gen_tcp:connect/4. DEPRECATED. {tcp_connect_timeout, timeout()} | - %% Options passed to ssl:connect/3. If this config parameter is present, + %% Options passed to ssl:connect/4. If this config parameter is present, %% TLS is used. {tls_options, [ssl:tls_client_option()]} | - %% Timeout passed to ssl:connect/3. + %% Timeout passed to ssl:connect/4. DEPRECATED. {tls_connect_timeout, timeout()} | %% Callback for push notifications {push_cb, push_cb()} | diff --git a/test/ered_SUITE.erl b/test/ered_SUITE.erl index 956ded9..9a0d141 100644 --- a/test/ered_SUITE.erl +++ b/test/ered_SUITE.erl @@ -37,6 +37,8 @@ all() -> -define(DEFAULT_SERVER_DOCKER_IMAGE, "valkey/valkey:8.0.1"). +-define(CLIENT_OPTS, [{connection_opts, [{connect_timeout, 500}]}]). + init_per_suite(_Config) -> stop_containers(), % just in case there is junk from previous runs Image = os:getenv("SERVER_DOCKER_IMAGE", ?DEFAULT_SERVER_DOCKER_IMAGE), @@ -54,12 +56,7 @@ init_per_suite(_Config) -> [P, Image, EnableDebugCommand, P]) || P <- ?PORTS]), - timer:sleep(2000), - lists:foreach(fun(Port) -> - {ok,Pid} = ered_client:start_link("127.0.0.1", Port, []), - {ok, <<"PONG">>} = ered_client:command(Pid, [<<"ping">>]), - ered_client:stop(Pid) - end, ?PORTS), + ered_test_utils:wait_for_all_nodes_available(?PORTS, ?CLIENT_OPTS), create_cluster(), wait_for_consistent_cluster(), @@ -67,7 +64,7 @@ init_per_suite(_Config) -> init_per_testcase(_Testcase, Config) -> %% Quick check that cluster is OK; otherwise restart everything. - case catch ered_test_utils:check_consistent_cluster(?PORTS, []) of + case catch ered_test_utils:check_consistent_cluster(?PORTS, ?CLIENT_OPTS) of ok -> []; _ -> @@ -99,7 +96,7 @@ wait_for_consistent_cluster() -> wait_for_consistent_cluster(?PORTS). wait_for_consistent_cluster(Ports) -> - ered_test_utils:wait_for_consistent_cluster(Ports, []). + ered_test_utils:wait_for_consistent_cluster(Ports, ?CLIENT_OPTS). end_per_suite(_Config) -> stop_containers(). @@ -722,14 +719,14 @@ t_queue_full(_) -> Loop(N-1) end(21), - recv({reply, {error, queue_overflow}}, 1000), + ?MSG({reply, {error, queue_overflow}}), [ct:pal("~s\n", [os:cmd("redis-cli -p " ++ integer_to_list(Port) ++ " CLIENT UNPAUSE")]) || Port <- Ports], ?MSG(#{msg_type := queue_full}), ?MSG(#{msg_type := cluster_not_ok, reason := master_queue_full}), ?MSG(#{msg_type := queue_ok}), ?MSG(#{msg_type := cluster_ok}), - [recv({reply, {ok, <<"PONG">>}}, 1000) || _ <- lists:seq(1,20)], + [?MSG({reply, {ok, <<"PONG">>}}) || _ <- lists:seq(1,20)], no_more_msgs(), ok. @@ -978,13 +975,6 @@ start_cluster() -> start_cluster(Opts) -> ered_test_utils:start_cluster(?PORTS, Opts). -recv(Msg, Time) -> - receive - Msg -> Msg - after Time -> - error({timeout, Msg, erlang:process_info(self(), messages)}) - end. - no_more_msgs() -> {messages,Msgs} = erlang:process_info(self(), messages), case Msgs of diff --git a/test/ered_test_utils.erl b/test/ered_test_utils.erl index 69a940e..fc04fa9 100644 --- a/test/ered_test_utils.erl +++ b/test/ered_test_utils.erl @@ -4,7 +4,8 @@ -export([start_cluster/2, check_consistent_cluster/2, - wait_for_consistent_cluster/2]). + wait_for_consistent_cluster/2, + wait_for_all_nodes_available/2]). %% Start a cluster client and wait for cluster_ok. start_cluster(Ports, Opts) -> @@ -72,6 +73,28 @@ wait_for_consistent_cluster(Ports, ClientOpts) -> end end(20). +%% Wait for all nodes to be available for communication. +wait_for_all_nodes_available(Ports, ClientOpts) -> + Pids = [fun(Port) -> + {ok, Pid} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}] ++ ClientOpts), + Pid + end(P) || P <- Ports], + wait_for_connection_up(Pids), + no_more_msgs(). + +wait_for_connection_up([]) -> + ok; +wait_for_connection_up(Pids) -> + {_, {Pid, _, _}, _} = ?MSG({connection_status, _, connection_up}, 4000), + {ok, <<"PONG">>} = ered_client:command(Pid, [<<"ping">>]), + + %% Stop client and allow optional connect_error events + ered_client:stop(Pid), + ?MSG({connection_status, {Pid, _, _}, {connection_down, {client_stopped, _}}}), + ?OPTIONAL_MSG({connection_status, {Pid, _, _}, {connection_down, _}}), + ?OPTIONAL_MSG({connection_status, {Pid, _, _}, {connection_down, _}}), + wait_for_connection_up(lists:delete(Pid, Pids)). + no_more_msgs() -> {messages,Msgs} = erlang:process_info(self(), messages), case Msgs of diff --git a/test/ered_tls_SUITE.erl b/test/ered_tls_SUITE.erl index 1252714..7e2a75d 100644 --- a/test/ered_tls_SUITE.erl +++ b/test/ered_tls_SUITE.erl @@ -24,7 +24,8 @@ groups() -> {verify, verify_peer}, {server_name_indication, "Server"}]). --define(CLIENT_OPTS, [{connection_opts, [{tls_options, ?TLS_OPTS}]}]). +-define(CLIENT_OPTS, [{connection_opts, [{tls_options, ?TLS_OPTS}, + {connect_timeout, 500}]}]). init_per_suite(_Config) -> stop_containers(), % just in case there is junk from previous runs @@ -112,12 +113,7 @@ start_containers() -> [P, Path, Image, EnableDebugCommand, P]) || P <- ?PORTS]), - timer:sleep(3000), - lists:foreach(fun(Port) -> - {ok,Pid} = ered_client:start_link("127.0.0.1", Port, ?CLIENT_OPTS), - {ok, <<"PONG">>} = ered_client:command(Pid, [<<"ping">>]), - ered_client:stop(Pid) - end, ?PORTS). + ered_test_utils:wait_for_all_nodes_available(?PORTS, ?CLIENT_OPTS). stop_containers() -> cmd_log([io_lib:format("docker stop redis-tls-~p; docker rm redis-tls-~p;", [P, P])