- Add support for 'memsync' mode. This is the fastest replication mode that's

why it will now be the default.
- Bump protocol version to 2 and add backward compatibility for version 1.
- Allow to specify hosts by kern.hostid as well (in addition to hostname and
  kern.hostuuid) in configuration file.

Sponsored by:	Panzura
Tested by:	trociny
This commit is contained in:
Pawel Jakub Dawidek 2013-02-17 21:12:34 +00:00
parent bb7ca8229d
commit d6e636c988
8 changed files with 334 additions and 43 deletions

View File

@ -129,9 +129,13 @@ The
.Aq node
argument can be replaced either by a full hostname as obtained by
.Xr gethostname 3 ,
only first part of the hostname, or by node's UUID as found in the
only first part of the hostname, by node's UUID as found in the
.Va kern.hostuuid
.Xr sysctl 8
variable
or by node's hostid as found in the
.Va kern.hostid
.Xr sysctl 8
variable.
.Pp
The following statements are available:
@ -208,15 +212,12 @@ to the application was lost.
The risk of such a situation is very small.
The
.Ic memsync
replication mode is currently not implemented.
replication mode is the default.
.It Ic fullsync
.Pp
Mark the write operation as completed when local as well as remote
write completes.
This is the safest and the slowest replication mode.
The
.Ic fullsync
replication mode is the default.
.It Ic async
.Pp
The write operation is reported as complete right after the local write

View File

