Implement a keepalive mechanism inside the HAST protocol so we can detect secondary
node failures quickly for HAST resources that are rarely modified.

Remove XXX from a comment now that the guard thread never sleeps infinitely.

MFC after: 2 weeks
Obtained from: Wheel Systems Sp. z o.o. http://www.wheelsystems.com
This commit is contained in:
parent
8f8c798c13
commit
f7fe83f9f8
Notes:
svn2git
2020-12-20 02:59:44 +00:00
svn path=/head/; revision=211882
@ -48,7 +48,12 @@
|
||||
|
||||
#include "proto.h"
|
||||
|
||||
#define HAST_PROTO_VERSION 0
|
||||
/*
|
||||
* Version history:
|
||||
* 0 - initial version
|
||||
* 1 - HIO_KEEPALIVE added
|
||||
*/
|
||||
#define HAST_PROTO_VERSION 1
|
||||
|
||||
#define EHAST_OK 0
|
||||
#define EHAST_NOENTRY 1
|
||||
@ -74,6 +79,7 @@
|
||||
#define HIO_WRITE 2
|
||||
#define HIO_DELETE 3
|
||||
#define HIO_FLUSH 4
|
||||
#define HIO_KEEPALIVE 5
|
||||
|
||||
#define HAST_TIMEOUT 5
|
||||
#define HAST_CONFIG "/etc/hast.conf"
|
||||
|
@ -151,7 +151,11 @@ static pthread_mutex_t metadata_lock;
|
||||
*/
|
||||
#define HAST_NCOMPONENTS 2
|
||||
/*
|
||||
* Number of seconds to sleep before next reconnect try.
|
||||
* Number of seconds to sleep between keepalive packets.
|
||||
*/
|
||||
#define KEEPALIVE_SLEEP 10
|
||||
/*
|
||||
* Number of seconds to sleep between reconnect retries.
|
||||
*/
|
||||
#define RECONNECT_SLEEP 5
|
||||
|
||||
@ -886,12 +890,15 @@ remote_close(struct hast_resource *res, int ncomp)
|
||||
sync_stop();
|
||||
|
||||
/*
|
||||
* Wake up guard thread, so it can immediately start reconnect.
|
||||
* Wake up guard thread (if we are not called from within guard thread),
|
||||
* so it can immediately start reconnect.
|
||||
*/
|
||||
if (!mtx_owned(&hio_guard_lock)) {
|
||||
mtx_lock(&hio_guard_lock);
|
||||
cv_signal(&hio_guard_cond);
|
||||
mtx_unlock(&hio_guard_lock);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Thread receives ggate I/O requests from the kernel and passes them to
|
||||
@ -1734,7 +1741,7 @@ sighandler(int sig)
|
||||
assert(!"invalid condition");
|
||||
}
|
||||
/*
|
||||
* XXX: Racy, but if we cannot obtain hio_guard_lock here, we don't
|
||||
* Racy, but if we cannot obtain hio_guard_lock here, we don't
|
||||
* want to risk deadlock.
|
||||
*/
|
||||
unlock = mtx_trylock(&hio_guard_lock);
|
||||
@ -1851,6 +1858,32 @@ config_reload(void)
|
||||
pjdlog_warning("Configuration not reloaded.");
|
||||
}
|
||||
|
||||
static void
|
||||
keepalive_send(struct hast_resource *res, unsigned int ncomp)
|
||||
{
|
||||
struct nv *nv;
|
||||
|
||||
nv = nv_alloc();
|
||||
nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
|
||||
if (nv_error(nv) != 0) {
|
||||
nv_free(nv);
|
||||
pjdlog_debug(1,
|
||||
"keepalive_send: Unable to prepare header to send.");
|
||||
return;
|
||||
}
|
||||
if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) {
|
||||
pjdlog_common(LOG_DEBUG, 1, errno,
|
||||
"keepalive_send: Unable to send request");
|
||||
nv_free(nv);
|
||||
rw_unlock(&hio_remote_lock[ncomp]);
|
||||
remote_close(res, ncomp);
|
||||
rw_rlock(&hio_remote_lock[ncomp]);
|
||||
return;
|
||||
}
|
||||
nv_free(nv);
|
||||
pjdlog_debug(2, "keepalive_send: Request sent.");
|
||||
}
|
||||
|
||||
/*
|
||||
* Thread guards remote connections and reconnects when needed, handles
|
||||
* signals, etc.
|
||||
@ -1874,20 +1907,19 @@ guard_thread(void *arg)
|
||||
sighup_received = false;
|
||||
config_reload();
|
||||
}
|
||||
/*
|
||||
* If all the connection will be fine, we will sleep until
|
||||
* someone wakes us up.
|
||||
* If any of the connections will be broken and we won't be
|
||||
* able to connect, we will sleep only for RECONNECT_SLEEP
|
||||
* seconds so we can retry soon.
|
||||
*/
|
||||
timeout = 0;
|
||||
|
||||
timeout = KEEPALIVE_SLEEP;
|
||||
pjdlog_debug(2, "remote_guard: Checking connections.");
|
||||
mtx_lock(&hio_guard_lock);
|
||||
for (ii = 0; ii < ncomps; ii++) {
|
||||
if (!ISREMOTE(ii))
|
||||
continue;
|
||||
rw_rlock(&hio_remote_lock[ii]);
|
||||
if (ISCONNECTED(res, ii)) {
|
||||
assert(res->hr_remotein != NULL);
|
||||
assert(res->hr_remoteout != NULL);
|
||||
keepalive_send(res, ii);
|
||||
}
|
||||
if (ISCONNECTED(res, ii)) {
|
||||
assert(res->hr_remotein != NULL);
|
||||
assert(res->hr_remoteout != NULL);
|
||||
|
@ -413,6 +413,9 @@ reqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt
|
||||
"WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
|
||||
(uintmax_t)hio->hio_length);
|
||||
break;
|
||||
case HIO_KEEPALIVE:
|
||||
(void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
|
||||
break;
|
||||
default:
|
||||
(void)snprintf(msg + len, sizeof(msg) - len,
|
||||
"UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
|
||||
@ -433,6 +436,8 @@ requnpack(struct hast_resource *res, struct hio *hio)
|
||||
goto end;
|
||||
}
|
||||
switch (hio->hio_cmd) {
|
||||
case HIO_KEEPALIVE:
|
||||
break;
|
||||
case HIO_READ:
|
||||
case HIO_WRITE:
|
||||
case HIO_DELETE:
|
||||
@ -517,7 +522,14 @@ recv_thread(void *arg)
|
||||
}
|
||||
reqlog(LOG_DEBUG, 2, -1, hio,
|
||||
"recv: (%p) Got request header: ", hio);
|
||||
if (hio->hio_cmd == HIO_WRITE) {
|
||||
if (hio->hio_cmd == HIO_KEEPALIVE) {
|
||||
pjdlog_debug(2,
|
||||
"recv: (%p) Moving request to the free queue.",
|
||||
hio);
|
||||
nv_free(hio->hio_nv);
|
||||
QUEUE_INSERT(free, hio);
|
||||
continue;
|
||||
} else if (hio->hio_cmd == HIO_WRITE) {
|
||||
if (hast_proto_recv_data(res, res->hr_remotein,
|
||||
hio->hio_nv, hio->hio_data, MAXPHYS) < 0) {
|
||||
pjdlog_exit(EX_TEMPFAIL,
|
||||
|
Loading…
Reference in New Issue
Block a user