Implement 'async' mode for HAST.

MFC after:	3 days
This commit is contained in:
Pawel Jakub Dawidek 2011-10-27 20:32:57 +00:00
parent 3f5bce1822
commit 07ebc3626e
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=226859
3 changed files with 81 additions and 55 deletions

View File

@ -28,7 +28,7 @@
.\" .\"
.\" $FreeBSD$ .\" $FreeBSD$
.\" .\"
.Dd May 20, 2011 .Dd October 27, 2011
.Dt HAST.CONF 5 .Dt HAST.CONF 5
.Os .Os
.Sh NAME .Sh NAME
@ -224,9 +224,6 @@ completes.
This is the fastest and the most dangerous replication mode. This is the fastest and the most dangerous replication mode.
This mode should be used when replicating to a distant node where This mode should be used when replicating to a distant node where
latency is too high for other modes. latency is too high for other modes.
The
.Ic async
replication mode is currently not implemented.
.El .El
.It Ic checksum Aq algorithm .It Ic checksum Aq algorithm
.Pp .Pp

View File

@ -301,11 +301,9 @@ yy_config_parse(const char *config, bool exitonerror)
*/ */
curres->hr_replication = depth0_replication; curres->hr_replication = depth0_replication;
} }
if (curres->hr_replication == HAST_REPLICATION_MEMSYNC || if (curres->hr_replication == HAST_REPLICATION_MEMSYNC) {
curres->hr_replication == HAST_REPLICATION_ASYNC) {
pjdlog_warning("Replication mode \"%s\" is not implemented, falling back to \"%s\".", pjdlog_warning("Replication mode \"%s\" is not implemented, falling back to \"%s\".",
curres->hr_replication == HAST_REPLICATION_MEMSYNC ? "memsync", "fullsync");
"memsync" : "async", "fullsync");
curres->hr_replication = HAST_REPLICATION_FULLSYNC; curres->hr_replication = HAST_REPLICATION_FULLSYNC;
} }
if (curres->hr_checksum == -1) { if (curres->hr_checksum == -1) {

View File

@ -89,6 +89,15 @@ struct hio {
* Structure used to communicate with GEOM Gate class. * Structure used to communicate with GEOM Gate class.
*/ */
struct g_gate_ctl_io hio_ggio; struct g_gate_ctl_io hio_ggio;
/*
* Request was already confirmed to GEOM Gate.
*/
bool hio_done;
/*
* Remember replication from the time the request was initiated,
* so we won't get confused when replication changes on reload.
*/
int hio_replication;
TAILQ_ENTRY(hio) *hio_next; TAILQ_ENTRY(hio) *hio_next;
}; };
#define hio_free_next hio_next[0] #define hio_free_next hio_next[0]
@ -1055,6 +1064,42 @@ remote_close(struct hast_resource *res, int ncomp)
event_send(res, EVENT_DISCONNECT); event_send(res, EVENT_DISCONNECT);
} }
/*
* Acknowledge write completion to the kernel, but don't update activemap yet.
*/
static void
write_complete(struct hast_resource *res, struct hio *hio)
{
struct g_gate_ctl_io *ggio;
unsigned int ncomp;
PJDLOG_ASSERT(!hio->hio_done);
ggio = &hio->hio_ggio;
PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE);
/*
* Bump local count if this is first write after
* connection failure with remote node.
*/
ncomp = 1;
rw_rlock(&hio_remote_lock[ncomp]);
if (!ISCONNECTED(res, ncomp)) {
mtx_lock(&metadata_lock);
if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) {
res->hr_primary_localcnt++;
pjdlog_debug(1, "Increasing localcnt to %ju.",
(uintmax_t)res->hr_primary_localcnt);
(void)metadata_write(res);
}
mtx_unlock(&metadata_lock);
}
rw_unlock(&hio_remote_lock[ncomp]);
if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
hio->hio_done = true;
}
/* /*
* Thread receives ggate I/O requests from the kernel and passes them to * Thread receives ggate I/O requests from the kernel and passes them to
* appropriate threads: * appropriate threads:
@ -1075,8 +1120,6 @@ ggate_recv_thread(void *arg)
unsigned int ii, ncomp, ncomps; unsigned int ii, ncomp, ncomps;
int error; int error;
ncomps = HAST_NCOMPONENTS;
for (;;) { for (;;) {
pjdlog_debug(2, "ggate_recv: Taking free request."); pjdlog_debug(2, "ggate_recv: Taking free request.");
QUEUE_TAKE2(hio, free); QUEUE_TAKE2(hio, free);
@ -1085,6 +1128,8 @@ ggate_recv_thread(void *arg)
ggio->gctl_unit = res->hr_ggateunit; ggio->gctl_unit = res->hr_ggateunit;
ggio->gctl_length = MAXPHYS; ggio->gctl_length = MAXPHYS;
ggio->gctl_error = 0; ggio->gctl_error = 0;
hio->hio_done = false;
hio->hio_replication = res->hr_replication;
pjdlog_debug(2, pjdlog_debug(2,
"ggate_recv: (%p) Waiting for request from the kernel.", "ggate_recv: (%p) Waiting for request from the kernel.",
hio); hio);
@ -1117,11 +1162,16 @@ ggate_recv_thread(void *arg)
primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
strerror(error)); strerror(error));
} }
ncomp = 0;
ncomps = HAST_NCOMPONENTS;
for (ii = 0; ii < ncomps; ii++) for (ii = 0; ii < ncomps; ii++)
hio->hio_errors[ii] = EINVAL; hio->hio_errors[ii] = EINVAL;
reqlog(LOG_DEBUG, 2, ggio, reqlog(LOG_DEBUG, 2, ggio,
"ggate_recv: (%p) Request received from the kernel: ", "ggate_recv: (%p) Request received from the kernel: ",
hio); hio);
/* /*
* Inform all components about new write request. * Inform all components about new write request.
* For read request prefer local component unless the given * For read request prefer local component unless the given
@ -1130,10 +1180,7 @@ ggate_recv_thread(void *arg)
switch (ggio->gctl_cmd) { switch (ggio->gctl_cmd) {
case BIO_READ: case BIO_READ:
res->hr_stat_read++; res->hr_stat_read++;
pjdlog_debug(2, ncomps = 1;
"ggate_recv: (%p) Moving request to the send queue.",
hio);
refcount_init(&hio->hio_countdown, 1);
mtx_lock(&metadata_lock); mtx_lock(&metadata_lock);
if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
@ -1155,7 +1202,6 @@ ggate_recv_thread(void *arg)
ncomp = 1; ncomp = 1;
} }
mtx_unlock(&metadata_lock); mtx_unlock(&metadata_lock);
QUEUE_INSERT1(hio, send, ncomp);
break; break;
case BIO_WRITE: case BIO_WRITE:
res->hr_stat_write++; res->hr_stat_write++;
@ -1198,25 +1244,19 @@ ggate_recv_thread(void *arg)
(void)hast_activemap_flush(res); (void)hast_activemap_flush(res);
} }
mtx_unlock(&res->hr_amp_lock); mtx_unlock(&res->hr_amp_lock);
/* FALLTHROUGH */ break;
case BIO_DELETE: case BIO_DELETE:
res->hr_stat_delete++;
break;
case BIO_FLUSH: case BIO_FLUSH:
switch (ggio->gctl_cmd) { res->hr_stat_flush++;
case BIO_DELETE:
res->hr_stat_delete++;
break;
case BIO_FLUSH:
res->hr_stat_flush++;
break;
}
pjdlog_debug(2,
"ggate_recv: (%p) Moving request to the send queue.",
hio);
refcount_init(&hio->hio_countdown, ncomps);
for (ii = 0; ii < ncomps; ii++)
QUEUE_INSERT1(hio, send, ii);
break; break;
} }
pjdlog_debug(2,
"ggate_recv: (%p) Moving request to the send queues.", hio);
refcount_init(&hio->hio_countdown, ncomps);
for (ii = ncomp; ii < ncomps; ii++)
QUEUE_INSERT1(hio, send, ii);
} }
/* NOTREACHED */ /* NOTREACHED */
return (NULL); return (NULL);
@ -1285,6 +1325,11 @@ local_send_thread(void *arg)
ret, (intmax_t)ggio->gctl_length); ret, (intmax_t)ggio->gctl_length);
} else { } else {
hio->hio_errors[ncomp] = 0; hio->hio_errors[ncomp] = 0;
if (hio->hio_replication ==
HAST_REPLICATION_ASYNC) {
ggio->gctl_error = 0;
write_complete(res, hio);
}
} }
break; break;
case BIO_DELETE: case BIO_DELETE:
@ -1668,7 +1713,7 @@ ggate_send_thread(void *arg)
struct hast_resource *res = arg; struct hast_resource *res = arg;
struct g_gate_ctl_io *ggio; struct g_gate_ctl_io *ggio;
struct hio *hio; struct hio *hio;
unsigned int ii, ncomp, ncomps; unsigned int ii, ncomps;
ncomps = HAST_NCOMPONENTS; ncomps = HAST_NCOMPONENTS;
@ -1718,28 +1763,14 @@ ggate_send_thread(void *arg)
if (range_sync_wait) if (range_sync_wait)
cv_signal(&range_sync_cond); cv_signal(&range_sync_cond);
mtx_unlock(&range_lock); mtx_unlock(&range_lock);
/* if (!hio->hio_done)
* Bump local count if this is first write after write_complete(res, hio);
* connection failure with remote node. } else {
*/ if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) {
ncomp = 1; primary_exit(EX_OSERR,
rw_rlock(&hio_remote_lock[ncomp]); "G_GATE_CMD_DONE failed");
if (!ISCONNECTED(res, ncomp)) {
mtx_lock(&metadata_lock);
if (res->hr_primary_localcnt ==
res->hr_secondary_remotecnt) {
res->hr_primary_localcnt++;
pjdlog_debug(1,
"Increasing localcnt to %ju.",
(uintmax_t)res->hr_primary_localcnt);
(void)metadata_write(res);
}
mtx_unlock(&metadata_lock);
} }
rw_unlock(&hio_remote_lock[ncomp]);
} }
if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
pjdlog_debug(2, pjdlog_debug(2,
"ggate_send: (%p) Moving request to the free queue.", hio); "ggate_send: (%p) Moving request to the free queue.", hio);
QUEUE_INSERT2(hio, free); QUEUE_INSERT2(hio, free);
@ -1892,6 +1923,8 @@ sync_thread(void *arg __unused)
ggio->gctl_offset = offset; ggio->gctl_offset = offset;
ggio->gctl_length = length; ggio->gctl_length = length;
ggio->gctl_error = 0; ggio->gctl_error = 0;
hio->hio_done = false;
hio->hio_replication = res->hr_replication;
for (ii = 0; ii < ncomps; ii++) for (ii = 0; ii < ncomps; ii++)
hio->hio_errors[ii] = EINVAL; hio->hio_errors[ii] = EINVAL;
reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
@ -2080,8 +2113,7 @@ primary_config_reload(struct hast_resource *res, struct nv *nv)
* Don't bother if we need to reconnect. * Don't bother if we need to reconnect.
*/ */
if ((modified & MODIFIED_TIMEOUT) != 0 && if ((modified & MODIFIED_TIMEOUT) != 0 &&
(modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
MODIFIED_REPLICATION)) == 0) {
for (ii = 0; ii < ncomps; ii++) { for (ii = 0; ii < ncomps; ii++) {
if (!ISREMOTE(ii)) if (!ISREMOTE(ii))
continue; continue;
@ -2103,8 +2135,7 @@ primary_config_reload(struct hast_resource *res, struct nv *nv)
} }
} }
} }
if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
MODIFIED_REPLICATION)) != 0) {
for (ii = 0; ii < ncomps; ii++) { for (ii = 0; ii < ncomps; ii++) {
if (!ISREMOTE(ii)) if (!ISREMOTE(ii))
continue; continue;