Implement 'async' mode for HAST.

MFC after:	3 days
pjd 2011-10-27 20:32:57 +00:00
parent c017e98c55
commit 14cf798458
3 changed files with 81 additions and 55 deletions

sbin/hastd/hast.conf.5

@@ -28,7 +28,7 @@
 .\"
 .\" $FreeBSD$
 .\"
-.Dd May 20, 2011
+.Dd October 27, 2011
 .Dt HAST.CONF 5
 .Os
 .Sh NAME
@@ -224,9 +224,6 @@ completes.
 This is the fastest and the most dangerous replication mode.
 This mode should be used when replicating to a distant node where
 latency is too high for other modes.
-The
-.Ic async
-replication mode is currently not implemented.
 .El
 .It Ic checksum Aq algorithm
 .Pp
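With this change all three replication modes are accepted, so async can now be selected per resource in hast.conf(5). A minimal illustrative configuration (the resource name, host names, addresses, and provider path below are hypothetical):

resource shared {
	replication async
	on hasta {
		local /dev/da0
		remote tcp://172.16.0.2
	}
	on hastb {
		local /dev/da0
		remote tcp://172.16.0.1
	}
}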

sbin/hastd/parse.y

@@ -301,11 +301,9 @@ yy_config_parse(const char *config, bool exitonerror)
 			 */
 			curres->hr_replication = depth0_replication;
 		}
-		if (curres->hr_replication == HAST_REPLICATION_MEMSYNC ||
-		    curres->hr_replication == HAST_REPLICATION_ASYNC) {
+		if (curres->hr_replication == HAST_REPLICATION_MEMSYNC) {
 			pjdlog_warning("Replication mode \"%s\" is not implemented, falling back to \"%s\".",
-			    curres->hr_replication == HAST_REPLICATION_MEMSYNC ?
-			    "memsync" : "async", "fullsync");
+			    "memsync", "fullsync");
 			curres->hr_replication = HAST_REPLICATION_FULLSYNC;
 		}
 		if (curres->hr_checksum == -1) {

sbin/hastd/primary.c

@@ -89,6 +89,15 @@ struct hio {
 	 * Structure used to communicate with GEOM Gate class.
 	 */
 	struct g_gate_ctl_io	 hio_ggio;
+	/*
+	 * Request was already confirmed to GEOM Gate.
+	 */
+	bool			 hio_done;
+	/*
+	 * Remember replication from the time the request was initiated,
+	 * so we won't get confused when replication changes on reload.
+	 */
+	int			 hio_replication;
 	TAILQ_ENTRY(hio)	*hio_next;
 };
 #define	hio_free_next	hio_next[0]
@@ -1055,6 +1064,42 @@ remote_close(struct hast_resource *res, int ncomp)
 	event_send(res, EVENT_DISCONNECT);
 }
 
+/*
+ * Acknowledge write completion to the kernel, but don't update activemap yet.
+ */
+static void
+write_complete(struct hast_resource *res, struct hio *hio)
+{
+	struct g_gate_ctl_io *ggio;
+	unsigned int ncomp;
+
+	PJDLOG_ASSERT(!hio->hio_done);
+	ggio = &hio->hio_ggio;
+	PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE);
+
+	/*
+	 * Bump local count if this is first write after
+	 * connection failure with remote node.
+	 */
+	ncomp = 1;
+	rw_rlock(&hio_remote_lock[ncomp]);
+	if (!ISCONNECTED(res, ncomp)) {
+		mtx_lock(&metadata_lock);
+		if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) {
+			res->hr_primary_localcnt++;
+			pjdlog_debug(1, "Increasing localcnt to %ju.",
+			    (uintmax_t)res->hr_primary_localcnt);
+			(void)metadata_write(res);
+		}
+		mtx_unlock(&metadata_lock);
+	}
+	rw_unlock(&hio_remote_lock[ncomp]);
+
+	if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
+		primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
+	hio->hio_done = true;
+}
+
 /*
  * Thread receives ggate I/O requests from the kernel and passes them to
  * appropriate threads:
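The new helper confirms a write to the kernel and, if the remote node is unreachable, bumps the local generation count so the nodes can later agree on the synchronization direction; the activemap is deliberately left untouched until the final completion path runs. A toy, self-contained C model of just the generation-count rule (the struct and function names are illustrative, not the HAST API):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Illustrative model: the first write confirmed while the remote node
 * is disconnected raises the local generation count once; further
 * writes in the same disconnected period do not raise it again.
 */
struct node_state {
	bool	 remote_connected;
	uint64_t localcnt;	/* our generation */
	uint64_t remotecnt;	/* last generation seen from the peer */
};

static void
confirm_write(struct node_state *st)
{
	if (!st->remote_connected && st->localcnt == st->remotecnt) {
		st->localcnt++;
		printf("Increasing localcnt to %ju.\n",
		    (uintmax_t)st->localcnt);
	}
}

int
main(void)
{
	struct node_state st = { false, 5, 5 };

	confirm_write(&st);	/* first write after disconnect: bump */
	confirm_write(&st);	/* subsequent writes: no further bump */
	return (0);
}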
@@ -1075,8 +1120,6 @@ ggate_recv_thread(void *arg)
 	unsigned int ii, ncomp, ncomps;
 	int error;
 
-	ncomps = HAST_NCOMPONENTS;
-
 	for (;;) {
 		pjdlog_debug(2, "ggate_recv: Taking free request.");
 		QUEUE_TAKE2(hio, free);
@@ -1085,6 +1128,8 @@ ggate_recv_thread(void *arg)
 		ggio->gctl_unit = res->hr_ggateunit;
 		ggio->gctl_length = MAXPHYS;
 		ggio->gctl_error = 0;
+		hio->hio_done = false;
+		hio->hio_replication = res->hr_replication;
 		pjdlog_debug(2,
 		    "ggate_recv: (%p) Waiting for request from the kernel.",
 		    hio);
@@ -1117,11 +1162,16 @@ ggate_recv_thread(void *arg)
 			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
 			    strerror(error));
 		}
+
+		ncomp = 0;
+		ncomps = HAST_NCOMPONENTS;
+
 		for (ii = 0; ii < ncomps; ii++)
 			hio->hio_errors[ii] = EINVAL;
 		reqlog(LOG_DEBUG, 2, ggio,
 		    "ggate_recv: (%p) Request received from the kernel: ",
 		    hio);
+
 		/*
 		 * Inform all components about new write request.
 		 * For read request prefer local component unless the given
@@ -1130,10 +1180,7 @@ ggate_recv_thread(void *arg)
 		switch (ggio->gctl_cmd) {
 		case BIO_READ:
 			res->hr_stat_read++;
-			pjdlog_debug(2,
-			    "ggate_recv: (%p) Moving request to the send queue.",
-			    hio);
-			refcount_init(&hio->hio_countdown, 1);
+			ncomps = 1;
 			mtx_lock(&metadata_lock);
 			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
 			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
@@ -1155,7 +1202,6 @@ ggate_recv_thread(void *arg)
 				ncomp = 1;
 			}
 			mtx_unlock(&metadata_lock);
-			QUEUE_INSERT1(hio, send, ncomp);
 			break;
 		case BIO_WRITE:
 			res->hr_stat_write++;
@@ -1198,25 +1244,19 @@ ggate_recv_thread(void *arg)
 				(void)hast_activemap_flush(res);
 			}
 			mtx_unlock(&res->hr_amp_lock);
-			/* FALLTHROUGH */
+			break;
 		case BIO_DELETE:
+			res->hr_stat_delete++;
+			break;
 		case BIO_FLUSH:
-			switch (ggio->gctl_cmd) {
-			case BIO_DELETE:
-				res->hr_stat_delete++;
-				break;
-			case BIO_FLUSH:
-				res->hr_stat_flush++;
-				break;
-			}
-			pjdlog_debug(2,
-			    "ggate_recv: (%p) Moving request to the send queue.",
-			    hio);
-			refcount_init(&hio->hio_countdown, ncomps);
-			for (ii = 0; ii < ncomps; ii++)
-				QUEUE_INSERT1(hio, send, ii);
+			res->hr_stat_flush++;
 			break;
 		}
+		pjdlog_debug(2,
+		    "ggate_recv: (%p) Moving request to the send queues.", hio);
+		refcount_init(&hio->hio_countdown, ncomps);
+		for (ii = ncomp; ii < ncomps; ii++)
+			QUEUE_INSERT1(hio, send, ii);
 	}
 	/* NOTREACHED */
 	return (NULL);
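After the rewrite, read requests go to a single component (ncomps = 1, with ncomp selecting local or remote), while writes, deletes, and flushes go to all of them; hio_countdown is initialized to that number so the request can be finished once the last component reports in. A self-contained sketch of the countdown pattern, assuming the component that drops the counter to zero is the one that completes the request:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/*
 * Illustrative countdown: initialized to the number of components a
 * request is sent to; each component decrements it when done and the
 * one that hits zero moves the request to the next queue.
 */
struct req {
	atomic_uint countdown;
};

static bool
component_done(struct req *r)
{
	/* fetch_sub returns the previous value: 1 means we were last. */
	return (atomic_fetch_sub(&r->countdown, 1) == 1);
}

int
main(void)
{
	struct req r;

	atomic_init(&r.countdown, 2);	/* e.g. local + remote component */
	if (component_done(&r))
		printf("completed by first component?\n");	/* not printed */
	if (component_done(&r))
		printf("completed by last component\n");	/* printed */
	return (0);
}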
@@ -1285,6 +1325,11 @@ local_send_thread(void *arg)
 				    ret, (intmax_t)ggio->gctl_length);
 			} else {
 				hio->hio_errors[ncomp] = 0;
+				if (hio->hio_replication ==
+				    HAST_REPLICATION_ASYNC) {
+					ggio->gctl_error = 0;
+					write_complete(res, hio);
+				}
 			}
 			break;
 		case BIO_DELETE:
@@ -1668,7 +1713,7 @@ ggate_send_thread(void *arg)
 	struct hast_resource *res = arg;
 	struct g_gate_ctl_io *ggio;
 	struct hio *hio;
-	unsigned int ii, ncomp, ncomps;
+	unsigned int ii, ncomps;
 
 	ncomps = HAST_NCOMPONENTS;
 
@@ -1718,28 +1763,14 @@ ggate_send_thread(void *arg)
 			if (range_sync_wait)
 				cv_signal(&range_sync_cond);
 			mtx_unlock(&range_lock);
-			/*
-			 * Bump local count if this is first write after
-			 * connection failure with remote node.
-			 */
-			ncomp = 1;
-			rw_rlock(&hio_remote_lock[ncomp]);
-			if (!ISCONNECTED(res, ncomp)) {
-				mtx_lock(&metadata_lock);
-				if (res->hr_primary_localcnt ==
-				    res->hr_secondary_remotecnt) {
-					res->hr_primary_localcnt++;
-					pjdlog_debug(1,
-					    "Increasing localcnt to %ju.",
-					    (uintmax_t)res->hr_primary_localcnt);
-					(void)metadata_write(res);
-				}
-				mtx_unlock(&metadata_lock);
+			if (!hio->hio_done)
+				write_complete(res, hio);
+		} else {
+			if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) {
+				primary_exit(EX_OSERR,
+				    "G_GATE_CMD_DONE failed");
 			}
-			rw_unlock(&hio_remote_lock[ncomp]);
-		}
-		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
-			primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
+		}
 		pjdlog_debug(2,
 		    "ggate_send: (%p) Moving request to the free queue.", hio);
 		QUEUE_INSERT2(hio, free);
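Since the async fast path in local_send_thread() may already have confirmed the write, ggate_send_thread() now only calls write_complete() when hio_done is still false, so every request is confirmed to GEOM Gate exactly once. A toy model of that idempotence guard (single-threaded and illustrative; the real code can rely on a request being owned by one thread at a time rather than on locking):

#include <stdbool.h>
#include <stdio.h>

/*
 * Illustrative confirm-once guard: complete() is reachable from two
 * paths, but the request must be acknowledged exactly once.
 */
struct req {
	bool done;
};

static void
complete(struct req *r, const char *path)
{
	if (r->done)
		return;		/* already confirmed by the other path */
	r->done = true;
	printf("%s confirms the request\n", path);
}

int
main(void)
{
	struct req r = { false };

	complete(&r, "async fast path");	/* confirms */
	complete(&r, "final completion path");	/* no-op */
	return (0);
}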
@@ -1892,6 +1923,8 @@ sync_thread(void *arg __unused)
 		ggio->gctl_offset = offset;
 		ggio->gctl_length = length;
 		ggio->gctl_error = 0;
+		hio->hio_done = false;
+		hio->hio_replication = res->hr_replication;
 		for (ii = 0; ii < ncomps; ii++)
 			hio->hio_errors[ii] = EINVAL;
 		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
@@ -2080,8 +2113,7 @@ primary_config_reload(struct hast_resource *res, struct nv *nv)
 	 * Don't bother if we need to reconnect.
 	 */
 	if ((modified & MODIFIED_TIMEOUT) != 0 &&
-	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
-	    MODIFIED_REPLICATION)) == 0) {
+	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
 		for (ii = 0; ii < ncomps; ii++) {
 			if (!ISREMOTE(ii))
 				continue;
@@ -2103,8 +2135,7 @@ primary_config_reload(struct hast_resource *res, struct nv *nv)
 			}
 		}
 	}
-	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
-	    MODIFIED_REPLICATION)) != 0) {
+	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
 		for (ii = 0; ii < ncomps; ii++) {
 			if (!ISREMOTE(ii))
 				continue;
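These last two hunks drop MODIFIED_REPLICATION from the masks that force a reconnect on configuration reload: because each request now carries hio_replication, captured when the request was initiated, a mode change only affects requests started afterwards. A toy model of that snapshot (illustrative names, not the HAST API):

#include <stdio.h>

/*
 * Illustrative snapshot: a request captures the resource-wide mode at
 * initiation, so a reload that changes the mode cannot confuse it.
 */
enum mode { FULLSYNC, MEMSYNC, ASYNC };

struct resource { enum mode replication; };
struct request  { enum mode replication; };

int
main(void)
{
	struct resource res = { ASYNC };
	struct request req;

	req.replication = res.replication;	/* snapshot at initiation */
	res.replication = FULLSYNC;		/* config reload changes mode */

	/* The in-flight request still finishes under its original mode. */
	printf("in-flight request mode: %s\n",
	    req.replication == ASYNC ? "async" : "sync");
	return (0);
}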