Skip to content

Commit

Permalink
[ml-service] Change node type
Browse files Browse the repository at this point in the history
Change the ml-service node type from pub/sub to query server/client so that
the server can send a reply back to the client.

Signed-off-by: gichan2-jang <[email protected]>
  • Loading branch information
gichan-jang authored and jaeyun-jung committed Apr 4, 2024
1 parent cebba38 commit 33440e3
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 25 deletions.
42 changes: 34 additions & 8 deletions c/src/ml-api-service-offloading.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ typedef enum
ML_SERVICE_OFFLOADING_TYPE_MODEL_URI,
ML_SERVICE_OFFLOADING_TYPE_PIPELINE_RAW,
ML_SERVICE_OFFLOADING_TYPE_PIPELINE_URI,
ML_SERVICE_OFFLOADING_TYPE_REPLY,

ML_SERVICE_OFFLOADING_TYPE_MAX
} ml_service_offloading_type_e;
Expand Down Expand Up @@ -82,9 +83,9 @@ _mlrs_get_node_type (const gchar * value)
return node_type;

if (g_ascii_strcasecmp (value, "sender") == 0) {
node_type = NNS_EDGE_NODE_TYPE_PUB;
node_type = NNS_EDGE_NODE_TYPE_QUERY_CLIENT;
} else if (g_ascii_strcasecmp (value, "receiver") == 0) {
node_type = NNS_EDGE_NODE_TYPE_SUB;
node_type = NNS_EDGE_NODE_TYPE_QUERY_SERVER;
} else {
_ml_error_report ("Invalid node type: %s, Please check ml_option.", value);
}
Expand Down Expand Up @@ -204,6 +205,8 @@ _mlrs_get_service_type (gchar * service_str)
service_type = ML_SERVICE_OFFLOADING_TYPE_PIPELINE_RAW;
} else if (g_ascii_strcasecmp (service_str, "pipeline_uri") == 0) {
service_type = ML_SERVICE_OFFLOADING_TYPE_PIPELINE_URI;
} else if (g_ascii_strcasecmp (service_str, "reply") == 0) {
service_type = ML_SERVICE_OFFLOADING_TYPE_REPLY;
} else {
_ml_error_report ("Invalid service type: %s, Please check service type.",
service_str);
Expand Down Expand Up @@ -365,6 +368,7 @@ _mlrs_process_service_offloading (nns_edge_data_h data_h, void *user_data)
_ml_service_offloading_s *offloading_s =
(_ml_service_offloading_s *) mls->priv;
ml_service_event_e event_type = ML_SERVICE_EVENT_UNKNOWN;
ml_information_h info_h = NULL;

ret = nns_edge_data_get (data_h, 0, &data, &data_len);
if (NNS_EDGE_ERROR_NONE != ret) {
Expand Down Expand Up @@ -455,6 +459,20 @@ _mlrs_process_service_offloading (nns_edge_data_h data_h, void *user_data)
event_type = ML_SERVICE_EVENT_PIPELINE_REGISTERED;
}
break;
case ML_SERVICE_OFFLOADING_TYPE_REPLY:
{
ret = _ml_information_create (&info_h);
if (ML_ERROR_NONE != ret) {
_ml_error_report_return (ret, "Failed to create information handle. ");
}
ret = _ml_information_set (info_h, "data", (void *) data, NULL);
if (ML_ERROR_NONE != ret) {
_ml_error_report ("Failed to set data information.");
goto done;
}
event_type = ML_SERVICE_EVENT_REPLY;
break;
}
default:
_ml_error_report ("Unknown service type or not supported yet. "
"Service num: %d", service_type);
Expand All @@ -463,10 +481,16 @@ _mlrs_process_service_offloading (nns_edge_data_h data_h, void *user_data)

if (mls && event_type != ML_SERVICE_EVENT_UNKNOWN) {
if (mls->cb_info.cb) {
mls->cb_info.cb (event_type, NULL, mls->cb_info.pdata);
mls->cb_info.cb (event_type, info_h, mls->cb_info.pdata);
}
}

done:
if (info_h) {
ret = ml_information_destroy (info_h);
_ml_error_report ("Failed to destroy service info handle.");
}

return ret;
}

Expand All @@ -485,7 +509,8 @@ _mlrs_edge_event_cb (nns_edge_event_h event_h, void *user_data)
return ret;

