Reimplement ggatec/ggated applications.

Change communication protocol to be much more resistant on network
problems and to allow for much better performance.

Better performance is achieved by creating two connections between
ggatec and ggated one for sending the data and one for receiving it.
Every connection is handled by separeted thread, so there is no more
synchronous data flow (send and wait for response), now one threads
sends all requests and another receives the data.

Use two threads in ggatec(8):
- sendtd, which takes I/O requests from the kernel and sends them to the
  ggated daemon on the other end;
- recvtd, which waits for ggated responses and forwards them to the kernel.

Use three threads in ggated(8):
- recvtd, which waits for I/O requests and puts them onto incoming queue;
- disktd, which takes requests from the incoming queue, does disk operations
  and puts finished requests onto outgoing queue;
- sendtd, which takes finished requests from the outgoing queue and sends
  responses back to ggatec.

Because there were major changes in communication protocol, there is no
backward compatibility, from now on, both client and server has to run
on 5.x or 6.x (or at least ggated should be from the same FreeBSD version
on which ggatec is running).

For Gbit networks some buffers need to be increased. I use those settings:
kern.ipc.maxsockbuf=16777216
net.inet.tcp.sendspace=8388608
net.inet.tcp.recvspace=8388608
and I use '-S 4194304 -R 4194304' options for both, ggatec and ggated.

Approved by:	re (scottl)
This commit is contained in:
Pawel Jakub Dawidek 2005-07-08 21:28:26 +00:00
parent 84436f14c4
commit 7be67fe3d1
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=147844
6 changed files with 1104 additions and 465 deletions

View File

@ -9,7 +9,7 @@ SRCS= ggatec.c ggate.c
CFLAGS+= -DLIBGEOM
CFLAGS+= -I${.CURDIR}/../shared
DPADD= ${LIBGEOM} ${LIBSBUF} ${LIBBSDXML} ${LIBUTIL}
LDADD= -lgeom -lsbuf -lbsdxml -lutil
DPADD= ${LIBGEOM} ${LIBSBUF} ${LIBBSDXML} ${LIBUTIL} ${LIBPTHREAD}
LDADD= -lgeom -lsbuf -lbsdxml -lutil -lpthread
.include <bsd.prog.mk>

View File

