b6a05070fa
MFC after: 2 weeks Relnotes: yes
846 lines
22 KiB
C
846 lines
22 KiB
C
/*-
|
|
* Copyright (c) 2012 The FreeBSD Foundation
|
|
* All rights reserved.
|
|
*
|
|
* This software was developed by Pawel Jakub Dawidek under sponsorship from
|
|
* the FreeBSD Foundation.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions
|
|
* are met:
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
|
|
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
|
|
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
|
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
|
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
|
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
|
* SUCH DAMAGE.
|
|
*/
|
|
|
|
#include <config/config.h>
|
|
|
|
#include <sys/param.h>
|
|
#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
|
|
#include <sys/endian.h>
|
|
#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
|
|
#ifdef HAVE_MACHINE_ENDIAN_H
|
|
#include <machine/endian.h>
|
|
#else /* !HAVE_MACHINE_ENDIAN_H */
|
|
#ifdef HAVE_ENDIAN_H
|
|
#include <endian.h>
|
|
#else /* !HAVE_ENDIAN_H */
|
|
#error "No supported endian.h"
|
|
#endif /* !HAVE_ENDIAN_H */
|
|
#endif /* !HAVE_MACHINE_ENDIAN_H */
|
|
#include <compat/endian.h>
|
|
#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
|
|
#include <sys/queue.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/wait.h>
|
|
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <unistd.h>
|
|
|
|
#include <ctype.h>
|
|
#include <dirent.h>
|
|
#include <err.h>
|
|
#include <errno.h>
|
|
#include <fcntl.h>
|
|
#ifdef HAVE_LIBUTIL_H
|
|
#include <libutil.h>
|
|
#endif
|
|
#include <signal.h>
|
|
#include <string.h>
|
|
#include <strings.h>
|
|
|
|
#include <openssl/hmac.h>
|
|
|
|
#ifndef HAVE_SIGTIMEDWAIT
|
|
#include "sigtimedwait.h"
|
|
#endif
|
|
|
|
#include "auditdistd.h"
|
|
#include "pjdlog.h"
|
|
#include "proto.h"
|
|
#include "sandbox.h"
|
|
#include "subr.h"
|
|
#include "synch.h"
|
|
#include "trail.h"
|
|
|
|
static struct adist_config *adcfg;
|
|
static struct adist_host *adhost;
|
|
|
|
static pthread_rwlock_t adist_remote_lock;
|
|
static pthread_mutex_t adist_remote_mtx;
|
|
static pthread_cond_t adist_remote_cond;
|
|
static struct trail *adist_trail;
|
|
|
|
static TAILQ_HEAD(, adreq) adist_free_list;
|
|
static pthread_mutex_t adist_free_list_lock;
|
|
static pthread_cond_t adist_free_list_cond;
|
|
static TAILQ_HEAD(, adreq) adist_send_list;
|
|
static pthread_mutex_t adist_send_list_lock;
|
|
static pthread_cond_t adist_send_list_cond;
|
|
static TAILQ_HEAD(, adreq) adist_recv_list;
|
|
static pthread_mutex_t adist_recv_list_lock;
|
|
static pthread_cond_t adist_recv_list_cond;
|
|
|
|
static void
|
|
init_environment(void)
|
|
{
|
|
struct adreq *adreq;
|
|
unsigned int ii;
|
|
|
|
rw_init(&adist_remote_lock);
|
|
mtx_init(&adist_remote_mtx);
|
|
cv_init(&adist_remote_cond);
|
|
TAILQ_INIT(&adist_free_list);
|
|
mtx_init(&adist_free_list_lock);
|
|
cv_init(&adist_free_list_cond);
|
|
TAILQ_INIT(&adist_send_list);
|
|
mtx_init(&adist_send_list_lock);
|
|
cv_init(&adist_send_list_cond);
|
|
TAILQ_INIT(&adist_recv_list);
|
|
mtx_init(&adist_recv_list_lock);
|
|
cv_init(&adist_recv_list_cond);
|
|
|
|
for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
|
|
adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
|
|
if (adreq == NULL) {
|
|
pjdlog_exitx(EX_TEMPFAIL,
|
|
"Unable to allocate %zu bytes of memory for adreq object.",
|
|
sizeof(*adreq) + ADIST_BUF_SIZE);
|
|
}
|
|
adreq->adr_byteorder = ADIST_BYTEORDER;
|
|
adreq->adr_cmd = ADIST_CMD_UNDEFINED;
|
|
adreq->adr_seq = 0;
|
|
adreq->adr_datasize = 0;
|
|
TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
|
|
}
|
|
}
|
|
|
|
static int
|
|
sender_connect(void)
|
|
{
|
|
unsigned char rnd[32], hash[32], resp[32];
|
|
struct proto_conn *conn;
|
|
char welcome[8];
|
|
int16_t val;
|
|
|
|
val = 1;
|
|
if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
|
|
pjdlog_exit(EX_TEMPFAIL,
|
|
"Unable to send connection request to parent");
|
|
}
|
|
if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
|
|
pjdlog_exit(EX_TEMPFAIL,
|
|
"Unable to receive reply to connection request from parent");
|
|
}
|
|
if (val != 0) {
|
|
errno = val;
|
|
pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
|
|
adhost->adh_remoteaddr);
|
|
return (-1);
|
|
}
|
|
if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
|
|
pjdlog_exit(EX_TEMPFAIL,
|
|
"Unable to receive connection from parent");
|
|
}
|
|
if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
|
|
pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
|
|
/* Error in setting timeout is not critical, but why should it fail? */
|
|
if (proto_timeout(conn, adcfg->adc_timeout) < 0)
|
|
pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
|
|
else
|
|
pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
|
|
|
|
/* Exchange welcome message, which includes version number. */
|
|
(void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
|
|
if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
|
|
pjdlog_errno(LOG_WARNING,
|
|
"Unable to send welcome message to %s",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Welcome message sent (%s).", welcome);
|
|
bzero(welcome, sizeof(welcome));
|
|
if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
|
|
pjdlog_errno(LOG_WARNING,
|
|
"Unable to receive welcome message from %s",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
|
|
!isdigit(welcome[6]) || welcome[7] != '\0') {
|
|
pjdlog_warning("Invalid welcome message from %s.",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Welcome message received (%s).", welcome);
|
|
/*
|
|
* Receiver can only reply with version number lower or equal to
|
|
* the one we sent.
|
|
*/
|
|
adhost->adh_version = atoi(welcome + 5);
|
|
if (adhost->adh_version > ADIST_VERSION) {
|
|
pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
|
|
adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
|
|
pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
|
|
adhost->adh_remoteaddr);
|
|
|
|
if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
|
|
pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
|
|
|
|
if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
|
|
pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Challenge received.");
|
|
|
|
if (HMAC(EVP_sha256(), adhost->adh_password,
|
|
(int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
|
|
NULL) == NULL) {
|
|
pjdlog_warning("Unable to generate response.");
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Response generated.");
|
|
|
|
if (proto_send(conn, hash, sizeof(hash)) == -1) {
|
|
pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Response sent.");
|
|
|
|
if (adist_random(rnd, sizeof(rnd)) == -1) {
|
|
pjdlog_warning("Unable to generate challenge.");
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Challenge generated.");
|
|
|
|
if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
|
|
pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Challenge sent.");
|
|
|
|
if (proto_recv(conn, resp, sizeof(resp)) == -1) {
|
|
pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Response received.");
|
|
|
|
if (HMAC(EVP_sha256(), adhost->adh_password,
|
|
(int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
|
|
NULL) == NULL) {
|
|
pjdlog_warning("Unable to generate hash.");
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Hash generated.");
|
|
|
|
if (memcmp(resp, hash, sizeof(hash)) != 0) {
|
|
pjdlog_warning("Invalid response from %s (wrong password?).",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_info("Receiver authenticated.");
|
|
|
|
if (proto_recv(conn, &adhost->adh_trail_offset,
|
|
sizeof(adhost->adh_trail_offset)) == -1) {
|
|
pjdlog_errno(LOG_WARNING,
|
|
"Unable to receive size of the most recent trail file from %s",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
|
|
if (proto_recv(conn, &adhost->adh_trail_name,
|
|
sizeof(adhost->adh_trail_name)) == -1) {
|
|
pjdlog_errno(LOG_WARNING,
|
|
"Unable to receive name of the most recent trail file from %s",
|
|
adhost->adh_remoteaddr);
|
|
proto_close(conn);
|
|
return (-1);
|
|
}
|
|
pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
|
|
adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
|
|
|
|
rw_wlock(&adist_remote_lock);
|
|
mtx_lock(&adist_remote_mtx);
|
|
PJDLOG_ASSERT(adhost->adh_remote == NULL);
|
|
PJDLOG_ASSERT(conn != NULL);
|
|
adhost->adh_remote = conn;
|
|
mtx_unlock(&adist_remote_mtx);
|
|
rw_unlock(&adist_remote_lock);
|
|
cv_signal(&adist_remote_cond);
|
|
|
|
return (0);
|
|
}
|
|
|
|
static void
|
|
sender_disconnect(void)
|
|
{
|
|
|
|
rw_wlock(&adist_remote_lock);
|
|
/*
|
|
* Check for a race between dropping rlock and acquiring wlock -
|
|
* another thread can close connection in-between.
|
|
*/
|
|
if (adhost->adh_remote == NULL) {
|
|
rw_unlock(&adist_remote_lock);
|
|
return;
|
|
}
|
|
pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
|
|
proto_close(adhost->adh_remote);
|
|
mtx_lock(&adist_remote_mtx);
|
|
adhost->adh_remote = NULL;
|
|
adhost->adh_reset = true;
|
|
adhost->adh_trail_name[0] = '\0';
|
|
adhost->adh_trail_offset = 0;
|
|
mtx_unlock(&adist_remote_mtx);
|
|
rw_unlock(&adist_remote_lock);
|
|
|
|
pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
|
|
|
|
/* Move all in-flight requests back onto free list. */
|
|
mtx_lock(&adist_free_list_lock);
|
|
mtx_lock(&adist_send_list_lock);
|
|
TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
|
|
mtx_unlock(&adist_send_list_lock);
|
|
mtx_lock(&adist_recv_list_lock);
|
|
TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
|
|
mtx_unlock(&adist_recv_list_lock);
|
|
mtx_unlock(&adist_free_list_lock);
|
|
}
|
|
|
|
static void
|
|
adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
|
|
size_t size)
|
|
{
|
|
static uint64_t seq = 1;
|
|
|
|
PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
|
|
|
|
switch (cmd) {
|
|
case ADIST_CMD_OPEN:
|
|
case ADIST_CMD_CLOSE:
|
|
PJDLOG_ASSERT(data != NULL && size == 0);
|
|
size = strlen(data) + 1;
|
|
break;
|
|
case ADIST_CMD_APPEND:
|
|
PJDLOG_ASSERT(data != NULL && size > 0);
|
|
break;
|
|
case ADIST_CMD_KEEPALIVE:
|
|
case ADIST_CMD_ERROR:
|
|
PJDLOG_ASSERT(data == NULL && size == 0);
|
|
break;
|
|
default:
|
|
PJDLOG_ABORT("Invalid command (%hhu).", cmd);
|
|
}
|
|
|
|
adreq->adr_cmd = cmd;
|
|
adreq->adr_seq = seq++;
|
|
adreq->adr_datasize = size;
|
|
/* Don't copy if data is already in out buffer. */
|
|
if (data != NULL && data != adreq->adr_data)
|
|
bcopy(data, adreq->adr_data, size);
|
|
}
|
|
|
|
static bool
|
|
read_thread_wait(void)
|
|
{
|
|
bool newfile = false;
|
|
|
|
mtx_lock(&adist_remote_mtx);
|
|
if (adhost->adh_reset) {
|
|
reset:
|
|
adhost->adh_reset = false;
|
|
if (trail_filefd(adist_trail) != -1)
|
|
trail_close(adist_trail);
|
|
trail_reset(adist_trail);
|
|
while (adhost->adh_remote == NULL)
|
|
cv_wait(&adist_remote_cond, &adist_remote_mtx);
|
|
trail_start(adist_trail, adhost->adh_trail_name,
|
|
adhost->adh_trail_offset);
|
|
newfile = true;
|
|
}
|
|
mtx_unlock(&adist_remote_mtx);
|
|
while (trail_filefd(adist_trail) == -1) {
|
|
newfile = true;
|
|
wait_for_dir();
|
|
/*
|
|
* We may have been disconnected and reconnected in the
|
|
* meantime, check if reset is set.
|
|
*/
|
|
mtx_lock(&adist_remote_mtx);
|
|
if (adhost->adh_reset)
|
|
goto reset;
|
|
mtx_unlock(&adist_remote_mtx);
|
|
if (trail_filefd(adist_trail) == -1)
|
|
trail_next(adist_trail);
|
|
}
|
|
if (newfile) {
|
|
pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
|
|
adhost->adh_directory,
|
|
trail_filename(adist_trail));
|
|
(void)wait_for_file_init(trail_filefd(adist_trail));
|
|
}
|
|
return (newfile);
|
|
}
|
|
|
|
static void *
|
|
read_thread(void *arg __unused)
|
|
{
|
|
struct adreq *adreq;
|
|
ssize_t done;
|
|
bool newfile;
|
|
|
|
pjdlog_debug(1, "%s started.", __func__);
|
|
|
|
for (;;) {
|
|
newfile = read_thread_wait();
|
|
QUEUE_TAKE(adreq, &adist_free_list, 0);
|
|
if (newfile) {
|
|
adreq_fill(adreq, ADIST_CMD_OPEN,
|
|
trail_filename(adist_trail), 0);
|
|
newfile = false;
|
|
goto move;
|
|
}
|
|
|
|
done = read(trail_filefd(adist_trail), adreq->adr_data,
|
|
ADIST_BUF_SIZE);
|
|
if (done == -1) {
|
|
off_t offset;
|
|
int error;
|
|
|
|
error = errno;
|
|
offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
|
|
errno = error;
|
|
pjdlog_errno(LOG_ERR,
|
|
"Error while reading \"%s/%s\" at offset %jd",
|
|
adhost->adh_directory, trail_filename(adist_trail),
|
|
offset);
|
|
trail_close(adist_trail);
|
|
adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
|
|
goto move;
|
|
} else if (done == 0) {
|
|
/* End of file. */
|
|
pjdlog_debug(3, "End of \"%s/%s\".",
|
|
adhost->adh_directory, trail_filename(adist_trail));
|
|
if (!trail_switch(adist_trail)) {
|
|
/* More audit records can arrive. */
|
|
mtx_lock(&adist_free_list_lock);
|
|
TAILQ_INSERT_TAIL(&adist_free_list, adreq,
|
|
adr_next);
|
|
mtx_unlock(&adist_free_list_lock);
|
|
wait_for_file();
|
|
continue;
|
|
}
|
|
adreq_fill(adreq, ADIST_CMD_CLOSE,
|
|
trail_filename(adist_trail), 0);
|
|
trail_close(adist_trail);
|
|
goto move;
|
|
}
|
|
|
|
adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
|
|
move:
|
|
pjdlog_debug(3,
|
|
"read thread: Moving request %p to the send queue (%hhu).",
|
|
adreq, adreq->adr_cmd);
|
|
QUEUE_INSERT(adreq, &adist_send_list);
|
|
}
|
|
/* NOTREACHED */
|
|
return (NULL);
|
|
}
|
|
|
|
static void
|
|
keepalive_send(void)
|
|
{
|
|
struct adreq *adreq;
|
|
|
|
rw_rlock(&adist_remote_lock);
|
|
if (adhost->adh_remote == NULL) {
|
|
rw_unlock(&adist_remote_lock);
|
|
return;
|
|
}
|
|
rw_unlock(&adist_remote_lock);
|
|
|
|
mtx_lock(&adist_free_list_lock);
|
|
adreq = TAILQ_FIRST(&adist_free_list);
|
|
if (adreq != NULL)
|
|
TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
|
|
mtx_unlock(&adist_free_list_lock);
|
|
if (adreq == NULL)
|
|
return;
|
|
|
|
adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
|
|
|
|
QUEUE_INSERT(adreq, &adist_send_list);
|
|
|
|
pjdlog_debug(3, "keepalive_send: Request sent.");
|
|
}
|
|
|
|
/*
|
|
* Thread sends request to secondary node.
|
|
*/
|
|
static void *
|
|
send_thread(void *arg __unused)
|
|
{
|
|
time_t lastcheck, now;
|
|
struct adreq *adreq;
|
|
|
|
pjdlog_debug(1, "%s started.", __func__);
|
|
|
|
lastcheck = time(NULL);
|
|
|
|
for (;;) {
|
|
pjdlog_debug(3, "send thread: Taking request.");
|
|
for (;;) {
|
|
QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
|
|
if (adreq != NULL)
|
|
break;
|
|
now = time(NULL);
|
|
if (lastcheck + ADIST_KEEPALIVE <= now) {
|
|
keepalive_send();
|
|
lastcheck = now;
|
|
}
|
|
}
|
|
PJDLOG_ASSERT(adreq != NULL);
|
|
pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
|
|
adreq->adr_cmd);
|
|
/*
|
|
* Protect connection from disappearing.
|
|
*/
|
|
rw_rlock(&adist_remote_lock);
|
|
/*
|
|
* Move the request to the recv queue first to avoid race
|
|
* where the recv thread receives the reply before we move
|
|
* the request to the recv queue.
|
|
*/
|
|
QUEUE_INSERT(adreq, &adist_recv_list);
|
|
if (adhost->adh_remote == NULL ||
|
|
proto_send(adhost->adh_remote, &adreq->adr_packet,
|
|
ADPKT_SIZE(adreq)) == -1) {
|
|
rw_unlock(&adist_remote_lock);
|
|
pjdlog_debug(1,
|
|
"send thread: (%p) Unable to send request.", adreq);
|
|
if (adhost->adh_remote != NULL)
|
|
sender_disconnect();
|
|
continue;
|
|
} else {
|
|
pjdlog_debug(3, "Request %p sent successfully.", adreq);
|
|
adreq_log(LOG_DEBUG, 2, -1, adreq,
|
|
"send: (%p) Request sent: ", adreq);
|
|
rw_unlock(&adist_remote_lock);
|
|
}
|
|
}
|
|
/* NOTREACHED */
|
|
return (NULL);
|
|
}
|
|
|
|
static void
|
|
adrep_decode_header(struct adrep *adrep)
|
|
{
|
|
|
|
/* Byte-swap only is the receiver is using different byte order. */
|
|
if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
|
|
adrep->adrp_byteorder = ADIST_BYTEORDER;
|
|
adrep->adrp_seq = bswap64(adrep->adrp_seq);
|
|
adrep->adrp_error = bswap16(adrep->adrp_error);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Thread receives answer from secondary node and passes it to ggate_send
|
|
* thread.
|
|
*/
|
|
static void *
|
|
recv_thread(void *arg __unused)
|
|
{
|
|
struct adrep adrep;
|
|
struct adreq *adreq;
|
|
|
|
pjdlog_debug(1, "%s started.", __func__);
|
|
|
|
for (;;) {
|
|
/* Wait until there is anything to receive. */
|
|
QUEUE_WAIT(&adist_recv_list);
|
|
pjdlog_debug(3, "recv thread: Got something.");
|
|
rw_rlock(&adist_remote_lock);
|
|
if (adhost->adh_remote == NULL) {
|
|
/*
|
|
* Connection is dead.
|
|
* XXX: We shouldn't be here.
|
|
*/
|
|
rw_unlock(&adist_remote_lock);
|
|
continue;
|
|
}
|
|
if (proto_recv(adhost->adh_remote, &adrep,
|
|
sizeof(adrep)) == -1) {
|
|
rw_unlock(&adist_remote_lock);
|
|
pjdlog_errno(LOG_ERR, "Unable to receive reply");
|
|
sender_disconnect();
|
|
continue;
|
|
}
|
|
rw_unlock(&adist_remote_lock);
|
|
adrep_decode_header(&adrep);
|
|
/*
|
|
* Find the request that was just confirmed.
|
|
*/
|
|
mtx_lock(&adist_recv_list_lock);
|
|
TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
|
|
if (adreq->adr_seq == adrep.adrp_seq) {
|
|
TAILQ_REMOVE(&adist_recv_list, adreq,
|
|
adr_next);
|
|
break;
|
|
}
|
|
}
|
|
if (adreq == NULL) {
|
|
/*
|
|
* If we disconnected in the meantime, just continue.
|
|
* On disconnect sender_disconnect() clears the queue,
|
|
* we can use that.
|
|
*/
|
|
if (TAILQ_EMPTY(&adist_recv_list)) {
|
|
mtx_unlock(&adist_recv_list_lock);
|
|
continue;
|
|
}
|
|
mtx_unlock(&adist_recv_list_lock);
|
|
pjdlog_error("Found no request matching received 'seq' field (%ju).",
|
|
(uintmax_t)adrep.adrp_seq);
|
|
sender_disconnect();
|
|
continue;
|
|
}
|
|
mtx_unlock(&adist_recv_list_lock);
|
|
adreq_log(LOG_DEBUG, 2, -1, adreq,
|
|
"recv thread: (%p) Request confirmed: ", adreq);
|
|
pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
|
|
adreq->adr_cmd);
|
|
if (adrep.adrp_error != 0) {
|
|
pjdlog_error("Receiver returned error (%s), disconnecting.",
|
|
adist_errstr((int)adrep.adrp_error));
|
|
sender_disconnect();
|
|
continue;
|
|
}
|
|
if (adreq->adr_cmd == ADIST_CMD_CLOSE)
|
|
trail_unlink(adist_trail, adreq->adr_data);
|
|
pjdlog_debug(3, "Request received successfully.");
|
|
QUEUE_INSERT(adreq, &adist_free_list);
|
|
}
|
|
/* NOTREACHED */
|
|
return (NULL);
|
|
}
|
|
|
|
static void
|
|
guard_check_connection(void)
|
|
{
|
|
|
|
PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
|
|
|
|
rw_rlock(&adist_remote_lock);
|
|
if (adhost->adh_remote != NULL) {
|
|
rw_unlock(&adist_remote_lock);
|
|
pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
|
|
adhost->adh_remoteaddr);
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* Upgrade the lock. It doesn't have to be atomic as no other thread
|
|
* can change connection status from disconnected to connected.
|
|
*/
|
|
rw_unlock(&adist_remote_lock);
|
|
pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
|
|
adhost->adh_remoteaddr);
|
|
if (sender_connect() == 0) {
|
|
pjdlog_info("Successfully reconnected to %s.",
|
|
adhost->adh_remoteaddr);
|
|
} else {
|
|
pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
|
|
adhost->adh_remoteaddr);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Thread guards remote connections and reconnects when needed, handles
|
|
* signals, etc.
|
|
*/
|
|
static void *
|
|
guard_thread(void *arg __unused)
|
|
{
|
|
struct timespec timeout;
|
|
time_t lastcheck, now;
|
|
sigset_t mask;
|
|
int signo;
|
|
|
|
lastcheck = time(NULL);
|
|
|
|
PJDLOG_VERIFY(sigemptyset(&mask) == 0);
|
|
PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
|
|
PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
|
|
|
|
timeout.tv_sec = ADIST_KEEPALIVE;
|
|
timeout.tv_nsec = 0;
|
|
signo = -1;
|
|
|
|
for (;;) {
|
|
switch (signo) {
|
|
case SIGINT:
|
|
case SIGTERM:
|
|
sigexit_received = true;
|
|
pjdlog_exitx(EX_OK,
|
|
"Termination signal received, exiting.");
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
pjdlog_debug(3, "remote_guard: Checking connections.");
|
|
now = time(NULL);
|
|
if (lastcheck + ADIST_KEEPALIVE <= now) {
|
|
guard_check_connection();
|
|
lastcheck = now;
|
|
}
|
|
signo = sigtimedwait(&mask, NULL, &timeout);
|
|
}
|
|
/* NOTREACHED */
|
|
return (NULL);
|
|
}
|
|
|
|
void
|
|
adist_sender(struct adist_config *config, struct adist_host *adh)
|
|
{
|
|
pthread_t td;
|
|
pid_t pid;
|
|
int error, mode, debuglevel;
|
|
|
|
/*
|
|
* Create communication channel for sending connection requests from
|
|
* child to parent.
|
|
*/
|
|
if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
|
|
pjdlog_errno(LOG_ERR,
|
|
"Unable to create connection sockets between child and parent");
|
|
return;
|
|
}
|
|
|
|
pid = fork();
|
|
if (pid == -1) {
|
|
pjdlog_errno(LOG_ERR, "Unable to fork");
|
|
proto_close(adh->adh_conn);
|
|
adh->adh_conn = NULL;
|
|
return;
|
|
}
|
|
|
|
if (pid > 0) {
|
|
/* This is parent. */
|
|
adh->adh_worker_pid = pid;
|
|
/* Declare that we are receiver. */
|
|
proto_recv(adh->adh_conn, NULL, 0);
|
|
return;
|
|
}
|
|
|
|
adcfg = config;
|
|
adhost = adh;
|
|
|
|
mode = pjdlog_mode_get();
|
|
debuglevel = pjdlog_debug_get();
|
|
|
|
/* Declare that we are sender. */
|
|
proto_send(adhost->adh_conn, NULL, 0);
|
|
|
|
descriptors_cleanup(adhost);
|
|
|
|
#ifdef TODO
|
|
descriptors_assert(adhost, mode);
|
|
#endif
|
|
|
|
pjdlog_init(mode);
|
|
pjdlog_debug_set(debuglevel);
|
|
pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
|
|
role2str(adhost->adh_role));
|
|
#ifdef HAVE_SETPROCTITLE
|
|
setproctitle("[%s] (%s) ", adhost->adh_name,
|
|
role2str(adhost->adh_role));
|
|
#endif
|
|
|
|
/*
|
|
* The sender process should be able to remove entries from its
|
|
* trail directory, but it should not be able to write to the
|
|
* trail files, only read from them.
|
|
*/
|
|
adist_trail = trail_new(adhost->adh_directory, false);
|
|
if (adist_trail == NULL)
|
|
exit(EX_OSFILE);
|
|
|
|
if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
|
|
role2str(adhost->adh_role), adhost->adh_name) != 0) {
|
|
exit(EX_CONFIG);
|
|
}
|
|
pjdlog_info("Privileges successfully dropped.");
|
|
|
|
/*
|
|
* We can ignore wait_for_dir_init() failures. It will fall back to
|
|
* using sleep(3).
|
|
*/
|
|
(void)wait_for_dir_init(trail_dirfd(adist_trail));
|
|
|
|
init_environment();
|
|
if (sender_connect() == 0) {
|
|
pjdlog_info("Successfully connected to %s.",
|
|
adhost->adh_remoteaddr);
|
|
}
|
|
adhost->adh_reset = true;
|
|
|
|
/*
|
|
* Create the guard thread first, so we can handle signals from the
|
|
* very begining.
|
|
*/
|
|
error = pthread_create(&td, NULL, guard_thread, NULL);
|
|
PJDLOG_ASSERT(error == 0);
|
|
error = pthread_create(&td, NULL, send_thread, NULL);
|
|
PJDLOG_ASSERT(error == 0);
|
|
error = pthread_create(&td, NULL, recv_thread, NULL);
|
|
PJDLOG_ASSERT(error == 0);
|
|
(void)read_thread(NULL);
|
|
}
|