From 33440e323d25c187db1807246331b3af9f0f9550 Mon Sep 17 00:00:00 2001 From: gichan2-jang Date: Wed, 13 Mar 2024 14:19:42 +0900 Subject: [PATCH] [ml-service] Change node type Change ml-service node type from pub/sub to query server/client to send reply. Signed-off-by: gichan2-jang --- c/src/ml-api-service-offloading.c | 42 ++++-- c/src/ml-api-service-offloading.h | 2 +- c/src/ml-api-service-private.h | 1 + c/src/ml-api-service.c | 2 +- .../capi/unittest_capi_service_offloading.cc | 127 ++++++++++++++++-- .../config/service_offloading_receiver.conf | 12 +- .../config/service_offloading_sender.conf | 4 +- 7 files changed, 165 insertions(+), 25 deletions(-) diff --git a/c/src/ml-api-service-offloading.c b/c/src/ml-api-service-offloading.c index 88d063fd..f53997aa 100644 --- a/c/src/ml-api-service-offloading.c +++ b/c/src/ml-api-service-offloading.c @@ -38,6 +38,7 @@ typedef enum ML_SERVICE_OFFLOADING_TYPE_MODEL_URI, ML_SERVICE_OFFLOADING_TYPE_PIPELINE_RAW, ML_SERVICE_OFFLOADING_TYPE_PIPELINE_URI, + ML_SERVICE_OFFLOADING_TYPE_REPLY, ML_SERVICE_OFFLOADING_TYPE_MAX } ml_service_offloading_type_e; @@ -82,9 +83,9 @@ _mlrs_get_node_type (const gchar * value) return node_type; if (g_ascii_strcasecmp (value, "sender") == 0) { - node_type = NNS_EDGE_NODE_TYPE_PUB; + node_type = NNS_EDGE_NODE_TYPE_QUERY_CLIENT; } else if (g_ascii_strcasecmp (value, "receiver") == 0) { - node_type = NNS_EDGE_NODE_TYPE_SUB; + node_type = NNS_EDGE_NODE_TYPE_QUERY_SERVER; } else { _ml_error_report ("Invalid node type: %s, Please check ml_option.", value); } @@ -204,6 +205,8 @@ _mlrs_get_service_type (gchar * service_str) service_type = ML_SERVICE_OFFLOADING_TYPE_PIPELINE_RAW; } else if (g_ascii_strcasecmp (service_str, "pipeline_uri") == 0) { service_type = ML_SERVICE_OFFLOADING_TYPE_PIPELINE_URI; + } else if (g_ascii_strcasecmp (service_str, "reply") == 0) { + service_type = ML_SERVICE_OFFLOADING_TYPE_REPLY; } else { _ml_error_report ("Invalid service type: %s, Please check service type.", service_str); @@ -365,6 +368,7 @@ _mlrs_process_service_offloading (nns_edge_data_h data_h, void *user_data) _ml_service_offloading_s *offloading_s = (_ml_service_offloading_s *) mls->priv; ml_service_event_e event_type = ML_SERVICE_EVENT_UNKNOWN; + ml_information_h info_h = NULL; ret = nns_edge_data_get (data_h, 0, &data, &data_len); if (NNS_EDGE_ERROR_NONE != ret) { @@ -455,6 +459,20 @@ _mlrs_process_service_offloading (nns_edge_data_h data_h, void *user_data) event_type = ML_SERVICE_EVENT_PIPELINE_REGISTERED; } break; + case ML_SERVICE_OFFLOADING_TYPE_REPLY: + { + ret = _ml_information_create (&info_h); + if (ML_ERROR_NONE != ret) { + _ml_error_report_return (ret, "Failed to create information handle. "); + } + ret = _ml_information_set (info_h, "data", (void *) data, NULL); + if (ML_ERROR_NONE != ret) { + _ml_error_report ("Failed to set data information."); + goto done; + } + event_type = ML_SERVICE_EVENT_REPLY; + break; + } default: _ml_error_report ("Unknown service type or not supported yet. " "Service num: %d", service_type); @@ -463,10 +481,16 @@ _mlrs_process_service_offloading (nns_edge_data_h data_h, void *user_data) if (mls && event_type != ML_SERVICE_EVENT_UNKNOWN) { if (mls->cb_info.cb) { - mls->cb_info.cb (event_type, NULL, mls->cb_info.pdata); + mls->cb_info.cb (event_type, info_h, mls->cb_info.pdata); } } +done: + if (info_h) { + ret = ml_information_destroy (info_h); + _ml_error_report ("Failed to destroy service info handle."); + } + return ret; } @@ -485,7 +509,8 @@ _mlrs_edge_event_cb (nns_edge_event_h event_h, void *user_data) return ret; switch (event) { - case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:{ + case NNS_EDGE_EVENT_NEW_DATA_RECEIVED: + { ret = nns_edge_event_parse_new_data (event_h, &data_h); if (NNS_EDGE_ERROR_NONE != ret) return ret; @@ -538,7 +563,7 @@ _mlrs_create_edge_handle (ml_service_s * mls, edge_info_s * edge_info) return ret; } - if (edge_info->node_type == NNS_EDGE_NODE_TYPE_SUB) { + if (edge_info->node_type == NNS_EDGE_NODE_TYPE_QUERY_CLIENT) { ret = nns_edge_connect (edge_h, edge_info->dest_host, edge_info->dest_port); if (NNS_EDGE_ERROR_NONE != ret) { @@ -581,10 +606,11 @@ ml_service_offloading_release_internal (ml_service_s * mls) } /** - * @brief Set value in ml-service remote handle. + * @brief Set value in ml-service offloading handle. */ int -ml_service_remote_set_information (ml_service_h handle, const gchar * name, const gchar * value) +ml_service_offloading_set_information (ml_service_h handle, const gchar * name, + const gchar * value) { ml_service_s *mls = (ml_service_s *) handle; _ml_service_offloading_s *mlrs = (_ml_service_offloading_s *) mls->priv; @@ -646,7 +672,7 @@ ml_service_offloading_create (ml_service_h handle, ml_option_h option) } if (ML_ERROR_NONE == ml_option_get (option, "path", (void **) (&_path))) { - ret = ml_service_remote_set_information (mls, "path", _path); + ret = ml_service_offloading_set_information (mls, "path", _path); if (ML_ERROR_NONE != ret) { _ml_error_report_return (ret, "Failed to set path in ml-service offloading handle."); diff --git a/c/src/ml-api-service-offloading.h b/c/src/ml-api-service-offloading.h index 300d1c84..3b2ca134 100644 --- a/c/src/ml-api-service-offloading.h +++ b/c/src/ml-api-service-offloading.h @@ -72,7 +72,7 @@ int ml_service_offloading_set_service (ml_service_h handle, const char *name, co * @retval #ML_ERROR_NOT_SUPPORTED Not supported. * @retval #ML_ERROR_INVALID_PARAMETER Given parameter is invalid. */ -int ml_service_remote_set_information (ml_service_h handle, const char *name, const char *value); +int ml_service_offloading_set_information (ml_service_h handle, const char *name, const char *value); #ifdef __cplusplus } diff --git a/c/src/ml-api-service-private.h b/c/src/ml-api-service-private.h index a7d6a528..8220c855 100644 --- a/c/src/ml-api-service-private.h +++ b/c/src/ml-api-service-private.h @@ -31,6 +31,7 @@ extern "C" { */ #define ML_SERVICE_EVENT_MODEL_REGISTERED 2 #define ML_SERVICE_EVENT_PIPELINE_REGISTERED 3 +#define ML_SERVICE_EVENT_REPLY 4 /** * @brief Enumeration for ml-service type. diff --git a/c/src/ml-api-service.c b/c/src/ml-api-service.c index 275eeeae..4fef6fe6 100644 --- a/c/src/ml-api-service.c +++ b/c/src/ml-api-service.c @@ -70,7 +70,7 @@ _ml_service_set_information_internal (ml_service_s * mls, const char *name, break; case ML_SERVICE_TYPE_OFFLOADING: { - status = ml_service_remote_set_information (mls, name, value); + status = ml_service_offloading_set_information (mls, name, value); break; } default: diff --git a/tests/capi/unittest_capi_service_offloading.cc b/tests/capi/unittest_capi_service_offloading.cc index 804d2ff1..1afb92b7 100644 --- a/tests/capi/unittest_capi_service_offloading.cc +++ b/tests/capi/unittest_capi_service_offloading.cc @@ -20,6 +20,15 @@ #include "ml-api-service-offloading.h" +/** + * @brief Structure for ml-service event callback. + */ +typedef struct { + ml_service_h handle; + void *data; + gboolean received_reply; +} _ml_service_test_data_s; + /** * @brief Internal function to get the config file path. */ @@ -48,6 +57,7 @@ class MLOffloadingService : public ::testing::Test int status; ml_service_h client_h; ml_service_h server_h; + _ml_service_test_data_s test_data; public: /** @@ -65,14 +75,15 @@ class MLOffloadingService : public ::testing::Test g_test_dbus_up (dbus); - g_autofree gchar *sender_config = _get_config_path ("service_offloading_sender.conf"); - status = ml_service_new (sender_config, &client_h); - ASSERT_EQ (status, ML_ERROR_NONE); - g_autofree gchar *receiver_config = _get_config_path ("service_offloading_receiver.conf"); status = ml_service_new (receiver_config, &server_h); ASSERT_EQ (status, ML_ERROR_NONE); + test_data.handle = server_h; + + g_autofree gchar *sender_config = _get_config_path ("service_offloading_sender.conf"); + status = ml_service_new (sender_config, &client_h); + ASSERT_EQ (status, ML_ERROR_NONE); } /** @@ -129,6 +140,7 @@ static void _ml_service_event_cb (ml_service_event_e event, ml_information_h event_data, void *user_data) { int status; + _ml_service_test_data_s *test_data = (_ml_service_test_data_s *) user_data; /** @todo remove typecast to int after new event type is added. */ switch ((int) event) { @@ -138,7 +150,7 @@ _ml_service_event_cb (ml_service_event_e event, ml_information_h event_data, voi const gchar *service_key = "pipeline_registration_test_key"; status = ml_service_pipeline_get (service_key, &ret_pipeline); EXPECT_EQ (ML_ERROR_NONE, status); - EXPECT_STREQ ((gchar *) user_data, ret_pipeline); + EXPECT_STREQ ((gchar *) test_data->data, ret_pipeline); break; } case ML_SERVICE_EVENT_MODEL_REGISTERED: @@ -159,7 +171,8 @@ _ml_service_event_cb (ml_service_event_e event, ml_information_h event_data, voi gsize activated_model_len = 0; EXPECT_TRUE (g_file_get_contents (activated_model_path, &activated_model_contents, &activated_model_len, NULL)); - EXPECT_EQ (memcmp ((gchar *) user_data, activated_model_contents, activated_model_len), 0); + EXPECT_EQ (memcmp ((gchar *) test_data->data, activated_model_contents, activated_model_len), + 0); status = g_remove (activated_model_path); EXPECT_TRUE (status == 0); @@ -182,7 +195,8 @@ TEST_F (MLOffloadingService, registerPipeline) ml_tensor_dimension in_dim = { 0 }; gchar *pipeline_desc = g_strdup ("fakesrc ! fakesink"); - status = ml_service_set_event_cb (server_h, _ml_service_event_cb, pipeline_desc); + test_data.data = pipeline_desc; + status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data); EXPECT_EQ (status, ML_ERROR_NONE); status = ml_tensors_info_create (&in_info); @@ -224,7 +238,8 @@ TEST_F (MLOffloadingService, registerPipelineURI) ml_tensor_dimension in_dim = { 0 }; g_autofree gchar *pipeline_desc = g_strdup ("fakesrc ! fakesink"); - status = ml_service_set_event_cb (server_h, _ml_service_event_cb, pipeline_desc); + test_data.data = pipeline_desc; + status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data); EXPECT_EQ (status, ML_ERROR_NONE); gchar *current_dir = g_get_current_dir (); @@ -345,7 +360,8 @@ TEST_F (MLOffloadingService, registerModel) gsize len = 0; EXPECT_TRUE (g_file_get_contents (test_model, &contents, &len, NULL)); - status = ml_service_set_event_cb (server_h, _ml_service_event_cb, contents); + test_data.data = contents; + status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data); EXPECT_EQ (status, ML_ERROR_NONE); status = ml_tensors_info_create (&in_info); @@ -395,7 +411,8 @@ TEST_F (MLOffloadingService, registerModelURI) gsize len = 0; EXPECT_TRUE (g_file_get_contents (test_model_path, &contents, &len, NULL)); - status = ml_service_set_event_cb (server_h, _ml_service_event_cb, contents); + test_data.data = contents; + status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data); EXPECT_EQ (status, ML_ERROR_NONE); g_autofree gchar *model_uri = g_strdup_printf ("file://%s", test_model_path); @@ -455,7 +472,8 @@ TEST_F (MLOffloadingService, registerModelPath) gsize len = 0; EXPECT_TRUE (g_file_get_contents (test_model, &contents, &len, NULL)); - status = ml_service_set_event_cb (server_h, _ml_service_event_cb, contents); + test_data.data = contents; + status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data); EXPECT_EQ (status, ML_ERROR_NONE); status = ml_tensors_info_create (&in_info); @@ -518,6 +536,93 @@ TEST_F (MLOffloadingService, requestInvalidParam_n) EXPECT_EQ (ML_ERROR_NONE, status); } + +/** + * @brief Callback function for reply test. + */ +static void +_ml_service_reply_test_cb (ml_service_event_e event, ml_information_h event_data, void *user_data) +{ + int status; + + switch ((int) event) { + case ML_SERVICE_EVENT_PIPELINE_REGISTERED: + { + _ml_service_test_data_s *test_data = (_ml_service_test_data_s *) user_data; + g_autofree gchar *ret_pipeline = NULL; + void *_data; + size_t _size; + const gchar *service_key = "pipeline_registration_test_key"; + status = ml_service_pipeline_get (service_key, &ret_pipeline); + EXPECT_EQ (ML_ERROR_NONE, status); + status = ml_tensors_data_get_tensor_data (test_data->data, 0, &_data, &_size); + EXPECT_EQ (ML_ERROR_NONE, status); + EXPECT_STREQ ((gchar *) _data, ret_pipeline); + + ml_service_request (test_data->handle, "reply_to_client", test_data->data); + break; + } + case ML_SERVICE_EVENT_REPLY: + { + gint *received = (gint *) user_data; + (*received)++; + break; + } + default: + break; + } +} + +/** + * @brief use case of replying to client. + */ +TEST_F (MLOffloadingService, replyToClient) +{ + ml_tensors_data_h input = NULL; + ml_tensors_info_h in_info = NULL; + ml_tensor_dimension in_dim = { 0 }; + gint received = 0; + + gchar *pipeline_desc = g_strdup ("fakesrc ! fakesink"); + + status = ml_tensors_info_create (&in_info); + EXPECT_EQ (ML_ERROR_NONE, status); + ml_tensors_info_set_count (in_info, 1); + ml_tensors_info_set_tensor_type (in_info, 0, ML_TENSOR_TYPE_UINT8); + in_dim[0] = strlen (pipeline_desc) + 1; + ml_tensors_info_set_tensor_dimension (in_info, 0, in_dim); + status = ml_tensors_data_create (in_info, &input); + EXPECT_EQ (ML_ERROR_NONE, status); + status = ml_tensors_data_set_tensor_data ( + input, 0, pipeline_desc, strlen (pipeline_desc) + 1); + EXPECT_EQ (ML_ERROR_NONE, status); + + test_data.data = input; + status = ml_service_set_event_cb (server_h, _ml_service_reply_test_cb, &test_data); + EXPECT_EQ (status, ML_ERROR_NONE); + + status = ml_service_set_event_cb (client_h, _ml_service_reply_test_cb, &received); + EXPECT_EQ (status, ML_ERROR_NONE); + + status = ml_service_request (client_h, "pipeline_registration_raw", input); + EXPECT_EQ (ML_ERROR_NONE, status); + + /* Wait for the server to register and check the result. */ + g_usleep (1000000); + + EXPECT_GT (received, 0); + + status = ml_service_pipeline_delete ("pipeline_registration_test_key"); + EXPECT_TRUE (status == ML_ERROR_NONE); + + status = ml_tensors_info_destroy (in_info); + EXPECT_EQ (ML_ERROR_NONE, status); + status = ml_tensors_data_destroy (input); + EXPECT_EQ (ML_ERROR_NONE, status); + + g_free (pipeline_desc); +} + /** * @brief Main gtest */ diff --git a/tests/test_models/config/service_offloading_receiver.conf b/tests/test_models/config/service_offloading_receiver.conf index 5303487c..1f34497d 100644 --- a/tests/test_models/config/service_offloading_receiver.conf +++ b/tests/test_models/config/service_offloading_receiver.conf @@ -2,9 +2,17 @@ "offloading" : { "node-type" : "receiver", - "dest-host" : "127.0.0.1", - "dest-port" : "3001", + "host" : "127.0.0.1", + "port" : "3000", "connect-type" : "TCP", "topic" : "offloading_service_test_topic" + }, + "services" : + { + "reply_to_client" : + { + "service-type" : "reply", + "service-key" : "reply_to_client" + } } } diff --git a/tests/test_models/config/service_offloading_sender.conf b/tests/test_models/config/service_offloading_sender.conf index 337ae8ec..12454513 100644 --- a/tests/test_models/config/service_offloading_sender.conf +++ b/tests/test_models/config/service_offloading_sender.conf @@ -2,8 +2,8 @@ "offloading" : { "node-type" : "sender", - "host" : "127.0.0.1", - "port" : "3001", + "dest-host" : "127.0.0.1", + "dest-port" : "3000", "connect-type" : "TCP", "topic" : "offloading_service_test_topic" },