jsonrpc: add connection close callback

Connection close cb is called when connection is terminated or server is
shutting down.

Change-Id: Ia455bc5a72d690a4ace056c5a4141760381df678
Signed-off-by: Pawel Wodkowski <pawelx.wodkowski@intel.com>
Reviewed-on: https://review.gerrithub.io/c/436195
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Chandler-Test-Pool: SPDK Automated Test System <sys_sgsw@intel.com>
Reviewed-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
Pawel Wodkowski 2018-12-05 16:28:23 +01:00 committed by Jim Harris
parent 7fecba0c02
commit 5b1b3ddf33
5 changed files with 235 additions and 22 deletions

View File

@ -89,6 +89,10 @@ typedef void (*spdk_jsonrpc_handle_request_fn)(
const struct spdk_json_val *method, const struct spdk_json_val *method,
const struct spdk_json_val *params); const struct spdk_json_val *params);
struct spdk_jsonrpc_server_conn;
typedef void (*spdk_jsonrpc_conn_closed_fn)(struct spdk_jsonrpc_server_conn *conn, void *arg);
/** /**
* Function for specific RPC method response parsing handlers. * Function for specific RPC method response parsing handlers.
* *
@ -134,6 +138,46 @@ int spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server);
*/ */
void spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server); void spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server);
/**
* Return connection associated to \c request
*
* \param request JSON-RPC request
* \return JSON RPC server connection
*/
struct spdk_jsonrpc_server_conn *spdk_jsonrpc_get_conn(struct spdk_jsonrpc_request *request);
/**
* Add callback called when connection is closed. Pair of \c cb and \c ctx must be unique or error is returned.
* Registered callback is called only once and there is no need to call \c spdk_jsonrpc_conn_del_close_cb
* inside from \c cb.
*
* \note Current implementation allow only one close callback per connection.
*
* \param conn JSON RPC server connection
* \param cb calback function
* \param ctx argument for \c cb
*
* \return 0 on success, or negated errno code:
* -EEXIST \c cb and \c ctx is already registered
* -ENOTCONN Callback can't be added because connection is closed.
* -ENOSPC no more space to register callback.
*/
int spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn,
spdk_jsonrpc_conn_closed_fn cb, void *ctx);
/**
* Remove registered close callback.
*
* \param conn JSON RPC server connection
* \param cb calback function
* \param ctx argument for \c cb
*
* \return 0 on success, or negated errno code:
* -ENOENT \c cb and \c ctx pair is not registered
*/
int spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn,
spdk_jsonrpc_conn_closed_fn cb, void *ctx);
/** /**
* Begin building a response to a JSON-RPC request. * Begin building a response to a JSON-RPC request.
* *

View File

@ -81,6 +81,9 @@ struct spdk_jsonrpc_server_conn {
struct spdk_jsonrpc_request *send_request; struct spdk_jsonrpc_request *send_request;
spdk_jsonrpc_conn_closed_fn close_cb;
void *close_cb_ctx;
TAILQ_ENTRY(spdk_jsonrpc_server_conn) link; TAILQ_ENTRY(spdk_jsonrpc_server_conn) link;
}; };

View File

@ -188,6 +188,12 @@ spdk_jsonrpc_parse_request(struct spdk_jsonrpc_server_conn *conn, void *json, si
return end - json; return end - json;
} }
struct spdk_jsonrpc_server_conn *
spdk_jsonrpc_get_conn(struct spdk_jsonrpc_request *request)
{
return request->conn;
}
static int static int
spdk_jsonrpc_server_write_cb(void *cb_ctx, const void *data, size_t size) spdk_jsonrpc_server_write_cb(void *cb_ctx, const void *data, size_t size)
{ {

View File

@ -33,6 +33,7 @@
#include "jsonrpc_internal.h" #include "jsonrpc_internal.h"
#include "spdk/string.h" #include "spdk/string.h"
#include "spdk/util.h"
struct spdk_jsonrpc_server * struct spdk_jsonrpc_server *
spdk_jsonrpc_server_listen(int domain, int protocol, spdk_jsonrpc_server_listen(int domain, int protocol,
@ -97,20 +98,6 @@ spdk_jsonrpc_server_listen(int domain, int protocol,
return server; return server;
} }
void
spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
{
struct spdk_jsonrpc_server_conn *conn;
close(server->sockfd);
TAILQ_FOREACH(conn, &server->conns, link) {
close(conn->sockfd);
}
free(server);
}
static void static void
spdk_jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn) spdk_jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
{ {
@ -119,9 +106,27 @@ spdk_jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
if (conn->sockfd >= 0) { if (conn->sockfd >= 0) {
close(conn->sockfd); close(conn->sockfd);
conn->sockfd = -1; conn->sockfd = -1;
if (conn->close_cb) {
conn->close_cb(conn, conn->close_cb_ctx);
}
} }
} }
void
spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
{
struct spdk_jsonrpc_server_conn *conn;
close(server->sockfd);
TAILQ_FOREACH(conn, &server->conns, link) {
spdk_jsonrpc_server_conn_close(conn);
}
free(server);
}
static void static void
spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn) spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
{ {
@ -136,6 +141,41 @@ spdk_jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
TAILQ_INSERT_HEAD(&server->free_conns, conn, link); TAILQ_INSERT_HEAD(&server->free_conns, conn, link);
} }
int
spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn,
spdk_jsonrpc_conn_closed_fn cb, void *ctx)
{
int rc = 0;
pthread_spin_lock(&conn->queue_lock);
if (conn->close_cb == NULL) {
conn->close_cb = cb;
conn->close_cb_ctx = ctx;
} else {
rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC;
}
pthread_spin_unlock(&conn->queue_lock);
return rc;
}
int
spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn,
spdk_jsonrpc_conn_closed_fn cb, void *ctx)
{
int rc = 0;
pthread_spin_lock(&conn->queue_lock);
if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) {
rc = -ENOENT;
} else {
conn->close_cb = NULL;
}
pthread_spin_unlock(&conn->queue_lock);
return rc;
}
static int static int
spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server) spdk_jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
{ {

View File

@ -51,6 +51,22 @@ struct get_jsonrpc_methods_resp {
size_t method_num; size_t method_num;
}; };
static int
_rpc_client_wait_for_response(struct spdk_jsonrpc_client *client)
{
int rc;
do {
rc = spdk_jsonrpc_client_poll(client, 1);
} while (rc == 0 || rc == -ENOTCONN);
if (rc <= 0) {
SPDK_ERRLOG("Failed to get response: %d\n", rc);
}
return rc;
}
static int static int
get_jsonrpc_method_json_parser(struct get_jsonrpc_methods_resp *resp, get_jsonrpc_method_json_parser(struct get_jsonrpc_methods_resp *resp,
const struct spdk_json_val *result) const struct spdk_json_val *result)
@ -77,13 +93,8 @@ spdk_jsonrpc_client_check_rpc_method(struct spdk_jsonrpc_client *client, char *m
spdk_jsonrpc_end_request(request, w); spdk_jsonrpc_end_request(request, w);
spdk_jsonrpc_client_send_request(client, request); spdk_jsonrpc_client_send_request(client, request);
do { rc = _rpc_client_wait_for_response(client);
rc = spdk_jsonrpc_client_poll(client, 1);
} while (rc == 0 || rc == -ENOTCONN);
if (rc <= 0) { if (rc <= 0) {
SPDK_ERRLOG("Failed to get response: %d\n", rc);
rc = -1;
goto out; goto out;
} }
@ -146,6 +157,103 @@ rpc_test_method_runtime(struct spdk_jsonrpc_request *request, const struct spdk_
} }
SPDK_RPC_REGISTER("test_method_runtime", rpc_test_method_runtime, SPDK_RPC_RUNTIME) SPDK_RPC_REGISTER("test_method_runtime", rpc_test_method_runtime, SPDK_RPC_RUNTIME)
static bool g_conn_close_detected;
static void
rpc_test_conn_close_cb(struct spdk_jsonrpc_server_conn *conn, void *ctx)
{
assert((intptr_t)ctx == 42);
g_conn_close_detected = true;
}
static void
rpc_hook_conn_close(struct spdk_jsonrpc_request *request, const struct spdk_json_val *params)
{
struct spdk_jsonrpc_server_conn *conn = spdk_jsonrpc_get_conn(request);
struct spdk_json_write_ctx *w;
int rc;
rc = spdk_jsonrpc_conn_add_close_cb(conn, rpc_test_conn_close_cb, (void *)(intptr_t)(42));
if (rc != 0) {
rc = spdk_jsonrpc_conn_add_close_cb(conn, rpc_test_conn_close_cb, (void *)(intptr_t)(42));
assert(rc == -ENOSPC);
}
rc = spdk_jsonrpc_conn_add_close_cb(conn, rpc_test_conn_close_cb, (void *)(intptr_t)(42));
if (rc != -EEXIST) {
spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
"rpc_test_method_conn_close_detect(): rc != -EEXIST");
return;
}
rc = spdk_jsonrpc_conn_add_close_cb(conn, rpc_test_conn_close_cb, (void *)(intptr_t)(43));
if (rc != -ENOSPC) {
spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
"rpc_test_method_conn_close_detect(): rc != -ENOSPC");
return;
}
w = spdk_jsonrpc_begin_result(request);
assert(w != NULL);
spdk_json_write_bool(w, true);
spdk_jsonrpc_end_result(request, w);
}
SPDK_RPC_REGISTER("hook_conn_close", rpc_hook_conn_close, SPDK_RPC_RUNTIME | SPDK_RPC_STARTUP)
static int
spdk_jsonrpc_client_hook_conn_close(struct spdk_jsonrpc_client *client)
{
int rc;
bool res = false;
struct spdk_jsonrpc_client_response *json_resp = NULL;
struct spdk_json_write_ctx *w;
struct spdk_jsonrpc_client_request *request;
request = spdk_jsonrpc_client_create_request();
if (request == NULL) {
return -ENOMEM;
}
w = spdk_jsonrpc_begin_request(request, 1, "hook_conn_close");
spdk_jsonrpc_end_request(request, w);
spdk_jsonrpc_client_send_request(client, request);
rc = _rpc_client_wait_for_response(client);
if (rc <= 0) {
goto out;
}
json_resp = spdk_jsonrpc_client_get_response(client);
if (json_resp == NULL) {
SPDK_ERRLOG("spdk_jsonrpc_client_get_response() failed\n");
rc = -errno;
goto out;
}
/* Check for error response */
if (json_resp->error != NULL) {
SPDK_ERRLOG("Unexpected error response: %*s\n", json_resp->error->len,
(char *)json_resp->error->start);
rc = -EIO;
goto out;
}
assert(json_resp->result);
if (spdk_json_decode_bool(json_resp->result, &res) != 0 || res != true) {
SPDK_ERRLOG("Response is not and boolean or if not 'true'\n");
rc = -EINVAL;
goto out;
}
rc = 0;
out:
spdk_jsonrpc_client_free_response(json_resp);
return rc;
}
/* Helper function */ /* Helper function */
static int static int
_sem_timedwait(sem_t *sem, __time_t sec) _sem_timedwait(sem_t *sem, __time_t sec)
@ -199,7 +307,8 @@ rpc_client_th(void *arg)
rc = _sem_timedwait(&g_rpc_server_th_listening, 2); rc = _sem_timedwait(&g_rpc_server_th_listening, 2);
if (rc == -1) { if (rc == -1) {
fprintf(stderr, "Timeout waiting for server thread to start listening: %d\n", errno); fprintf(stderr, "Timeout waiting for server thread to start listening: rc=%d errno=%d\n", rc,
errno);
goto out; goto out;
} }
@ -212,7 +321,13 @@ rpc_client_th(void *arg)
rc = spdk_jsonrpc_client_check_rpc_method(client, method_name); rc = spdk_jsonrpc_client_check_rpc_method(client, method_name);
if (rc) { if (rc) {
fprintf(stderr, "spdk_jsonrpc_client_check_rpc_method() failed: %d\n", errno); fprintf(stderr, "spdk_jsonrpc_client_check_rpc_method() failed: rc=%d errno=%d\n", rc, errno);
goto out;
}
rc = spdk_jsonrpc_client_hook_conn_close(client);
if (rc) {
fprintf(stderr, "spdk_jsonrpc_client_hook_conn_close() failed: rc=%d errno=%d\n", rc, errno);
goto out; goto out;
} }
@ -286,6 +401,11 @@ out:
} }
} }
if (g_conn_close_detected == false) {
fprintf(stderr, "Connection close not detected\n");
err_cnt++;
}
sem_destroy(&g_rpc_server_th_listening); sem_destroy(&g_rpc_server_th_listening);
sem_destroy(&g_rpc_server_th_done); sem_destroy(&g_rpc_server_th_done);
sem_destroy(&g_rpc_client_th_done); sem_destroy(&g_rpc_client_th_done);