@ -34,8 +34,12 @@
#include <string.h>
#include <ctype.h>
#include <libgen.h>
#include <pthread.h>
#include <signal.h>
#include <err.h>
#include <errno.h>
#include <assert.h>
#include <sys/param.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
@ -51,21 +55,22 @@
#include "ggate.h"
enum { UNSET, ATTACH, CREATE, DESTROY, LIST } action = UNSET;
enum { UNSET, CREATE, DESTROY, LIST, RESCUE } action = UNSET;
static const char *path = NULL;
static const char *host = NULL;
static int unit = -1;
static unsigned flags = 0;
static int force = 0;
static int nagle = 1;
static unsigned queue_size = G_GATE_QUEUE_SIZE;
static unsigned port = G_GATE_PORT;
static off_t mediasize;
static unsigned sectorsize = 0;
static unsigned timeout = G_GATE_TIMEOUT;
static unsigned rcvbuf = G_GATE_RCVBUF;
static unsigned sndbuf = G_GATE_SNDBUF;
static int sendfd, recvfd;
static uint32_t token;
static pthread_t sendtd, recvtd;
static int reconnect;
static void
usage(void)
@ -74,129 +79,30 @@ usage(void)
fprintf(stderr, "usage: %s create [-nv] [-o <ro|wo|rw>] [-p port] "
"[-q queue_size] [-R rcvbuf] [-S sndbuf] [-s sectorsize] "
"[-t timeout] [-u unit] <host> <path>\n", getprogname());
fprintf(stderr, " %s attach [-nv] [-o <ro|wo|rw>] [-p port] "
fprintf(stderr, " %s rescue [-nv] [-o <ro|wo|rw>] [-p port] "
"[-R rcvbuf] [-S sndbuf] <-u unit> <host> <path>\n", getprogname());
fprintf(stderr, " %s destroy [-f] <-u unit>\n", getprogname());
fprintf(stderr, " %s list [-v] [-u unit]\n", getprogname());
exit(EXIT_FAILURE);
}
static int
handshake(void)
{
struct g_gate_cinit cinit;
struct g_gate_sinit sinit;
struct sockaddr_in serv;
struct timeval tv;
size_t bsize;
int sfd;
/*
* Do the network stuff.
*/
bzero(&serv, sizeof(serv));
serv.sin_family = AF_INET;
serv.sin_addr.s_addr = g_gate_str2ip(host);
if (serv.sin_addr.s_addr == INADDR_NONE) {
g_gate_log(LOG_ERR, "Invalid IP/host name: %s.", host);
return (-1);
}
serv.sin_port = htons(port);
sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
g_gate_xlog("Can't open socket: %s.", strerror(errno));
/*
* Some trivial network optimalization.
* This should be much more advanced.
*/
if (nagle) {
int on = 1;
if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on,
sizeof(on)) == -1) {
g_gate_xlog("setsockopt() error: %s.", strerror(errno));
}
}
bsize = rcvbuf;
if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1)
g_gate_xlog("setsockopt() error: %s.", strerror(errno));
bsize = sndbuf;
if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1)
g_gate_xlog("setsockopt() error: %s.", strerror(errno));
tv.tv_sec = timeout;
tv.tv_usec = 0;
if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1 ||
setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
g_gate_xlog("setsockopt() error: %s.", strerror(errno));
}
if (connect(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) {
g_gate_log(LOG_ERR, "Can't connect to server: %s.",
strerror(errno));
return (-1);
}
g_gate_log(LOG_INFO, "Connected to the server: %s:%d.", host, port);
/*
* Creating and sending initial packet.
*/
if (strlcpy(cinit.gc_path, path, sizeof(cinit.gc_path)) >=
sizeof(cinit.gc_path)) {
g_gate_xlog("Path name too long.");
}
cinit.gc_flags = flags;
g_gate_log(LOG_DEBUG, "Sending initial packet.");
g_gate_swap2n_cinit(&cinit);
if (send(sfd, &cinit, sizeof(cinit), 0) == -1) {
g_gate_log(LOG_ERR, "Error while sending initial packet: %s.",
strerror(errno));
return (-1);
}
g_gate_swap2h_cinit(&cinit);
/*
* Receiving initial packet from server.
*/
g_gate_log(LOG_DEBUG, "Receiving initial packet.");
if (recv(sfd, &sinit, sizeof(sinit), MSG_WAITALL) == -1) {
g_gate_log(LOG_ERR, "Error while receiving data: %s.",
strerror(errno));
return (-1);
}
g_gate_swap2h_sinit(&sinit);
if (sinit.gs_error != 0)
g_gate_xlog("Error from server: %s.", strerror(sinit.gs_error));
mediasize = sinit.gs_mediasize;
if (sectorsize == 0)
sectorsize = sinit.gs_sectorsize;
return (sfd);
}
static int
serve(int sfd)
static void *
send_thread(void *arg __unused)
{
struct g_gate_ctl_io ggio;
size_t bsize;
char *buf;
struct g_gate_hdr hdr;
char buf[MAXPHYS];
ssize_t data;
int error;
bsize = G_GATE_BUFSIZE_START;
buf = malloc(bsize);
if (buf == NULL) {
if (action == CREATE)
g_gate_destroy(unit, 1);
g_gate_xlog("No enough memory");
}
g_gate_log(LOG_NOTICE, "%s: started!", __func__);
ggio.gctl_version = G_GATE_VERSION;
ggio.gctl_unit = unit;
bsize = sectorsize;
ggio.gctl_data = malloc(bsize);
ggio.gctl_data = buf;
for (;;) {
struct g_gate_hdr hdr;
int data, error;
once_again:
ggio.gctl_length = bsize;
ggio.gctl_length = sizeof(buf);
ggio.gctl_error = 0;
g_gate_ioctl(G_GATE_CMD_START, &ggio);
error = ggio.gctl_error;
@ -204,11 +110,12 @@ serve(int sfd)
case 0:
break;
case ECANCELED:
if (reconnect)
break;
/* Exit gracefully. */
free(ggio.gctl_data);
g_gate_close_device();
close(sfd);
exit(EXIT_SUCCESS);
#if 0
case ENOMEM:
/* Buffer too small. */
ggio.gctl_data = realloc(ggio.gctl_data,
@ -218,87 +125,232 @@ serve(int sfd)
goto once_again;
}
/* FALLTHROUGH */
#endif
case ENXIO:
default:
g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME,
strerror(error));
}
hdr.gh_cmd = ggio.gctl_cmd;
if (reconnect)
break;
switch (ggio.gctl_cmd) {
case BIO_READ:
hdr.gh_cmd = GGATE_CMD_READ;
break;
case BIO_WRITE:
hdr.gh_cmd = GGATE_CMD_WRITE;
break;
}
hdr.gh_seq = ggio.gctl_seq;
hdr.gh_offset = ggio.gctl_offset;
hdr.gh_length = ggio.gctl_length;
hdr.gh_error = 0;
g_gate_swap2n_hdr(&hdr);
data = send(sfd, &hdr, sizeof(hdr), 0);
data = g_gate_send(sendfd, &hdr, sizeof(hdr), MSG_NOSIGNAL);
g_gate_log(LOG_DEBUG, "Sent hdr packet.");
g_gate_swap2h_hdr(&hdr);
if (reconnect)
break;
if (data != sizeof(hdr)) {
ggio.gctl_error = EAGAIN;
goto done;
g_gate_log(LOG_ERR, "Lost connection 1.");
reconnect = 1;
pthread_kill(recvtd, SIGUSR1);
break;
}
if (ggio.gctl_cmd == BIO_DELETE || ggio.gctl_cmd == BIO_WRITE) {
data = send(sfd, ggio.gctl_data, ggio.gctl_length, 0);
g_gate_log(LOG_DEBUG, "Sent data packet.");
if (hdr.gh_cmd == GGATE_CMD_WRITE) {
data = g_gate_send(sendfd, ggio.gctl_data,
ggio.gctl_length, MSG_NOSIGNAL);
if (reconnect)
break;
if (data != ggio.gctl_length) {
ggio.gctl_error = EAGAIN;
goto done;
g_gate_log(LOG_ERR, "Lost connection 2 (%zd != %zd).", data, (ssize_t)ggio.gctl_length);
reconnect = 1;
pthread_kill(recvtd, SIGUSR1);
break;
}
g_gate_log(LOG_DEBUG, "Sent %d bytes (offset=%llu, "
g_gate_log(LOG_DEBUG, "Sent %zd bytes (offset=%llu, "
"size=%u).", data, hdr.gh_offset, hdr.gh_length);
}
data = recv(sfd, &hdr, sizeof(hdr), MSG_WAITALL);
g_gate_log(LOG_DEBUG, "Received hdr packet.");
}
g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
return (NULL);
}
static void *
recv_thread(void *arg __unused)
{
struct g_gate_ctl_io ggio;
struct g_gate_hdr hdr;
char buf[MAXPHYS];
ssize_t data;
g_gate_log(LOG_NOTICE, "%s: started!", __func__);
ggio.gctl_version = G_GATE_VERSION;
ggio.gctl_unit = unit;
ggio.gctl_data = buf;
for (;;) {
data = g_gate_recv(recvfd, &hdr, sizeof(hdr), MSG_WAITALL);
if (reconnect)
break;
g_gate_swap2h_hdr(&hdr);
if (data != sizeof(hdr)) {
ggio.gctl_error = EIO;
goto done;
if (data == -1 && errno == EAGAIN)
continue;
g_gate_log(LOG_ERR, "Lost connection 3.");
reconnect = 1;
pthread_kill(sendtd, SIGUSR1);
break;
}
if (ggio.gctl_cmd == BIO_READ) {
if (bsize < (size_t)ggio.gctl_length) {
ggio.gctl_data = realloc(ggio.gctl_data,
ggio.gctl_length);
if (ggio.gctl_data != NULL)
bsize = ggio.gctl_length;
else
g_gate_xlog("No memory.");
}
data = recv(sfd, ggio.gctl_data, ggio.gctl_length,
MSG_WAITALL);
g_gate_log(LOG_DEBUG, "Received hdr packet.");
ggio.gctl_seq = hdr.gh_seq;
ggio.gctl_cmd = hdr.gh_cmd;
ggio.gctl_offset = hdr.gh_offset;
ggio.gctl_length = hdr.gh_length;
ggio.gctl_error = hdr.gh_error;
if (ggio.gctl_error == 0 && ggio.gctl_cmd == GGATE_CMD_READ) {
data = g_gate_recv(recvfd, ggio.gctl_data,
ggio.gctl_length, MSG_WAITALL);
if (reconnect)
break;
g_gate_log(LOG_DEBUG, "Received data packet.");
if (data != ggio.gctl_length) {
ggio.gctl_error = EAGAIN;
goto done;
g_gate_log(LOG_ERR, "Lost connection 4.");
reconnect = 1;
pthread_kill(sendtd, SIGUSR1);
break;
}
g_gate_log(LOG_DEBUG, "Received %d bytes (offset=%ju, "
"size=%zu).", data, (uintmax_t)hdr.gh_offset,
(size_t)hdr.gh_length);
}
done:
g_gate_ioctl(G_GATE_CMD_DONE, &ggio);
if (ggio.gctl_error == EAGAIN)
return (ggio.gctl_error);
}
/* NOTREACHED */
return (0);
g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
pthread_exit(NULL);
}
static void
serve_loop(int sfd)
static int
handshake(int dir)
{
struct g_gate_version ver;
struct g_gate_cinit cinit;
struct g_gate_sinit sinit;
struct sockaddr_in serv;
int sfd;
for (;;) {
int error;
error = serve(sfd);
close(sfd);
if (error != EAGAIN)
g_gate_xlog("%s.", strerror(error));
sfd = handshake();
if (sfd == -1) {
sleep(2);
continue;
}
/*
* Do the network stuff.
*/
bzero(&serv, sizeof(serv));
serv.sin_family = AF_INET;
serv.sin_addr.s_addr = g_gate_str2ip(host);
if (serv.sin_addr.s_addr == INADDR_NONE) {
g_gate_log(LOG_DEBUG, "Invalid IP/host name: %s.", host);
return (-1);
}
serv.sin_port = htons(port);
sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1) {
g_gate_log(LOG_DEBUG, "Cannot open socket: %s.",
strerror(errno));
return (-1);
}
g_gate_socket_settings(sfd);
if (connect(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) {
g_gate_log(LOG_DEBUG, "Cannot connect to server: %s.",
strerror(errno));
close(sfd);
return (-1);
}
g_gate_log(LOG_INFO, "Connected to the server: %s:%d.", host, port);
/*
* Create and send version packet.
*/
g_gate_log(LOG_DEBUG, "Sending version packet.");
assert(strlen(GGATE_MAGIC) == sizeof(ver.gv_magic));
bcopy(GGATE_MAGIC, ver.gv_magic, sizeof(ver.gv_magic));
ver.gv_version = GGATE_VERSION;
ver.gv_error = 0;
g_gate_swap2n_version(&ver);
if (g_gate_send(sfd, &ver, sizeof(ver), MSG_NOSIGNAL) == -1) {
g_gate_log(LOG_DEBUG, "Error while sending version packet: %s.",
strerror(errno));
close(sfd);
return (-1);
}
bzero(&ver, sizeof(ver));
if (g_gate_recv(sfd, &ver, sizeof(ver), MSG_WAITALL) == -1) {
g_gate_log(LOG_DEBUG, "Error while receiving data: %s.",
strerror(errno));
close(sfd);
return (-1);
}
if (ver.gv_error != 0) {
g_gate_log(LOG_DEBUG, "Version verification problem: %s.",
strerror(errno));
close(sfd);
return (-1);
}
/*
* Create and send initial packet.
*/
g_gate_log(LOG_DEBUG, "Sending initial packet.");
if (strlcpy(cinit.gc_path, path, sizeof(cinit.gc_path)) >=
sizeof(cinit.gc_path)) {
g_gate_log(LOG_DEBUG, "Path name too long.");
close(sfd);
return (-1);
}
cinit.gc_flags = flags | dir;
cinit.gc_token = token;
cinit.gc_nconn = 2;
g_gate_swap2n_cinit(&cinit);
if (g_gate_send(sfd, &cinit, sizeof(cinit), MSG_NOSIGNAL) == -1) {
g_gate_log(LOG_DEBUG, "Error while sending initial packet: %s.",
strerror(errno));
close(sfd);
return (-1);
}
g_gate_swap2h_cinit(&cinit);
/*
* Receiving initial packet from server.
*/
g_gate_log(LOG_DEBUG, "Receiving initial packet.");
if (g_gate_recv(sfd, &sinit, sizeof(sinit), MSG_WAITALL) == -1) {
g_gate_log(LOG_DEBUG, "Error while receiving data: %s.",
strerror(errno));
close(sfd);
return (-1);
}
g_gate_swap2h_sinit(&sinit);
if (sinit.gs_error != 0) {
g_gate_log(LOG_DEBUG, "Error from server: %s.",
strerror(sinit.gs_error));
close(sfd);
return (-1);
}
g_gate_log(LOG_DEBUG, "Received initial packet.");
mediasize = sinit.gs_mediasize;
if (sectorsize == 0)
sectorsize = sinit.gs_sectorsize;
return (sfd);
}
static void
@ -314,26 +366,87 @@ mydaemon(void)
err(EXIT_FAILURE, "Cannot daemonize");
}
static void
g_gatec_attach(void)
static int
g_gatec_connect(void)
{
int sfd;
sfd = handshake();
g_gate_log(LOG_DEBUG, "Worker created: %u.", getpid());
mydaemon();
serve_loop(sfd);
token = arc4random();
/*
* Our receive descriptor is connected to the send descriptor on the
* server side.
*/
recvfd = handshake(GGATE_FLAG_SEND);
if (recvfd == -1)
return (0);
/*
* Our send descriptor is connected to the receive descriptor on the
* server side.
*/
sendfd = handshake(GGATE_FLAG_RECV);
if (sendfd == -1)
return (0);
return (1);
}
static void
g_gatec_start(void)
{
int error;
reconnect = 0;
error = pthread_create(&recvtd, NULL, recv_thread, NULL);
if (error != 0) {
g_gate_destroy(unit, 1);
g_gate_xlog("pthread_create(recv_thread): %s.",
strerror(error));
}
sendtd = pthread_self();
send_thread(NULL);
/* Disconnected. */
close(sendfd);
close(recvfd);
}
static void
signop(int sig __unused)
{
/* Do nothing. */
}
static void
g_gatec_loop(void)
{
struct g_gate_ctl_cancel ggioc;
signal(SIGUSR1, signop);
for (;;) {
g_gatec_start();
g_gate_log(LOG_NOTICE, "Disconnected [%s %s]. Connecting...",
host, path);
while (!g_gatec_connect()) {
sleep(2);
g_gate_log(LOG_NOTICE, "Connecting [%s %s]...", host,
path);
}
ggioc.gctl_version = G_GATE_VERSION;
ggioc.gctl_unit = unit;
ggioc.gctl_seq = 0;
g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
}
}
static void
g_gatec_create(void)
{
struct g_gate_ctl_create ggioc;
int sfd;
sfd = handshake();
if (sfd == -1)
exit(EXIT_FAILURE);
if (!g_gatec_connect())
g_gate_xlog("Cannot connect: %s.", strerror(errno));
/*
* Ok, got both sockets, time to create provider.
*/
ggioc.gctl_version = G_GATE_VERSION;
ggioc.gctl_mediasize = mediasize;
ggioc.gctl_sectorsize = sectorsize;
@ -344,12 +457,29 @@ g_gatec_create(void)
snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s:%u %s", host,
port, path);
g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc);
g_gate_log(LOG_DEBUG, "Worker created: %u.", getpid());
if (unit == -1)
printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit);
unit = ggioc.gctl_unit;
mydaemon();
serve_loop(sfd);
g_gatec_loop();
}
static void
g_gatec_rescue(void)
{
struct g_gate_ctl_cancel ggioc;
if (!g_gatec_connect())
g_gate_xlog("Cannot connect: %s.", strerror(errno));
ggioc.gctl_version = G_GATE_VERSION;
ggioc.gctl_unit = unit;
ggioc.gctl_seq = 0;
g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
mydaemon();
g_gatec_loop();
}
int
@ -358,14 +488,14 @@ main(int argc, char *argv[])
if (argc < 2)
usage();
if (strcasecmp(argv[1], "attach") == 0)
action = ATTACH;
else if (strcasecmp(argv[1], "create") == 0)
if (strcasecmp(argv[1], "create") == 0)
action = CREATE;
else if (strcasecmp(argv[1], "destroy") == 0)
action = DESTROY;
else if (strcasecmp(argv[1], "list") == 0)
action = LIST;
else if (strcasecmp(argv[1], "rescue") == 0)
action = RESCUE;
else
usage();
argc -= 1;
@ -383,12 +513,12 @@ main(int argc, char *argv[])
force = 1;
break;
case 'n':
if (action != ATTACH && action != CREATE)
if (action != CREATE && action != RESCUE)
usage();
nagle = 0;
break;
case 'o':
if (action != ATTACH && action != CREATE)
if (action != CREATE && action != RESCUE)
usage();
if (strcasecmp("ro", optarg) == 0)
flags = G_GATE_FLAG_READONLY;
@ -402,7 +532,7 @@ main(int argc, char *argv[])
}
break;
case 'p':
if (action != ATTACH && action != CREATE)
if (action != CREATE && action != RESCUE)
usage();
errno = 0;
port = strtoul(optarg, NULL, 10);
@ -418,7 +548,7 @@ main(int argc, char *argv[])
errx(EXIT_FAILURE, "Invalid queue_size.");
break;
case 'R':
if (action != ATTACH && action != CREATE)
if (action != CREATE && action != RESCUE)
usage();
errno = 0;
rcvbuf = strtoul(optarg, NULL, 10);
@ -426,7 +556,7 @@ main(int argc, char *argv[])
errx(EXIT_FAILURE, "Invalid rcvbuf.");
break;
case 'S':
if (action != ATTACH && action != CREATE)
if (action != CREATE && action != RESCUE)
usage();
errno = 0;
sndbuf = strtoul(optarg, NULL, 10);
@ -468,18 +598,6 @@ main(int argc, char *argv[])
argv += optind;
switch (action) {
case ATTACH:
if (argc != 2)
usage();
if (unit == -1) {
fprintf(stderr, "Required unit number.\n");
usage();
}
g_gate_open_device();
host = argv[0];
path = argv[1];
g_gatec_attach();
break;
case CREATE:
if (argc != 2)
usage();
@ -501,6 +619,18 @@ main(int argc, char *argv[])
case LIST:
g_gate_list(unit, g_gate_verbose);
break;
case RESCUE:
if (argc != 2)
usage();
if (unit == -1) {
fprintf(stderr, "Required unit number.\n");
usage();
}
g_gate_open_device();
host = argv[0];
path = argv[1];
g_gatec_rescue();
break;
case UNSET:
default:
usage();

View File

@ -6,6 +6,9 @@ PROG= ggated
MAN= ggated.8
SRCS= ggated.c ggate.c
DPADD= ${LIBPTHREAD}
LDADD= -lpthread
CFLAGS+= -I${.CURDIR}/../shared
.include <bsd.prog.mk>

File diff suppressed because it is too large Load Diff

View File

@ -38,6 +38,7 @@
#include <sys/linker.h>
#include <sys/module.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <signal.h>
#include <err.h>
@ -206,17 +207,6 @@ g_gate_destroy(int unit, int force)
g_gate_ioctl(G_GATE_CMD_DESTROY, &ggio);
}
int
g_gate_openflags(unsigned ggflags)
{
if ((ggflags & G_GATE_FLAG_READONLY) != 0)
return (O_RDONLY);
else if ((ggflags & G_GATE_FLAG_WRITEONLY) != 0)
return (O_WRONLY);
return (O_RDWR);
}
void
g_gate_load_module(void)
{
@ -232,6 +222,76 @@ g_gate_load_module(void)
}
}
ssize_t
g_gate_send(int s, const void *buf, size_t len, int flags)
{
ssize_t done = 0, done2;
const unsigned char *p = buf;
while (len > 0) {
done2 = send(s, p, len, flags);
if (done2 == 0)
break;
else if (done2 == -1) {
if (errno == EAGAIN) {
printf("%s: EAGAIN\n", __func__);
continue;
}
done = -1;
break;
}
done += done2;
p += done2;
len -= done2;
}
return (done);
}
ssize_t
g_gate_recv(int s, void *buf, size_t len, int flags)
{
return (recv(s, buf, len, flags));
}
int nagle = 1;
unsigned rcvbuf = G_GATE_RCVBUF;
unsigned sndbuf = G_GATE_SNDBUF;
void
g_gate_socket_settings(int sfd)
{
struct timeval tv;
int bsize, on;
/* Socket settings. */
on = 1;
if (nagle) {
if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on,
sizeof(on)) == -1) {
g_gate_xlog("setsockopt() error: %s.", strerror(errno));
}
}
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1)
g_gate_xlog("setsockopt(SO_REUSEADDR): %s.", strerror(errno));
bsize = rcvbuf;
if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1)
g_gate_xlog("setsockopt(SO_RCVBUF): %s.", strerror(errno));
bsize = sndbuf;
if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1)
g_gate_xlog("setsockopt(SO_SNDBUF): %s.", strerror(errno));
tv.tv_sec = 1;
tv.tv_usec = 0;
if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) {
g_gate_log(LOG_ERR, "setsockopt(SO_SNDTIMEO) error: %s.",
strerror(errno));
}
if (setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
g_gate_log(LOG_ERR, "setsockopt(SO_RCVTIMEO) error: %s.",
strerror(errno));
}
}
#ifdef LIBGEOM
static struct gclass *
find_class(struct gmesh *mesh, const char *name)