switch (event) {
case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:{
case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
{
ret = nns_edge_event_parse_new_data (event_h, &data_h);
if (NNS_EDGE_ERROR_NONE != ret)
return ret;
Expand Down Expand Up @@ -538,7 +563,7 @@ _mlrs_create_edge_handle (ml_service_s * mls, edge_info_s * edge_info)
return ret;
}

if (edge_info->node_type == NNS_EDGE_NODE_TYPE_SUB) {
if (edge_info->node_type == NNS_EDGE_NODE_TYPE_QUERY_CLIENT) {
ret = nns_edge_connect (edge_h, edge_info->dest_host, edge_info->dest_port);

if (NNS_EDGE_ERROR_NONE != ret) {
Expand Down Expand Up @@ -581,10 +606,11 @@ ml_service_offloading_release_internal (ml_service_s * mls)
}

/**
* @brief Set value in ml-service remote handle.
* @brief Set value in ml-service offloading handle.
*/
int
ml_service_remote_set_information (ml_service_h handle, const gchar * name, const gchar * value)
ml_service_offloading_set_information (ml_service_h handle, const gchar * name,
const gchar * value)
{
ml_service_s *mls = (ml_service_s *) handle;
_ml_service_offloading_s *mlrs = (_ml_service_offloading_s *) mls->priv;
Expand Down Expand Up @@ -646,7 +672,7 @@ ml_service_offloading_create (ml_service_h handle, ml_option_h option)
}

if (ML_ERROR_NONE == ml_option_get (option, "path", (void **) (&_path))) {
ret = ml_service_remote_set_information (mls, "path", _path);
ret = ml_service_offloading_set_information (mls, "path", _path);
if (ML_ERROR_NONE != ret) {
_ml_error_report_return (ret,
"Failed to set path in ml-service offloading handle.");
Expand Down
2 changes: 1 addition & 1 deletion c/src/ml-api-service-offloading.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ int ml_service_offloading_set_service (ml_service_h handle, const char *name, co
* @retval #ML_ERROR_NOT_SUPPORTED Not supported.
* @retval #ML_ERROR_INVALID_PARAMETER Given parameter is invalid.
*/
int ml_service_remote_set_information (ml_service_h handle, const char *name, const char *value);
int ml_service_offloading_set_information (ml_service_h handle, const char *name, const char *value);

#ifdef __cplusplus
}
Expand Down
1 change: 1 addition & 0 deletions c/src/ml-api-service-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ extern "C" {
*/
#define ML_SERVICE_EVENT_MODEL_REGISTERED 2
#define ML_SERVICE_EVENT_PIPELINE_REGISTERED 3
#define ML_SERVICE_EVENT_REPLY 4

/**
* @brief Enumeration for ml-service type.
Expand Down
2 changes: 1 addition & 1 deletion c/src/ml-api-service.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ _ml_service_set_information_internal (ml_service_s * mls, const char *name,
break;
case ML_SERVICE_TYPE_OFFLOADING:
{
status = ml_service_remote_set_information (mls, name, value);
status = ml_service_offloading_set_information (mls, name, value);
break;
}
default:
Expand Down
127 changes: 116 additions & 11 deletions tests/capi/unittest_capi_service_offloading.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@

#include "ml-api-service-offloading.h"

/**
 * @brief Test fixture data passed as user_data to ml-service event callbacks.
 */
typedef struct {
  ml_service_h handle; /**< ml-service handle; the reply test sets this to the server handle and calls ml_service_request() on it from the callback */
  void *data; /**< expected payload compared inside the callback (pipeline string, model file contents, or a tensors-data handle) */
  gboolean received_reply; /**< NOTE(review): never written in the visible tests — confirm whether it is used elsewhere or dead */
} _ml_service_test_data_s;

/**
* @brief Internal function to get the config file path.
*/
Expand Down Expand Up @@ -48,6 +57,7 @@ class MLOffloadingService : public ::testing::Test
int status;
ml_service_h client_h;
ml_service_h server_h;
_ml_service_test_data_s test_data;

public:
/**
Expand All @@ -65,14 +75,15 @@ class MLOffloadingService : public ::testing::Test

g_test_dbus_up (dbus);

g_autofree gchar *sender_config = _get_config_path ("service_offloading_sender.conf");
status = ml_service_new (sender_config, &client_h);
ASSERT_EQ (status, ML_ERROR_NONE);

g_autofree gchar *receiver_config
= _get_config_path ("service_offloading_receiver.conf");
status = ml_service_new (receiver_config, &server_h);
ASSERT_EQ (status, ML_ERROR_NONE);
test_data.handle = server_h;

g_autofree gchar *sender_config = _get_config_path ("service_offloading_sender.conf");
status = ml_service_new (sender_config, &client_h);
ASSERT_EQ (status, ML_ERROR_NONE);
}

/**
Expand Down Expand Up @@ -129,6 +140,7 @@ static void
_ml_service_event_cb (ml_service_event_e event, ml_information_h event_data, void *user_data)
{
int status;
_ml_service_test_data_s *test_data = (_ml_service_test_data_s *) user_data;

/** @todo remove typecast to int after new event type is added. */
switch ((int) event) {
Expand All @@ -138,7 +150,7 @@ _ml_service_event_cb (ml_service_event_e event, ml_information_h event_data, voi
const gchar *service_key = "pipeline_registration_test_key";
status = ml_service_pipeline_get (service_key, &ret_pipeline);
EXPECT_EQ (ML_ERROR_NONE, status);
EXPECT_STREQ ((gchar *) user_data, ret_pipeline);
EXPECT_STREQ ((gchar *) test_data->data, ret_pipeline);
break;
}
case ML_SERVICE_EVENT_MODEL_REGISTERED:
Expand All @@ -159,7 +171,8 @@ _ml_service_event_cb (ml_service_event_e event, ml_information_h event_data, voi
gsize activated_model_len = 0;
EXPECT_TRUE (g_file_get_contents (activated_model_path,
&activated_model_contents, &activated_model_len, NULL));
EXPECT_EQ (memcmp ((gchar *) user_data, activated_model_contents, activated_model_len), 0);
EXPECT_EQ (memcmp ((gchar *) test_data->data, activated_model_contents, activated_model_len),
0);

status = g_remove (activated_model_path);
EXPECT_TRUE (status == 0);
Expand All @@ -182,7 +195,8 @@ TEST_F (MLOffloadingService, registerPipeline)
ml_tensor_dimension in_dim = { 0 };

gchar *pipeline_desc = g_strdup ("fakesrc ! fakesink");
status = ml_service_set_event_cb (server_h, _ml_service_event_cb, pipeline_desc);
test_data.data = pipeline_desc;
status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data);
EXPECT_EQ (status, ML_ERROR_NONE);

status = ml_tensors_info_create (&in_info);
Expand Down Expand Up @@ -224,7 +238,8 @@ TEST_F (MLOffloadingService, registerPipelineURI)
ml_tensor_dimension in_dim = { 0 };

g_autofree gchar *pipeline_desc = g_strdup ("fakesrc ! fakesink");
status = ml_service_set_event_cb (server_h, _ml_service_event_cb, pipeline_desc);
test_data.data = pipeline_desc;
status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data);
EXPECT_EQ (status, ML_ERROR_NONE);

gchar *current_dir = g_get_current_dir ();
Expand Down Expand Up @@ -345,7 +360,8 @@ TEST_F (MLOffloadingService, registerModel)
gsize len = 0;
EXPECT_TRUE (g_file_get_contents (test_model, &contents, &len, NULL));

status = ml_service_set_event_cb (server_h, _ml_service_event_cb, contents);
test_data.data = contents;
status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data);
EXPECT_EQ (status, ML_ERROR_NONE);

status = ml_tensors_info_create (&in_info);
Expand Down Expand Up @@ -395,7 +411,8 @@ TEST_F (MLOffloadingService, registerModelURI)
gsize len = 0;
EXPECT_TRUE (g_file_get_contents (test_model_path, &contents, &len, NULL));

status = ml_service_set_event_cb (server_h, _ml_service_event_cb, contents);
test_data.data = contents;
status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data);
EXPECT_EQ (status, ML_ERROR_NONE);

g_autofree gchar *model_uri = g_strdup_printf ("file://%s", test_model_path);
Expand Down Expand Up @@ -455,7 +472,8 @@ TEST_F (MLOffloadingService, registerModelPath)
gsize len = 0;
EXPECT_TRUE (g_file_get_contents (test_model, &contents, &len, NULL));

status = ml_service_set_event_cb (server_h, _ml_service_event_cb, contents);
test_data.data = contents;
status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data);
EXPECT_EQ (status, ML_ERROR_NONE);

status = ml_tensors_info_create (&in_info);
Expand Down Expand Up @@ -518,6 +536,93 @@ TEST_F (MLOffloadingService, requestInvalidParam_n)
EXPECT_EQ (ML_ERROR_NONE, status);
}


/**
 * @brief Callback function for reply test.
 * @note The meaning of @a user_data depends on which handle registered this
 *       callback: the server registers it with an _ml_service_test_data_s*
 *       (handled by PIPELINE_REGISTERED), while the client registers it with
 *       a gint* counter (handled by REPLY). The casts below must stay in sync
 *       with those two ml_service_set_event_cb() call sites.
 */
static void
_ml_service_reply_test_cb (ml_service_event_e event, ml_information_h event_data, void *user_data)
{
  int status;

  switch ((int) event) {
    case ML_SERVICE_EVENT_PIPELINE_REGISTERED:
    {
      /* Server side: check that the registered pipeline equals the raw
       * payload the client sent, then send that same tensors data back
       * through the "reply_to_client" service. */
      _ml_service_test_data_s *test_data = (_ml_service_test_data_s *) user_data;
      g_autofree gchar *ret_pipeline = NULL;
      void *_data;
      size_t _size;
      const gchar *service_key = "pipeline_registration_test_key";
      status = ml_service_pipeline_get (service_key, &ret_pipeline);
      EXPECT_EQ (ML_ERROR_NONE, status);
      status = ml_tensors_data_get_tensor_data (test_data->data, 0, &_data, &_size);
      EXPECT_EQ (ML_ERROR_NONE, status);
      EXPECT_STREQ ((gchar *) _data, ret_pipeline);

      /* This request makes the client receive ML_SERVICE_EVENT_REPLY. */
      ml_service_request (test_data->handle, "reply_to_client", test_data->data);
      break;
    }
    case ML_SERVICE_EVENT_REPLY:
    {
      /* Client side: count the replies so the test body can wait on it. */
      gint *received = (gint *) user_data;
      (*received)++;
      break;
    }
    default:
      break;
  }
}

/**
 * @brief Use case of the server replying to the client.
 * @details The client sends a raw pipeline-registration request; the server's
 *          event callback then issues a "reply_to_client" request, and the
 *          client's callback increments @c received on each REPLY event.
 */
TEST_F (MLOffloadingService, replyToClient)
{
  ml_tensors_data_h input = NULL;
  ml_tensors_info_h in_info = NULL;
  ml_tensor_dimension in_dim = { 0 };
  gint received = 0;
  guint retry = 0;

  /* g_autofree: released on every exit path, consistent with sibling tests. */
  g_autofree gchar *pipeline_desc = g_strdup ("fakesrc ! fakesink");

  status = ml_tensors_info_create (&in_info);
  EXPECT_EQ (ML_ERROR_NONE, status);
  ml_tensors_info_set_count (in_info, 1);
  ml_tensors_info_set_tensor_type (in_info, 0, ML_TENSOR_TYPE_UINT8);
  in_dim[0] = strlen (pipeline_desc) + 1;
  ml_tensors_info_set_tensor_dimension (in_info, 0, in_dim);
  status = ml_tensors_data_create (in_info, &input);
  EXPECT_EQ (ML_ERROR_NONE, status);
  status = ml_tensors_data_set_tensor_data (
      input, 0, pipeline_desc, strlen (pipeline_desc) + 1);
  EXPECT_EQ (ML_ERROR_NONE, status);

  /* Server side: callback compares the payload and sends the reply. */
  test_data.data = input;
  status = ml_service_set_event_cb (server_h, _ml_service_reply_test_cb, &test_data);
  EXPECT_EQ (status, ML_ERROR_NONE);

  /* Client side: callback counts ML_SERVICE_EVENT_REPLY events. */
  status = ml_service_set_event_cb (client_h, _ml_service_reply_test_cb, &received);
  EXPECT_EQ (status, ML_ERROR_NONE);

  status = ml_service_request (client_h, "pipeline_registration_raw", input);
  EXPECT_EQ (ML_ERROR_NONE, status);

  /* Poll for the reply (up to ~10 s) instead of a single fixed sleep,
   * which was both slow in the common case and flaky under load. */
  do {
    g_usleep (100000U);
  } while (received == 0 && retry++ < 100U);

  EXPECT_GT (received, 0);

  status = ml_service_pipeline_delete ("pipeline_registration_test_key");
  EXPECT_TRUE (status == ML_ERROR_NONE);

  status = ml_tensors_info_destroy (in_info);
  EXPECT_EQ (ML_ERROR_NONE, status);
  status = ml_tensors_data_destroy (input);
  EXPECT_EQ (ML_ERROR_NONE, status);
}

/**
* @brief Main gtest
*/
Expand Down
12 changes: 10 additions & 2 deletions tests/test_models/config/service_offloading_receiver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@
"offloading" :
{
"node-type" : "receiver",
"dest-host" : "127.0.0.1",
"dest-port" : "3001",
"host" : "127.0.0.1",
"port" : "3000",
"connect-type" : "TCP",
"topic" : "offloading_service_test_topic"
},
"services" :
{
"reply_to_client" :
{
"service-type" : "reply",
"service-key" : "reply_to_client"
}
}
}
4 changes: 2 additions & 2 deletions tests/test_models/config/service_offloading_sender.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"offloading" :
{
"node-type" : "sender",
"host" : "127.0.0.1",
"port" : "3001",
"dest-host" : "127.0.0.1",
"dest-port" : "3000",
"connect-type" : "TCP",
"topic" : "offloading_service_test_topic"
},
Expand Down

0 comments on commit 33440e3

Please sign in to comment.