@ -53,8 +53,9 @@
* Version history:
* 0 - initial version
* 1 - HIO_KEEPALIVE added
* 2 - "memsync" and "received" attributes added for memsync mode
*/
#define HAST_PROTO_VERSION 1
#define HAST_PROTO_VERSION 2
#define EHAST_OK 0
#define EHAST_NOENTRY 1
@ -142,8 +143,10 @@ struct hastd_config {
struct hast_resource {
/* Resource name. */
char hr_name[NAME_MAX];
/* Replication mode (HAST_REPLICATION_*). */
/* Negotiated replication mode (HAST_REPLICATION_*). */
int hr_replication;
/* Configured replication mode (HAST_REPLICATION_*). */
int hr_original_replication;
/* Provider name that will appear in /dev/hast/. */
char hr_provname[NAME_MAX];
/* Synchronization extent size. */
@ -156,6 +159,8 @@ struct hast_resource {
int hr_compression;
/* Checksum algorithm. */
int hr_checksum;
/* Protocol version. */
int hr_version;
/* Path to local component. */
char hr_localpath[PATH_MAX];

View File

@ -112,7 +112,7 @@ hast_proto_send(const struct hast_resource *res, struct proto_conn *conn,
if (eb == NULL)
goto end;
hdr.version = HAST_PROTO_VERSION;
hdr.version = res != NULL ? res->hr_version : HAST_PROTO_VERSION;
hdr.size = htole32((uint32_t)ebuf_size(eb));
if (ebuf_add_head(eb, &hdr, sizeof(hdr)) == -1)
goto end;
@ -144,7 +144,7 @@ hast_proto_recv_hdr(const struct proto_conn *conn, struct nv **nvp)
if (proto_recv(conn, &hdr, sizeof(hdr)) == -1)
goto fail;
if (hdr.version != HAST_PROTO_VERSION) {
if (hdr.version > HAST_PROTO_VERSION) {
errno = ERPCMISMATCH;
goto fail;
}

View File

@ -68,7 +68,7 @@ static struct hastd_config *cfg;
bool sigexit_received = false;
/* Path to pidfile. */
static const char *pidfile;
/* PID file handle. */
/* Pidfile handle. */
struct pidfh *pfh;
/* Do we run in foreground? */
static bool foreground;
@ -748,6 +748,7 @@ listen_accept(struct hastd_listen *lst)
const char *resname;
const unsigned char *token;
char laddr[256], raddr[256];
uint8_t version;
size_t size;
pid_t pid;
int status;
@ -797,6 +798,20 @@ listen_accept(struct hastd_listen *lst)
goto close;
}
pjdlog_debug(2, "%s: resource=%s", raddr, resname);
version = nv_get_uint8(nvin, "version");
pjdlog_debug(2, "%s: version=%hhu", raddr, version);
if (version == 0) {
/*
* If no version is sent, it means this is protocol version 1.
*/
version = 1;
}
if (version > HAST_PROTO_VERSION) {
pjdlog_info("Remote protocol version %hhu is not supported, falling back to version %hhu.",
version, (unsigned char)HAST_PROTO_VERSION);
version = HAST_PROTO_VERSION;
}
pjdlog_debug(1, "Negotiated protocol version %hhu.", version);
token = nv_get_uint8_array(nvin, &size, "token");
/*
* NULL token means that this is first connection.
@ -910,8 +925,10 @@ listen_accept(struct hastd_listen *lst)
*/
if (token == NULL) {
res->hr_version = version;
arc4random_buf(res->hr_token, sizeof(res->hr_token));
nvout = nv_alloc();
nv_add_uint8(nvout, version, "version");
nv_add_uint8_array(nvout, res->hr_token,
sizeof(res->hr_token), "token");
if (nv_error(nvout) != 0) {
@ -922,7 +939,7 @@ listen_accept(struct hastd_listen *lst)
strerror(nv_error(nvout)));
goto fail;
}
if (hast_proto_send(NULL, conn, nvout, NULL, 0) == -1) {
if (hast_proto_send(res, conn, nvout, NULL, 0) == -1) {
int error = errno;
pjdlog_errno(LOG_ERR, "Unable to send response to %s",

View File

@ -236,6 +236,7 @@ replication_statement: REPLICATION replication_type
case 1:
PJDLOG_ASSERT(curres != NULL);
curres->hr_replication = $2;
curres->hr_original_replication = $2;
break;
default:
PJDLOG_ABORT("replication at wrong depth level");
@ -533,8 +534,10 @@ resource_start: STR
curres->hr_role = HAST_ROLE_INIT;
curres->hr_previous_role = HAST_ROLE_INIT;
curres->hr_replication = -1;
curres->hr_original_replication = -1;
curres->hr_checksum = -1;
curres->hr_compression = -1;
curres->hr_version = 1;
curres->hr_timeout = -1;
curres->hr_exec[0] = '\0';
curres->hr_provname[0] = '\0';
@ -724,6 +727,7 @@ static int
isitme(const char *name)
{
char buf[MAXHOSTNAMELEN];
unsigned long hostid;
char *pos;
size_t bufsize;
@ -738,7 +742,7 @@ isitme(const char *name)
return (1);
/*
* Now check if it matches first part of the host name.
* Check if it matches first part of the host name.
*/
pos = strchr(buf, '.');
if (pos != NULL && (size_t)(pos - buf) == strlen(name) &&
@ -747,7 +751,7 @@ isitme(const char *name)
}
/*
* At the end check if name is equal to our host's UUID.
* Check if it matches host UUID.
*/
bufsize = sizeof(buf);
if (sysctlbyname("kern.hostuuid", buf, &bufsize, NULL, 0) < 0) {
@ -757,6 +761,18 @@ isitme(const char *name)
if (strcasecmp(buf, name) == 0)
return (1);
/*
* Check if it matches hostid.
*/
bufsize = sizeof(hostid);
if (sysctlbyname("kern.hostid", &hostid, &bufsize, NULL, 0) < 0) {
pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostid) failed");
return (-1);
}
(void)snprintf(buf, sizeof(buf), "hostid%lu", hostid);
if (strcmp(buf, name) == 0)
return (1);
/*
* Looks like this isn't about us.
*/
@ -769,7 +785,7 @@ family_supported(int family)
int sock;
sock = socket(family, SOCK_STREAM, 0);
if (sock == -1 && errno == EAFNOSUPPORT)
if (sock == -1 && errno == EPROTONOSUPPORT)
return (false);
if (sock >= 0)
(void)close(sock);
@ -781,6 +797,7 @@ node_names(char **namesp)
{
static char names[MAXHOSTNAMELEN * 3];
char buf[MAXHOSTNAMELEN];
unsigned long hostid;
char *pos;
size_t bufsize;
@ -808,6 +825,16 @@ node_names(char **namesp)
return (-1);
}
(void)strlcat(names, buf, sizeof(names));
(void)strlcat(names, ", ", sizeof(names));
/* Host ID. */
bufsize = sizeof(hostid);
if (sysctlbyname("kern.hostid", &hostid, &bufsize, NULL, 0) < 0) {
pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostid) failed");
return (-1);
}
(void)snprintf(buf, sizeof(buf), "hostid%lu", hostid);
(void)strlcat(names, buf, sizeof(names));
*namesp = names;
@ -833,7 +860,7 @@ yy_config_parse(const char *config, bool exitonerror)
lineno = 0;
depth0_timeout = HAST_TIMEOUT;
depth0_replication = HAST_REPLICATION_FULLSYNC;
depth0_replication = HAST_REPLICATION_MEMSYNC;
depth0_checksum = HAST_CHECKSUM_NONE;
depth0_compression = HAST_COMPRESSION_HOLE;
strlcpy(depth0_control, HAST_CONTROL, sizeof(depth0_control));
@ -943,11 +970,7 @@ yy_config_parse(const char *config, bool exitonerror)
* Use global or default setting.
*/
curres->hr_replication = depth0_replication;
}
if (curres->hr_replication == HAST_REPLICATION_MEMSYNC) {
pjdlog_warning("Replication mode \"%s\" is not implemented, falling back to \"%s\".",
"memsync", "fullsync");
curres->hr_replication = HAST_REPLICATION_FULLSYNC;
curres->hr_original_replication = depth0_replication;
}
if (curres->hr_checksum == -1) {
/*

View File

@ -35,7 +35,6 @@ __FBSDID("$FreeBSD$");
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>
#include <geom/gate/g_gate.h>
@ -65,6 +64,7 @@ __FBSDID("$FreeBSD$");
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "refcnt.h"
#include "subr.h"
#include "synch.h"
@ -543,7 +543,7 @@ primary_connect(struct hast_resource *res, struct proto_conn **connp)
return (0);
}
/*
* Function instructs GEOM_GATE to handle reads directly from within the kernel.
*/
@ -577,6 +577,7 @@ init_remote(struct hast_resource *res, struct proto_conn **inp,
int32_t extentsize;
int64_t datasize;
uint32_t mapsize;
uint8_t version;
size_t size;
int error;
@ -597,6 +598,7 @@ init_remote(struct hast_resource *res, struct proto_conn **inp,
*/
nvout = nv_alloc();
nv_add_string(nvout, res->hr_name, "resource");
nv_add_uint8(nvout, HAST_PROTO_VERSION, "version");
if (nv_error(nvout) != 0) {
pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
"Unable to allocate header for connection with %s",
@ -626,6 +628,20 @@ init_remote(struct hast_resource *res, struct proto_conn **inp,
nv_free(nvin);
goto close;
}
version = nv_get_uint8(nvin, "version");
if (version == 0) {
/*
* If no version is sent, it means this is protocol version 1.
*/
version = 1;
}
if (version > HAST_PROTO_VERSION) {
pjdlog_warning("Invalid version received (%hhu).", version);
nv_free(nvin);
goto close;
}
res->hr_version = version;
pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version);
token = nv_get_uint8_array(nvin, &size, "token");
if (token == NULL) {
pjdlog_warning("Handshake header from %s has no 'token' field.",
@ -776,6 +792,16 @@ init_remote(struct hast_resource *res, struct proto_conn **inp,
pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
#endif
pjdlog_info("Connected to %s.", res->hr_remoteaddr);
if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC &&
res->hr_version < 2) {
pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode.");
res->hr_replication = HAST_REPLICATION_FULLSYNC;
} else if (res->hr_replication != res->hr_original_replication) {
/*
* This is in case hastd disconnected and was upgraded.
*/
res->hr_replication = res->hr_original_replication;
}
if (inp != NULL && outp != NULL) {
*inp = in;
*outp = out;
@ -1009,7 +1035,8 @@ hastd_primary(struct hast_resource *res)
}
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio,
const char *fmt, ...)
{
char msg[1024];
va_list ap;
@ -1020,21 +1047,18 @@ reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt
switch (ggio->gctl_cmd) {
case BIO_READ:
(void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).",
(uintmax_t)ggio->gctl_offset,
(uintmax_t)ggio->gctl_length);
(uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
break;
case BIO_DELETE:
(void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).",
(uintmax_t)ggio->gctl_offset,
(uintmax_t)ggio->gctl_length);
(uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
break;
case BIO_FLUSH:
(void)snprlcat(msg, sizeof(msg), "FLUSH.");
break;
case BIO_WRITE:
(void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).",
(uintmax_t)ggio->gctl_offset,
(uintmax_t)ggio->gctl_length);
(uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
break;
default:
(void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).",
@ -1274,8 +1298,13 @@ ggate_recv_thread(void *arg)
}
pjdlog_debug(2,
"ggate_recv: (%p) Moving request to the send queues.", hio);
refcount_init(&hio->hio_countdown, ncomps);
for (ii = ncomp; ii < ncomp + ncomps; ii++)
hio->hio_countdown = ncomps;
if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
ggio->gctl_cmd == BIO_WRITE) {
/* Each remote request needs two responses in memsync. */
hio->hio_countdown++;
}
for (ii = ncomp; ii < ncomps; ii++)
QUEUE_INSERT1(hio, send, ii);
}
/* NOTREACHED */
@ -1346,8 +1375,7 @@ local_send_thread(void *arg)
} else {
hio->hio_errors[ncomp] = 0;
if (hio->hio_replication ==
HAST_REPLICATION_ASYNC &&
!ISSYNCREQ(hio)) {
HAST_REPLICATION_ASYNC) {
ggio->gctl_error = 0;
write_complete(res, hio);
}
@ -1385,8 +1413,42 @@ local_send_thread(void *arg)
}
break;
}
if (!refcount_release(&hio->hio_countdown))
continue;
if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
if (refcnt_release(&hio->hio_countdown) > 0)
continue;
} else {
/*
* Depending on hio_countdown value, requests finished
* in the following order:
* 0: remote memsync, remote final, local write
* 1: remote memsync, local write, (remote final)
* 2: local write, (remote memsync), (remote final)
*/
switch (refcnt_release(&hio->hio_countdown)) {
case 0:
/*
* Local write finished as last.
*/
break;
case 1:
/*
* Local write finished after remote memsync
* reply arrvied. We can complete the write now.
*/
if (hio->hio_errors[0] == 0)
write_complete(res, hio);
continue;
case 2:
/*
* Local write finished as first.
*/
continue;
default:
PJDLOG_ABORT("Invalid hio_countdown.");
}
}
if (ISSYNCREQ(hio)) {
mtx_lock(&sync_lock);
SYNCREQDONE(hio);
@ -1508,6 +1570,10 @@ remote_send_thread(void *arg)
nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
nv_add_uint64(nv, offset, "offset");
nv_add_uint64(nv, length, "length");
if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) {
nv_add_uint8(nv, 1, "memsync");
}
if (nv_error(nv) != 0) {
hio->hio_errors[ncomp] = nv_error(nv);
pjdlog_debug(2,
@ -1568,7 +1634,7 @@ remote_send_thread(void *arg)
done_queue:
nv_free(nv);
if (ISSYNCREQ(hio)) {
if (!refcount_release(&hio->hio_countdown))
if (refcnt_release(&hio->hio_countdown) > 0)
continue;
mtx_lock(&sync_lock);
SYNCREQDONE(hio);
@ -1583,8 +1649,10 @@ remote_send_thread(void *arg)
(void)hast_activemap_flush(res);
}
mtx_unlock(&res->hr_amp_lock);
if (hio->hio_replication == HAST_REPLICATION_MEMSYNC)
(void)refcnt_release(&hio->hio_countdown);
}
if (!refcount_release(&hio->hio_countdown))
if (refcnt_release(&hio->hio_countdown) > 0)
continue;
pjdlog_debug(2,
"remote_send: (%p) Moving request to the done queue.",
@ -1608,6 +1676,7 @@ remote_recv_thread(void *arg)
struct nv *nv;
unsigned int ncomp;
uint64_t seq;
bool memsyncack;
int error;
/* Remote component is 1 for now. */
@ -1623,6 +1692,8 @@ remote_recv_thread(void *arg)
}
mtx_unlock(&hio_recv_list_lock[ncomp]);
memsyncack = false;
rw_rlock(&hio_remote_lock[ncomp]);
if (!ISCONNECTED(res, ncomp)) {
rw_unlock(&hio_remote_lock[ncomp]);
@ -1652,6 +1723,7 @@ remote_recv_thread(void *arg)
nv_free(nv);
continue;
}
memsyncack = nv_exists(nv, "received");
mtx_lock(&hio_recv_list_lock[ncomp]);
TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
if (hio->hio_ggio.gctl_seq == seq) {
@ -1707,8 +1779,80 @@ remote_recv_thread(void *arg)
hio->hio_errors[ncomp] = 0;
nv_free(nv);
done_queue:
if (!refcount_release(&hio->hio_countdown))
continue;
if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
if (refcnt_release(&hio->hio_countdown) > 0)
continue;
} else {
/*
* Depending on hio_countdown value, requests finished
* in the following order:
*
* 0: local write, remote memsync, remote final
* or
* 0: remote memsync, local write, remote final
*
* 1: local write, remote memsync, (remote final)
* or
* 1: remote memsync, remote final, (local write)
*
* 2: remote memsync, (local write), (remote final)
* or
* 2: remote memsync, (remote final), (local write)
*/
switch (refcnt_release(&hio->hio_countdown)) {
case 0:
/*
* Remote final reply arrived.
*/
PJDLOG_ASSERT(!memsyncack);
break;
case 1:
if (memsyncack) {
/*
* Local request already finished, so we
* can complete the write.
*/
if (hio->hio_errors[0] == 0)
write_complete(res, hio);
/*
* We still need to wait for final
* remote reply.
*/
pjdlog_debug(2,
"remote_recv: (%p) Moving request back to the recv queue.",
hio);
mtx_lock(&hio_recv_list_lock[ncomp]);
TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
hio, hio_next[ncomp]);
mtx_unlock(&hio_recv_list_lock[ncomp]);
} else {
/*
* Remote final reply arrived before
* local write finished.
* Nothing to do in such case.
*/
}
continue;
case 2:
/*
* We received remote memsync reply even before
* local write finished.
*/
PJDLOG_ASSERT(memsyncack);
pjdlog_debug(2,
"remote_recv: (%p) Moving request back to the recv queue.",
hio);
mtx_lock(&hio_recv_list_lock[ncomp]);
TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio,
hio_next[ncomp]);
mtx_unlock(&hio_recv_list_lock[ncomp]);
continue;
default:
PJDLOG_ABORT("Invalid hio_countdown.");
}
}
if (ISSYNCREQ(hio)) {
mtx_lock(&sync_lock);
SYNCREQDONE(hio);
@ -1977,7 +2121,7 @@ sync_thread(void *arg __unused)
ncomp = 1;
}
mtx_unlock(&metadata_lock);
refcount_init(&hio->hio_countdown, 1);
hio->hio_countdown = 1;
QUEUE_INSERT1(hio, send, ncomp);
/*
@ -2027,7 +2171,7 @@ sync_thread(void *arg __unused)
pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
hio);
refcount_init(&hio->hio_countdown, 1);
hio->hio_countdown = 1;
QUEUE_INSERT1(hio, send, ncomp);
/*

57
sbin/hastd/refcnt.h Normal file
View File

@ -0,0 +1,57 @@
/*-
* Copyright (c) 2005 John Baldwin <jhb@FreeBSD.org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the author nor the names of any co-contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $FreeBSD$
*/
#ifndef __REFCNT_H__
#define __REFCNT_H__
#include <machine/atomic.h>
#include "pjdlog.h"
static __inline void
refcnt_acquire(volatile unsigned int *count)
{
atomic_add_acq_int(count, 1);
}
static __inline unsigned int
refcnt_release(volatile unsigned int *count)
{
unsigned int old;
/* XXX: Should this have a rel membar? */
old = atomic_fetchadd_int(count, -1);
PJDLOG_ASSERT(old > 0);
return (old - 1);
}
#endif /* ! __REFCNT_H__ */

View File

@ -71,6 +71,7 @@ struct hio {
uint8_t hio_cmd;
uint64_t hio_offset;
uint64_t hio_length;
bool hio_memsync;
TAILQ_ENTRY(hio) hio_next;
};
@ -135,6 +136,22 @@ hio_clear(struct hio *hio)
hio->hio_cmd = HIO_UNDEF;
hio->hio_offset = 0;
hio->hio_length = 0;
hio->hio_memsync = false;
}
static void
hio_copy(const struct hio *srchio, struct hio *dsthio)
{
/*
* We don't copy hio_error, hio_data and hio_next fields.
*/
dsthio->hio_seq = srchio->hio_seq;
dsthio->hio_cmd = srchio->hio_cmd;
dsthio->hio_offset = srchio->hio_offset;
dsthio->hio_length = srchio->hio_length;
dsthio->hio_memsync = srchio->hio_memsync;
}
static void
@ -543,8 +560,10 @@ requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv)
case HIO_FLUSH:
case HIO_KEEPALIVE:
break;
case HIO_READ:
case HIO_WRITE:
hio->hio_memsync = nv_exists(nv, "memsync");
/* FALLTHROUGH */
case HIO_READ:
case HIO_DELETE:
hio->hio_offset = nv_get_uint64(nv, "offset");
if (nv_error(nv) != 0) {
@ -621,7 +640,7 @@ static void *
recv_thread(void *arg)
{
struct hast_resource *res = arg;
struct hio *hio;
struct hio *hio, *mshio;
struct nv *nv;
for (;;) {
@ -675,6 +694,27 @@ recv_thread(void *arg)
secondary_exit(EX_TEMPFAIL,
"Unable to receive request data");
}
if (hio->hio_memsync) {
/*
* For memsync requests we expect two replies.
* Clone the hio so we can handle both of them.
*/
pjdlog_debug(2, "recv: Taking free request.");
QUEUE_TAKE(free, mshio);
pjdlog_debug(2, "recv: (%p) Got request.",
mshio);
hio_copy(hio, mshio);
mshio->hio_error = 0;
/*
* We want to keep 'memsync' tag only on the
* request going onto send queue (mshio).
*/
hio->hio_memsync = false;
pjdlog_debug(2,
"recv: (%p) Moving memsync request to the send queue.",
mshio);
QUEUE_INSERT(send, mshio);
}
}
nv_free(nv);
pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
@ -818,6 +858,10 @@ send_thread(void *arg)
nvout = nv_alloc();
/* Copy sequence number. */
nv_add_uint64(nvout, hio->hio_seq, "seq");
if (hio->hio_memsync) {
PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE);
nv_add_int8(nvout, 1, "received");
}
switch (hio->hio_cmd) {
case HIO_READ:
if (hio->hio_error == 0) {