Don't hold connection lock when doing reconnects as it makes I/Os wait for

connection timeouts.

Reported by:	Kevin Day <toasty@dragondata.com>
This commit is contained in:
Pawel Jakub Dawidek 2010-03-27 16:35:07 +00:00
parent 42d008a11c
commit 0d9014f354
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=205738

View File

@ -460,9 +460,11 @@ init_local(struct hast_resource *res)
exit(EX_NOINPUT); exit(EX_NOINPUT);
} }
static void static bool
init_remote(struct hast_resource *res) init_remote(struct hast_resource *res, struct proto_conn **inp,
struct proto_conn **outp)
{ {
struct proto_conn *in, *out;
struct nv *nvout, *nvin; struct nv *nvout, *nvin;
const unsigned char *token; const unsigned char *token;
unsigned char *map; unsigned char *map;
@ -472,13 +474,17 @@ init_remote(struct hast_resource *res)
uint32_t mapsize; uint32_t mapsize;
size_t size; size_t size;
assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
in = out = NULL;
/* Prepare outgoing connection with remote node. */ /* Prepare outgoing connection with remote node. */
if (proto_client(res->hr_remoteaddr, &res->hr_remoteout) < 0) { if (proto_client(res->hr_remoteaddr, &out) < 0) {
primary_exit(EX_OSERR, "Unable to create connection to %s", primary_exit(EX_OSERR, "Unable to create connection to %s",
res->hr_remoteaddr); res->hr_remoteaddr);
} }
/* Try to connect, but accept failure. */ /* Try to connect, but accept failure. */
if (proto_connect(res->hr_remoteout) < 0) { if (proto_connect(out) < 0) {
pjdlog_errno(LOG_WARNING, "Unable to connect to %s", pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
res->hr_remoteaddr); res->hr_remoteaddr);
goto close; goto close;
@ -496,7 +502,7 @@ init_remote(struct hast_resource *res)
nv_free(nvout); nv_free(nvout);
goto close; goto close;
} }
if (hast_proto_send(res, res->hr_remoteout, nvout, NULL, 0) < 0) { if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
pjdlog_errno(LOG_WARNING, pjdlog_errno(LOG_WARNING,
"Unable to send handshake header to %s", "Unable to send handshake header to %s",
res->hr_remoteaddr); res->hr_remoteaddr);
@ -504,7 +510,7 @@ init_remote(struct hast_resource *res)
goto close; goto close;
} }
nv_free(nvout); nv_free(nvout);
if (hast_proto_recv_hdr(res->hr_remoteout, &nvin) < 0) { if (hast_proto_recv_hdr(out, &nvin) < 0) {
pjdlog_errno(LOG_WARNING, pjdlog_errno(LOG_WARNING,
"Unable to receive handshake header from %s", "Unable to receive handshake header from %s",
res->hr_remoteaddr); res->hr_remoteaddr);
@ -536,12 +542,12 @@ init_remote(struct hast_resource *res)
* Second handshake step. * Second handshake step.
* Setup incoming connection with remote node. * Setup incoming connection with remote node.
*/ */
if (proto_client(res->hr_remoteaddr, &res->hr_remotein) < 0) { if (proto_client(res->hr_remoteaddr, &in) < 0) {
pjdlog_errno(LOG_WARNING, "Unable to create connection to %s", pjdlog_errno(LOG_WARNING, "Unable to create connection to %s",
res->hr_remoteaddr); res->hr_remoteaddr);
} }
/* Try to connect, but accept failure. */ /* Try to connect, but accept failure. */
if (proto_connect(res->hr_remotein) < 0) { if (proto_connect(in) < 0) {
pjdlog_errno(LOG_WARNING, "Unable to connect to %s", pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
res->hr_remoteaddr); res->hr_remoteaddr);
goto close; goto close;
@ -560,7 +566,7 @@ init_remote(struct hast_resource *res)
nv_free(nvout); nv_free(nvout);
goto close; goto close;
} }
if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) < 0) { if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
pjdlog_errno(LOG_WARNING, pjdlog_errno(LOG_WARNING,
"Unable to send handshake header to %s", "Unable to send handshake header to %s",
res->hr_remoteaddr); res->hr_remoteaddr);
@ -568,7 +574,7 @@ init_remote(struct hast_resource *res)
goto close; goto close;
} }
nv_free(nvout); nv_free(nvout);
if (hast_proto_recv_hdr(res->hr_remoteout, &nvin) < 0) { if (hast_proto_recv_hdr(out, &nvin) < 0) {
pjdlog_errno(LOG_WARNING, pjdlog_errno(LOG_WARNING,
"Unable to receive handshake header from %s", "Unable to receive handshake header from %s",
res->hr_remoteaddr); res->hr_remoteaddr);
@ -611,7 +617,7 @@ init_remote(struct hast_resource *res)
* Remote node have some dirty extents on its own, lets * Remote node have some dirty extents on its own, lets
* download its activemap. * download its activemap.
*/ */
if (hast_proto_recv_data(res, res->hr_remoteout, nvin, map, if (hast_proto_recv_data(res, out, nvin, map,
mapsize) < 0) { mapsize) < 0) {
pjdlog_errno(LOG_ERR, pjdlog_errno(LOG_ERR,
"Unable to receive remote activemap"); "Unable to receive remote activemap");
@ -631,18 +637,29 @@ init_remote(struct hast_resource *res)
(void)hast_activemap_flush(res); (void)hast_activemap_flush(res);
} }
pjdlog_info("Connected to %s.", res->hr_remoteaddr); pjdlog_info("Connected to %s.", res->hr_remoteaddr);
if (inp != NULL && outp != NULL) {
*inp = in;
*outp = out;
} else {
res->hr_remotein = in;
res->hr_remoteout = out;
}
return (true);
close:
proto_close(out);
if (in != NULL)
proto_close(in);
return (false);
}
static void
sync_start(void)
{
mtx_lock(&sync_lock); mtx_lock(&sync_lock);
sync_inprogress = true; sync_inprogress = true;
mtx_unlock(&sync_lock); mtx_unlock(&sync_lock);
cv_signal(&sync_cond); cv_signal(&sync_cond);
return;
close:
proto_close(res->hr_remoteout);
res->hr_remoteout = NULL;
if (res->hr_remotein != NULL) {
proto_close(res->hr_remotein);
res->hr_remotein = NULL;
}
} }
static void static void
@ -735,7 +752,8 @@ hastd_primary(struct hast_resource *res)
setproctitle("%s (primary)", res->hr_name); setproctitle("%s (primary)", res->hr_name);
init_local(res); init_local(res);
init_remote(res); if (init_remote(res, NULL, NULL))
sync_start();
init_ggate(res); init_ggate(res);
init_environment(res); init_environment(res);
error = pthread_create(&td, NULL, ggate_recv_thread, res); error = pthread_create(&td, NULL, ggate_recv_thread, res);
@ -1695,6 +1713,7 @@ static void *
guard_thread(void *arg) guard_thread(void *arg)
{ {
struct hast_resource *res = arg; struct hast_resource *res = arg;
struct proto_conn *in, *out;
unsigned int ii, ncomps; unsigned int ii, ncomps;
int timeout; int timeout;
@ -1738,26 +1757,31 @@ guard_thread(void *arg)
* connected. * connected.
*/ */
rw_unlock(&hio_remote_lock[ii]); rw_unlock(&hio_remote_lock[ii]);
rw_wlock(&hio_remote_lock[ii]);
assert(res->hr_remotein == NULL);
assert(res->hr_remoteout == NULL);
pjdlog_debug(2, pjdlog_debug(2,
"remote_guard: Reconnecting to %s.", "remote_guard: Reconnecting to %s.",
res->hr_remoteaddr); res->hr_remoteaddr);
init_remote(res); in = out = NULL;
if (ISCONNECTED(res, ii)) { if (init_remote(res, &in, &out)) {
rw_wlock(&hio_remote_lock[ii]);
assert(res->hr_remotein == NULL);
assert(res->hr_remoteout == NULL);
assert(in != NULL && out != NULL);
res->hr_remotein = in;
res->hr_remoteout = out;
rw_unlock(&hio_remote_lock[ii]);
pjdlog_info("Successfully reconnected to %s.", pjdlog_info("Successfully reconnected to %s.",
res->hr_remoteaddr); res->hr_remoteaddr);
sync_start();
} else { } else {
/* Both connections should be NULL. */ /* Both connections should be NULL. */
assert(res->hr_remotein == NULL); assert(res->hr_remotein == NULL);
assert(res->hr_remoteout == NULL); assert(res->hr_remoteout == NULL);
assert(in == NULL && out == NULL);
pjdlog_debug(2, pjdlog_debug(2,
"remote_guard: Reconnect to %s failed.", "remote_guard: Reconnect to %s failed.",
res->hr_remoteaddr); res->hr_remoteaddr);
timeout = RECONNECT_SLEEP; timeout = RECONNECT_SLEEP;
} }
rw_unlock(&hio_remote_lock[ii]);
} }
} }
(void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout); (void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout);