From 3c5828fa8f3c50f5e3054809ed43ce7b10de6a06 Mon Sep 17 00:00:00 2001 From: Katherine Yang Date: Thu, 9 Nov 2023 15:55:00 -0800 Subject: [PATCH] changed location of delay --- qa/L0_client_timeout/client_timeout_test.py | 4 +- qa/L0_client_timeout/test.sh | 8 +-- src/grpc/grpc_server.cc | 77 ++++++++++++++------- src/grpc/grpc_utils.cc | 12 ---- 4 files changed, 58 insertions(+), 43 deletions(-) diff --git a/qa/L0_client_timeout/client_timeout_test.py b/qa/L0_client_timeout/client_timeout_test.py index 8eaba629c59..cdc5378c70c 100755 --- a/qa/L0_client_timeout/client_timeout_test.py +++ b/qa/L0_client_timeout/client_timeout_test.py @@ -511,7 +511,9 @@ def test_http_async_infer(self): # response. Expect an exception for small timeout values. with self.assertRaises(socket.timeout) as cm: triton_client = httpclient.InferenceServerClient( - url="localhost:8000", verbose=True, network_timeout=2.0 + url="localhost:8000", + verbose=True, + network_timeout=self.INFER_SMALL_INTERVAL, ) async_request = triton_client.async_infer( model_name=self.model_name_, inputs=self.inputs_, outputs=self.outputs_ diff --git a/qa/L0_client_timeout/test.sh b/qa/L0_client_timeout/test.sh index 518c384235b..eb3a635fb37 100755 --- a/qa/L0_client_timeout/test.sh +++ b/qa/L0_client_timeout/test.sh @@ -40,7 +40,7 @@ fi export CUDA_VISIBLE_DEVICES=0 TIMEOUT_VALUE=100000000 -SHORT_TIMEOUT_VALUE=1000 +SHORT_TIMEOUT_VALUE=1 RET=0 CLIENT_TIMEOUT_TEST=client_timeout_test.py @@ -60,7 +60,7 @@ source ../common/util.sh mkdir -p $DATADIR/custom_identity_int32/1 # Test all APIs apart from Infer. -export TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC=1 +export TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC=2 run_server if [ "$SERVER_PID" == "0" ]; then echo -e "\n***\n*** Failed to start $SERVER\n***" @@ -92,7 +92,7 @@ kill $SERVER_PID wait $SERVER_PID # Test infer APIs -export TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC= +unset TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC SERVER_ARGS="--model-repository=$DATADIR" sed -i 's#value: { string_value: "1" }#value: { string_value: "0" }#' $DATADIR/custom_identity_int32/config.pbtxt run_server @@ -240,7 +240,7 @@ kill $SERVER_PID wait $SERVER_PID # Test all APIs other than infer -export TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC=1 +export TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC=2 SERVER_ARGS="${SERVER_ARGS} --model-control-mode=explicit --load-model=custom_identity_int32 --log-verbose 2" sed -i 's#value: { string_value: "0" }#value: { string_value: "1" }#' $DATADIR/custom_identity_int32/config.pbtxt run_server diff --git a/src/grpc/grpc_server.cc b/src/grpc/grpc_server.cc index f9dbd5c0168..54251779076 100644 --- a/src/grpc/grpc_server.cc +++ b/src/grpc/grpc_server.cc @@ -92,10 +92,11 @@ class CommonCallData : public ICallData { const StandardRegisterFunc OnRegister, const StandardCallbackFunc OnExecute, const bool async, ::grpc::ServerCompletionQueue* cq, - const std::pair& restricted_kv) + const std::pair& restricted_kv, + const uint64_t& response_delay = 0) : name_(name), id_(id), OnRegister_(OnRegister), OnExecute_(OnExecute), async_(async), cq_(cq), responder_(&ctx_), step_(Steps::START), - restricted_kv_(restricted_kv) + restricted_kv_(restricted_kv), response_delay_(response_delay) { OnRegister_(&ctx_, &request_, &responder_, this); LOG_VERBOSE(1) << "Ready for RPC '" << name_ << "', " << id_; @@ -140,6 +141,8 @@ class CommonCallData : public ICallData { Steps step_; std::pair restricted_kv_{"", ""}; + + const uint64_t response_delay_; }; template @@ -165,7 +168,8 @@ CommonCallData::Process(bool rpc_ok) // Start a new request to replace this one... if (!shutdown) { new CommonCallData( - name_, id_ + 1, OnRegister_, OnExecute_, async_, cq_, restricted_kv_); + name_, id_ + 1, OnRegister_, OnExecute_, async_, cq_, restricted_kv_, + response_delay_); } if (!async_) { @@ -234,6 +238,14 @@ template void CommonCallData::WriteResponse() { + if (response_delay_ != 0) { + // Will delay the write of the response by the specified time. + // This can be used to test the flow where there are other + // responses available to be written. + LOG_VERBOSE(1) << "Delaying the write of the response by " + << response_delay_ << " seconds"; + std::this_thread::sleep_for(std::chrono::seconds(response_delay_)); + } step_ = Steps::COMPLETE; responder_.Finish(response_, status_, this); } @@ -253,7 +265,7 @@ class CommonHandler : public HandlerBase { inference::GRPCInferenceService::AsyncService* service, ::grpc::health::v1::Health::AsyncService* health_service, ::grpc::ServerCompletionQueue* cq, - const RestrictedFeatures& restricted_keys); + const RestrictedFeatures& restricted_keys, const uint64_t response_delay); // Descriptive name of of the handler. const std::string& Name() const { return name_; } @@ -299,6 +311,7 @@ class CommonHandler : public HandlerBase { ::grpc::ServerCompletionQueue* cq_; std::unique_ptr thread_; const RestrictedFeatures& restricted_keys_; + const uint64_t response_delay_ = 0; }; CommonHandler::CommonHandler( @@ -309,11 +322,12 @@ CommonHandler::CommonHandler( inference::GRPCInferenceService::AsyncService* service, ::grpc::health::v1::Health::AsyncService* health_service, ::grpc::ServerCompletionQueue* cq, - const RestrictedFeatures& restricted_keys) + const RestrictedFeatures& restricted_keys, + const uint64_t response_delay = 0) : name_(name), tritonserver_(tritonserver), shm_manager_(shm_manager), trace_manager_(trace_manager), service_(service), health_service_(health_service), cq_(cq), - restricted_keys_(restricted_keys) + restricted_keys_(restricted_keys), response_delay_(response_delay) { } @@ -440,7 +454,7 @@ CommonHandler::RegisterServerLive() ::grpc::ServerAsyncResponseWriter, inference::ServerLiveRequest, inference::ServerLiveResponse>( "ServerLive", 0, OnRegisterServerLive, OnExecuteServerLive, - false /* async */, cq_, restricted_kv); + false /* async */, cq_, restricted_kv, response_delay_); } void @@ -476,7 +490,7 @@ CommonHandler::RegisterServerReady() ::grpc::ServerAsyncResponseWriter, inference::ServerReadyRequest, inference::ServerReadyResponse>( "ServerReady", 0, OnRegisterServerReady, OnExecuteServerReady, - false /* async */, cq_, restricted_kv); + false /* async */, cq_, restricted_kv, response_delay_); } void @@ -525,7 +539,7 @@ CommonHandler::RegisterHealthCheck() ::grpc::health::v1::HealthCheckRequest, ::grpc::health::v1::HealthCheckResponse>( "Check", 0, OnRegisterHealthCheck, OnExecuteHealthCheck, - false /* async */, cq_, restricted_kv); + false /* async */, cq_, restricted_kv, response_delay_); } void @@ -567,7 +581,7 @@ CommonHandler::RegisterModelReady() ::grpc::ServerAsyncResponseWriter, inference::ModelReadyRequest, inference::ModelReadyResponse>( "ModelReady", 0, OnRegisterModelReady, OnExecuteModelReady, - false /* async */, cq_, restricted_kv); + false /* async */, cq_, restricted_kv, response_delay_); } void @@ -645,7 +659,7 @@ CommonHandler::RegisterServerMetadata() ::grpc::ServerAsyncResponseWriter, inference::ServerMetadataRequest, inference::ServerMetadataResponse>( "ServerMetadata", 0, OnRegisterServerMetadata, OnExecuteServerMetadata, - false /* async */, cq_, restricted_kv); + false /* async */, cq_, restricted_kv, response_delay_); } void @@ -813,7 +827,7 @@ CommonHandler::RegisterModelMetadata() ::grpc::ServerAsyncResponseWriter, inference::ModelMetadataRequest, inference::ModelMetadataResponse>( "ModelMetadata", 0, OnRegisterModelMetadata, OnExecuteModelMetadata, - false /* async */, cq_, restricted_kv); + false /* async */, cq_, restricted_kv, response_delay_); } void @@ -866,7 +880,7 @@ CommonHandler::RegisterModelConfig() ::grpc::ServerAsyncResponseWriter, inference::ModelConfigRequest, inference::ModelConfigResponse>( "ModelConfig", 0, OnRegisterModelConfig, OnExecuteModelConfig, - false /* async */, cq_, restricted_kv); + false /* async */, cq_, restricted_kv, response_delay_); } void @@ -1196,7 +1210,7 @@ CommonHandler::RegisterModelStatistics() ::grpc::ServerAsyncResponseWriter, inference::ModelStatisticsRequest, inference::ModelStatisticsResponse>( "ModelStatistics", 0, OnRegisterModelStatistics, OnExecuteModelStatistics, - false /* async */, cq_, restricted_kv); + false /* async */, cq_, restricted_kv, response_delay_); } void @@ -1471,7 +1485,7 @@ CommonHandler::RegisterTrace() ::grpc::ServerAsyncResponseWriter, inference::TraceSettingRequest, inference::TraceSettingResponse>( "Trace", 0, OnRegisterTrace, OnExecuteTrace, false /* async */, cq_, - restricted_kv); + restricted_kv, response_delay_); } void @@ -1680,7 +1694,7 @@ CommonHandler::RegisterLogging() ::grpc::ServerAsyncResponseWriter, inference::LogSettingsRequest, inference::LogSettingsResponse>( "Logging", 0, OnRegisterLogging, OnExecuteLogging, false /* async */, cq_, - restricted_kv); + restricted_kv, response_delay_); } void @@ -1754,7 +1768,8 @@ CommonHandler::RegisterSystemSharedMemoryStatus() inference::SystemSharedMemoryStatusRequest, inference::SystemSharedMemoryStatusResponse>( "SystemSharedMemoryStatus", 0, OnRegisterSystemSharedMemoryStatus, - OnExecuteSystemSharedMemoryStatus, false /* async */, cq_, restricted_kv); + OnExecuteSystemSharedMemoryStatus, false /* async */, cq_, restricted_kv, + response_delay_); } void @@ -1793,7 +1808,7 @@ CommonHandler::RegisterSystemSharedMemoryRegister() inference::SystemSharedMemoryRegisterResponse>( "SystemSharedMemoryRegister", 0, OnRegisterSystemSharedMemoryRegister, OnExecuteSystemSharedMemoryRegister, false /* async */, cq_, - restricted_kv); + restricted_kv, response_delay_); } void @@ -1836,7 +1851,7 @@ CommonHandler::RegisterSystemSharedMemoryUnregister() inference::SystemSharedMemoryUnregisterResponse>( "SystemSharedMemoryUnregister", 0, OnRegisterSystemSharedMemoryUnregister, OnExecuteSystemSharedMemoryUnregister, false /* async */, cq_, - restricted_kv); + restricted_kv, response_delay_); } void @@ -1902,7 +1917,8 @@ CommonHandler::RegisterCudaSharedMemoryStatus() inference::CudaSharedMemoryStatusRequest, inference::CudaSharedMemoryStatusResponse>( "CudaSharedMemoryStatus", 0, OnRegisterCudaSharedMemoryStatus, - OnExecuteCudaSharedMemoryStatus, false /* async */, cq_, restricted_kv); + OnExecuteCudaSharedMemoryStatus, false /* async */, cq_, restricted_kv, + response_delay_); } void @@ -1952,7 +1968,8 @@ CommonHandler::RegisterCudaSharedMemoryRegister() inference::CudaSharedMemoryRegisterRequest, inference::CudaSharedMemoryRegisterResponse>( "CudaSharedMemoryRegister", 0, OnRegisterCudaSharedMemoryRegister, - OnExecuteCudaSharedMemoryRegister, false /* async */, cq_, restricted_kv); + OnExecuteCudaSharedMemoryRegister, false /* async */, cq_, restricted_kv, + response_delay_); } void @@ -1995,7 +2012,7 @@ CommonHandler::RegisterCudaSharedMemoryUnregister() inference::CudaSharedMemoryUnregisterResponse>( "CudaSharedMemoryUnregister", 0, OnRegisterCudaSharedMemoryUnregister, OnExecuteCudaSharedMemoryUnregister, false /* async */, cq_, - restricted_kv); + restricted_kv, response_delay_); } void @@ -2097,7 +2114,7 @@ CommonHandler::RegisterRepositoryIndex() ::grpc::ServerAsyncResponseWriter, inference::RepositoryIndexRequest, inference::RepositoryIndexResponse>( "RepositoryIndex", 0, OnRegisterRepositoryIndex, OnExecuteRepositoryIndex, - false /* async */, cq_, restricted_kv); + false /* async */, cq_, restricted_kv, response_delay_); } void @@ -2209,7 +2226,8 @@ CommonHandler::RegisterRepositoryModelLoad() inference::RepositoryModelLoadRequest, inference::RepositoryModelLoadResponse>( "RepositoryModelLoad", 0, OnRegisterRepositoryModelLoad, - OnExecuteRepositoryModelLoad, true /* async */, cq_, restricted_kv); + OnExecuteRepositoryModelLoad, true /* async */, cq_, restricted_kv, + response_delay_); } void @@ -2278,7 +2296,8 @@ CommonHandler::RegisterRepositoryModelUnload() inference::RepositoryModelUnloadRequest, inference::RepositoryModelUnloadResponse>( "RepositoryModelUnload", 0, OnRegisterRepositoryModelUnload, - OnExecuteRepositoryModelUnload, true /* async */, cq_, restricted_kv); + OnExecuteRepositoryModelUnload, true /* async */, cq_, restricted_kv, + response_delay_); } } // namespace @@ -2388,9 +2407,15 @@ Server::Server( model_stream_infer_cq_ = builder_.AddCompletionQueue(); // A common Handler for other non-inference requests + const char* dstr = getenv("TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC"); + uint64_t response_delay = 0; + if (dstr != nullptr) { + response_delay = atoi(dstr); + } common_handler_.reset(new CommonHandler( "CommonHandler", tritonserver_, shm_manager_, trace_manager_, &service_, - &health_service_, common_cq_.get(), options.restricted_protocols_)); + &health_service_, common_cq_.get(), options.restricted_protocols_, + response_delay)); // [FIXME] "register" logic is different for infer // Handler for model inference requests. diff --git a/src/grpc/grpc_utils.cc b/src/grpc/grpc_utils.cc index 62fd93272bb..bf92b0a797c 100644 --- a/src/grpc/grpc_utils.cc +++ b/src/grpc/grpc_utils.cc @@ -77,18 +77,6 @@ operator<<(std::ostream& out, const Steps& step) void GrpcStatusUtil::Create(::grpc::Status* status, TRITONSERVER_Error* err) { - const char* dstr = getenv("TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC"); - uint64_t delay_response = 0; - if (dstr != nullptr) { - delay_response = atoi(dstr); - // Will delay the write of the response by the specified time. - // This can be used to test the flow where there are other - // responses available to be written. - LOG_VERBOSE(1) << "Delaying the write of the response by " << delay_response - << " seconds"; - std::this_thread::sleep_for(std::chrono::seconds(delay_response)); - } - if (err == nullptr) { *status = ::grpc::Status::OK; } else {