Send packets to the remote node only via the send thread to avoid possible
races - in this case a keepalive packet was sent from the wrong thread, which led to the connection being dropped because of a corrupted packet. Fix it by sending keepalive packets directly from the send thread. As a bonus, we now send keepalive packets only when the connection is idle. Submitted by: Mikolaj Golub <to.my.trociny@gmail.com> MFC after: 3 days
This commit is contained in:
parent
5b867e813a
commit
448efa9421
@ -180,14 +180,21 @@ static pthread_mutex_t metadata_lock;
|
||||
if (_wakeup) \
|
||||
cv_signal(&hio_##name##_list_cond); \
|
||||
} while (0)
|
||||
#define QUEUE_TAKE1(hio, name, ncomp) do { \
|
||||
#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \
|
||||
bool _last; \
|
||||
\
|
||||
mtx_lock(&hio_##name##_list_lock[(ncomp)]); \
|
||||
while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \
|
||||
cv_wait(&hio_##name##_list_cond[(ncomp)], \
|
||||
&hio_##name##_list_lock[(ncomp)]); \
|
||||
_last = false; \
|
||||
while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
|
||||
cv_timedwait(&hio_##name##_list_cond[(ncomp)], \
|
||||
&hio_##name##_list_lock[(ncomp)], (timeout)); \
|
||||
if ((timeout) != 0) \
|
||||
_last = true; \
|
||||
} \
|
||||
if (hio != NULL) { \
|
||||
TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \
|
||||
hio_next[(ncomp)]); \
|
||||
} \
|
||||
TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \
|
||||
hio_next[(ncomp)]); \
|
||||
mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \
|
||||
} while (0)
|
||||
#define QUEUE_TAKE2(hio, name) do { \
|
||||
@ -1112,7 +1119,7 @@ local_send_thread(void *arg)
|
||||
|
||||
for (;;) {
|
||||
pjdlog_debug(2, "local_send: Taking request.");
|
||||
QUEUE_TAKE1(hio, send, ncomp);
|
||||
QUEUE_TAKE1(hio, send, ncomp, 0);
|
||||
pjdlog_debug(2, "local_send: (%p) Got request.", hio);
|
||||
ggio = &hio->hio_ggio;
|
||||
switch (ggio->gctl_cmd) {
|
||||
@ -1176,6 +1183,38 @@ local_send_thread(void *arg)
|
||||
return (NULL);
|
||||
}
|
||||
|
||||
static void
|
||||
keepalive_send(struct hast_resource *res, unsigned int ncomp)
|
||||
{
|
||||
struct nv *nv;
|
||||
|
||||
if (!ISCONNECTED(res, ncomp))
|
||||
return;
|
||||
|
||||
assert(res->hr_remotein != NULL);
|
||||
assert(res->hr_remoteout != NULL);
|
||||
|
||||
nv = nv_alloc();
|
||||
nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
|
||||
if (nv_error(nv) != 0) {
|
||||
nv_free(nv);
|
||||
pjdlog_debug(1,
|
||||
"keepalive_send: Unable to prepare header to send.");
|
||||
return;
|
||||
}
|
||||
if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) {
|
||||
pjdlog_common(LOG_DEBUG, 1, errno,
|
||||
"keepalive_send: Unable to send request");
|
||||
nv_free(nv);
|
||||
rw_unlock(&hio_remote_lock[ncomp]);
|
||||
remote_close(res, ncomp);
|
||||
rw_rlock(&hio_remote_lock[ncomp]);
|
||||
return;
|
||||
}
|
||||
nv_free(nv);
|
||||
pjdlog_debug(2, "keepalive_send: Request sent.");
|
||||
}
|
||||
|
||||
/*
|
||||
* Thread sends request to secondary node.
|
||||
*/
|
||||
@ -1184,6 +1223,7 @@ remote_send_thread(void *arg)
|
||||
{
|
||||
struct hast_resource *res = arg;
|
||||
struct g_gate_ctl_io *ggio;
|
||||
time_t lastcheck, now;
|
||||
struct hio *hio;
|
||||
struct nv *nv;
|
||||
unsigned int ncomp;
|
||||
@ -1194,10 +1234,19 @@ remote_send_thread(void *arg)
|
||||
|
||||
/* Remote component is 1 for now. */
|
||||
ncomp = 1;
|
||||
lastcheck = time(NULL);
|
||||
|
||||
for (;;) {
|
||||
pjdlog_debug(2, "remote_send: Taking request.");
|
||||
QUEUE_TAKE1(hio, send, ncomp);
|
||||
QUEUE_TAKE1(hio, send, ncomp, RETRY_SLEEP);
|
||||
if (hio == NULL) {
|
||||
now = time(NULL);
|
||||
if (lastcheck + RETRY_SLEEP <= now) {
|
||||
keepalive_send(res, ncomp);
|
||||
lastcheck = now;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
|
||||
ggio = &hio->hio_ggio;
|
||||
switch (ggio->gctl_cmd) {
|
||||
@ -1882,32 +1931,6 @@ failed:
|
||||
pjdlog_warning("Configuration not reloaded.");
|
||||
}
|
||||
|
||||
static void
|
||||
keepalive_send(struct hast_resource *res, unsigned int ncomp)
|
||||
{
|
||||
struct nv *nv;
|
||||
|
||||
nv = nv_alloc();
|
||||
nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
|
||||
if (nv_error(nv) != 0) {
|
||||
nv_free(nv);
|
||||
pjdlog_debug(1,
|
||||
"keepalive_send: Unable to prepare header to send.");
|
||||
return;
|
||||
}
|
||||
if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) {
|
||||
pjdlog_common(LOG_DEBUG, 1, errno,
|
||||
"keepalive_send: Unable to send request");
|
||||
nv_free(nv);
|
||||
rw_unlock(&hio_remote_lock[ncomp]);
|
||||
remote_close(res, ncomp);
|
||||
rw_rlock(&hio_remote_lock[ncomp]);
|
||||
return;
|
||||
}
|
||||
nv_free(nv);
|
||||
pjdlog_debug(2, "keepalive_send: Request sent.");
|
||||
}
|
||||
|
||||
static void
|
||||
guard_one(struct hast_resource *res, unsigned int ncomp)
|
||||
{
|
||||
@ -1923,12 +1946,6 @@ guard_one(struct hast_resource *res, unsigned int ncomp)
|
||||
return;
|
||||
}
|
||||
|
||||
if (ISCONNECTED(res, ncomp)) {
|
||||
assert(res->hr_remotein != NULL);
|
||||
assert(res->hr_remoteout != NULL);
|
||||
keepalive_send(res, ncomp);
|
||||
}
|
||||
|
||||
if (ISCONNECTED(res, ncomp)) {
|
||||
assert(res->hr_remotein != NULL);
|
||||
assert(res->hr_remoteout != NULL);
|
||||
|
Loading…
x
Reference in New Issue
Block a user