View File

@ -32,22 +32,49 @@
#include <sys/endian.h>
#include <stdarg.h>
#define G_GATE_BUFSIZE_START 65536
#define G_GATE_PORT 3080
#define G_GATE_RCVBUF 131072
#define G_GATE_SNDBUF 131072
#define G_GATE_QUEUE_SIZE 1024
#define G_GATE_TIMEOUT 30
#define G_GATE_TIMEOUT 0
#define GGATE_MAGIC "GEOM_GATE "
#define GGATE_VERSION 0
#define GGATE_FLAG_RDONLY 0x0001
#define GGATE_FLAG_WRONLY 0x0002
/*
* If GGATE_FLAG_SEND not GGATE_FLAG_RECV flag is set, this is initial
* connection.
* If GGATE_FLAG_SEND flag is set - this is socket to send data.
* If GGATE_FLAG_RECV flag is set - this is socket to receive data.
*/
#define GGATE_FLAG_SEND 0x0004
#define GGATE_FLAG_RECV 0x0008
#define GGATE_CMD_READ 0
#define GGATE_CMD_WRITE 1
extern int g_gate_devfd;
extern int g_gate_verbose;
extern int nagle;
extern unsigned rcvbuf, sndbuf;
struct g_gate_version {
char gv_magic[16];
uint16_t gv_version;
uint16_t gv_error;
} __packed;
/* Client's initial packet. */
struct g_gate_cinit {
char gc_path[PATH_MAX + 1];
uint8_t gc_flags;
};
char gc_path[PATH_MAX + 1];
uint64_t gc_flags;
uint16_t gc_nconn;
uint32_t gc_token;
} __packed;
/* Server's initial packet. */
struct g_gate_sinit {
@ -55,15 +82,16 @@ struct g_gate_sinit {
uint64_t gs_mediasize;
uint32_t gs_sectorsize;
uint16_t gs_error;
};
} __packed;
/* Control struct. */
struct g_gate_hdr {
uint8_t gh_cmd; /* command */
uint64_t gh_offset; /* device offset */
uint32_t gh_length; /* size of block */
int16_t gh_error; /* error value (0 if ok) */
};
uint64_t gh_seq; /* request number */
uint16_t gh_error; /* error value (0 if ok) */
} __packed;
void g_gate_vlog(int priority, const char *message, va_list ap);
void g_gate_log(int priority, const char *message, ...);
@ -75,8 +103,10 @@ void g_gate_open_device(void);
void g_gate_close_device(void);
void g_gate_ioctl(unsigned long req, void *data);
void g_gate_destroy(int unit, int force);
int g_gate_openflags(unsigned ggflags);
void g_gate_load_module(void);
ssize_t g_gate_recv(int s, void *buf, size_t len, int flags);
ssize_t g_gate_send(int s, const void *buf, size_t len, int flags);
void g_gate_socket_settings(int sfd);
#ifdef LIBGEOM
void g_gate_list(int unit, int verbose);
#endif
@ -89,17 +119,37 @@ in_addr_t g_gate_str2ip(const char *str);
*/
static __inline void
g_gate_swap2h_cinit(struct g_gate_cinit *cinit __unused)
g_gate_swap2h_version(struct g_gate_version *ver)
{
/* Nothing here for now. */
ver->gv_version = be16toh(ver->gv_version);
ver->gv_error = be16toh(ver->gv_error);
}
static __inline void
g_gate_swap2n_cinit(struct g_gate_cinit *cinit __unused)
g_gate_swap2n_version(struct g_gate_version *ver)
{
/* Nothing here for now. */
ver->gv_version = htobe16(ver->gv_version);
ver->gv_error = htobe16(ver->gv_error);
}
static __inline void
g_gate_swap2h_cinit(struct g_gate_cinit *cinit)
{
cinit->gc_flags = be64toh(cinit->gc_flags);
cinit->gc_nconn = be16toh(cinit->gc_nconn);
cinit->gc_token = be32toh(cinit->gc_token);
}
static __inline void
g_gate_swap2n_cinit(struct g_gate_cinit *cinit)
{
cinit->gc_flags = htobe64(cinit->gc_flags);
cinit->gc_nconn = htobe16(cinit->gc_nconn);
cinit->gc_token = htobe32(cinit->gc_token);
}
static __inline void
@ -129,6 +179,7 @@ g_gate_swap2h_hdr(struct g_gate_hdr *hdr)
/* Swap only used fields. */
hdr->gh_offset = be64toh(hdr->gh_offset);
hdr->gh_length = be32toh(hdr->gh_length);
hdr->gh_seq = be64toh(hdr->gh_seq);
hdr->gh_error = be16toh(hdr->gh_error);
}
@ -139,6 +190,7 @@ g_gate_swap2n_hdr(struct g_gate_hdr *hdr)
/* Swap only used fields. */
hdr->gh_offset = htobe64(hdr->gh_offset);
hdr->gh_length = htobe32(hdr->gh_length);
hdr->gh_seq = htobe64(hdr->gh_seq);
hdr->gh_error = htobe16(hdr->gh_error);
}
#endif /* _GGATE_H_ */