Xen 3.2 now interleaves watch events with regular message notifications.
More graciously handle processing messages and watch events inline prior to threads being up and running. MFC after: 1 month
This commit is contained in:
parent
03a5241ea0
commit
b1f7c9438e
@ -114,6 +114,7 @@ int xb_write(const void *tdata, unsigned len)
|
||||
while (len != 0) {
|
||||
void *dst;
|
||||
unsigned int avail;
|
||||
|
||||
wait_event_interruptible(&xb_waitq,
|
||||
(intf->req_prod - intf->req_cons) !=
|
||||
XENSTORE_RING_SIZE);
|
||||
|
@ -38,10 +38,14 @@ int xb_write(const void *data, unsigned len);
|
||||
int xb_read(void *data, unsigned len);
|
||||
int xs_input_avail(void);
|
||||
extern int xb_waitq;
|
||||
extern int xenbus_running;
|
||||
|
||||
#define __wait_event_interruptible(wchan, condition, ret) \
|
||||
do { \
|
||||
for (;;) { \
|
||||
if (xenbus_running == 0) { \
|
||||
break; \
|
||||
} \
|
||||
if (condition) \
|
||||
break; \
|
||||
if ((ret = !tsleep(wchan, PWAIT | PCATCH, "waitev", hz/10))) \
|
||||
@ -96,9 +100,6 @@ do { \
|
||||
#define BUG_ON PANIC_IF
|
||||
#define semaphore sema
|
||||
#define rw_semaphore sema
|
||||
typedef struct mtx spinlock_t;
|
||||
#define spin_lock mtx_lock
|
||||
#define spin_unlock mtx_unlock
|
||||
#define DEFINE_SPINLOCK(lock) struct mtx lock
|
||||
#define DECLARE_MUTEX(lock) struct sema lock
|
||||
#define u32 uint32_t
|
||||
|
@ -58,8 +58,6 @@ __FBSDID("$FreeBSD$");
|
||||
#define BUG_ON PANIC_IF
|
||||
#define semaphore sema
|
||||
#define rw_semaphore sema
|
||||
#define spin_lock mtx_lock
|
||||
#define spin_unlock mtx_unlock
|
||||
#define DEFINE_SPINLOCK(lock) struct mtx lock
|
||||
#define DECLARE_MUTEX(lock) struct sema lock
|
||||
#define u32 uint32_t
|
||||
|
@ -1048,8 +1048,9 @@ xenbus_probe_sysinit(void *unused)
|
||||
/* Enumerate devices in xenstore. */
|
||||
xenbus_probe_devices(&xenbus_frontend);
|
||||
register_xenbus_watch(&fe_watch);
|
||||
#ifdef notyet
|
||||
xenbus_backend_probe_and_watch();
|
||||
|
||||
#endif
|
||||
|
||||
/* Notify others that xenstore is up */
|
||||
EVENTHANDLER_INVOKE(xenstore_event);
|
||||
|
@ -67,8 +67,6 @@ __FBSDID("$FreeBSD$");
|
||||
#define BUG_ON PANIC_IF
|
||||
#define semaphore sema
|
||||
#define rw_semaphore sema
|
||||
#define spin_lock mtx_lock
|
||||
#define spin_unlock mtx_unlock
|
||||
#define DEFINE_SPINLOCK(lock) struct mtx lock
|
||||
#define DECLARE_MUTEX(lock) struct sema lock
|
||||
#define u32 uint32_t
|
||||
@ -76,7 +74,6 @@ __FBSDID("$FreeBSD$");
|
||||
#define simple_strtoul strtoul
|
||||
#define ARRAY_SIZE(x) (sizeof(x)/sizeof(x[0]))
|
||||
#define list_empty TAILQ_EMPTY
|
||||
#define wake_up wakeup
|
||||
|
||||
extern struct xendev_list_head xenbus_device_backend_list;
|
||||
#if 0
|
||||
|
@ -57,13 +57,11 @@ __FBSDID("$FreeBSD$");
|
||||
#include <machine/stdarg.h>
|
||||
|
||||
#include <xen/xenbus/xenbus_comms.h>
|
||||
int xs_process_msg(void);
|
||||
static int xs_process_msg(enum xsd_sockmsg_type *type);
|
||||
|
||||
#define kmalloc(size, unused) malloc(size, M_DEVBUF, M_WAITOK)
|
||||
#define BUG_ON PANIC_IF
|
||||
#define DEFINE_SPINLOCK(lock) struct mtx lock
|
||||
#define spin_lock mtx_lock
|
||||
#define spin_unlock mtx_unlock
|
||||
#define u32 uint32_t
|
||||
#define list_del(head, ent) TAILQ_REMOVE(head, ent, list)
|
||||
#define simple_strtoul strtoul
|
||||
@ -71,6 +69,8 @@ int xs_process_msg(void);
|
||||
#define list_empty TAILQ_EMPTY
|
||||
|
||||
#define streq(a, b) (strcmp((a), (b)) == 0)
|
||||
int xenwatch_running = 0;
|
||||
int xenbus_running = 0;
|
||||
|
||||
struct kvec {
|
||||
const void *iov_base;
|
||||
@ -100,7 +100,7 @@ struct xs_stored_msg {
|
||||
struct xs_handle {
|
||||
/* A list of replies. Currently only one will ever be outstanding. */
|
||||
TAILQ_HEAD(xs_handle_list, xs_stored_msg) reply_list;
|
||||
spinlock_t reply_lock;
|
||||
struct mtx reply_lock;
|
||||
int reply_waitq;
|
||||
|
||||
/* One request at a time. */
|
||||
@ -154,7 +154,7 @@ static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
|
||||
/*
|
||||
* Give other domain time to run :-/
|
||||
*/
|
||||
for (i = 0; i < 10000; i++)
|
||||
for (i = 0; i < 100000; i++)
|
||||
HYPERVISOR_yield();
|
||||
xs_process_msg();
|
||||
}
|
||||
@ -249,11 +249,14 @@ static void *xs_talkv(struct xenbus_transaction t,
|
||||
for (i = 0; i < num_vecs; i++)
|
||||
msg.len += iovec[i].iov_len;
|
||||
|
||||
printf("xs_talkv ");
|
||||
|
||||
sx_xlock(&xs_state.request_mutex);
|
||||
|
||||
err = xb_write(&msg, sizeof(msg));
|
||||
if (err) {
|
||||
sx_xunlock(&xs_state.request_mutex);
|
||||
printf("xs_talkv failed %d\n", err);
|
||||
return ERR_PTR(err);
|
||||
}
|
||||
|
||||
@ -261,6 +264,7 @@ static void *xs_talkv(struct xenbus_transaction t,
|
||||
err = xb_write(iovec[i].iov_base, iovec[i].iov_len);;
|
||||
if (err) {
|
||||
sx_xunlock(&xs_state.request_mutex);
|
||||
printf("xs_talkv failed %d\n", err);
|
||||
return ERR_PTR(err);
|
||||
}
|
||||
}
|
||||
@ -277,8 +281,19 @@ static void *xs_talkv(struct xenbus_transaction t,
|
||||
kfree(ret);
|
||||
return ERR_PTR(-err);
|
||||
}
|
||||
|
||||
BUG_ON(msg.type != type);
|
||||
|
||||
if (xenwatch_running == 0) {
|
||||
while (!TAILQ_EMPTY(&watch_events)) {
|
||||
struct xs_stored_msg *wmsg = TAILQ_FIRST(&watch_events);
|
||||
list_del(&watch_events, wmsg);
|
||||
wmsg->u.watch.handle->callback(
|
||||
wmsg->u.watch.handle,
|
||||
(const char **)wmsg->u.watch.vec,
|
||||
wmsg->u.watch.vec_size);
|
||||
}
|
||||
}
|
||||
BUG_ON(msg.type != type);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -290,6 +305,7 @@ static void *xs_single(struct xenbus_transaction t,
|
||||
{
|
||||
struct kvec iovec;
|
||||
|
||||
printf("xs_single %s ", string);
|
||||
iovec.iov_base = (const void *)string;
|
||||
iovec.iov_len = strlen(string) + 1;
|
||||
return xs_talkv(t, type, &iovec, 1, len);
|
||||
@ -339,7 +355,7 @@ static char **split(char *strings, unsigned int len, unsigned int *num)
|
||||
char *p, **ret;
|
||||
|
||||
/* Count the strings. */
|
||||
*num = count_strings(strings, len);
|
||||
*num = count_strings(strings, len) + 1;
|
||||
|
||||
/* Transfer to one big alloc for easy freeing. */
|
||||
ret = kmalloc(*num * sizeof(char *) + len, GFP_KERNEL);
|
||||
@ -354,6 +370,8 @@ static char **split(char *strings, unsigned int len, unsigned int *num)
|
||||
for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
|
||||
ret[(*num)++] = p;
|
||||
|
||||
ret[*num] = strings + len;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -361,7 +379,7 @@ char **xenbus_directory(struct xenbus_transaction t,
|
||||
const char *dir, const char *node, unsigned int *num)
|
||||
{
|
||||
char *strings, *path;
|
||||
unsigned int len;
|
||||
unsigned int len = 0;
|
||||
|
||||
path = join(dir, node);
|
||||
if (IS_ERR(path))
|
||||
@ -405,6 +423,7 @@ void *xenbus_read(struct xenbus_transaction t,
|
||||
if (IS_ERR(path))
|
||||
return (void *)path;
|
||||
|
||||
printf("xs_read ");
|
||||
ret = xs_single(t, XS_READ, path, len);
|
||||
kfree(path);
|
||||
return ret;
|
||||
@ -430,6 +449,7 @@ int xenbus_write(struct xenbus_transaction t,
|
||||
iovec[1].iov_base = string;
|
||||
iovec[1].iov_len = strlen(string);
|
||||
|
||||
printf("xenbus_write dir=%s val=%s ", dir, string);
|
||||
ret = xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
|
||||
kfree(path);
|
||||
return ret;
|
||||
@ -504,6 +524,7 @@ int xenbus_transaction_end(struct xenbus_transaction t, int abort)
|
||||
else
|
||||
strcpy(abortstr, "T");
|
||||
|
||||
printf("xenbus_transaction_end ");
|
||||
err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
|
||||
|
||||
up_read(&xs_state.suspend_mutex);
|
||||
@ -641,18 +662,18 @@ int register_xenbus_watch(struct xenbus_watch *watch)
|
||||
|
||||
down_read(&xs_state.suspend_mutex);
|
||||
|
||||
spin_lock(&watches_lock);
|
||||
mtx_lock(&watches_lock);
|
||||
BUG_ON(find_watch(token) != NULL);
|
||||
LIST_INSERT_HEAD(&watches, watch, list);
|
||||
spin_unlock(&watches_lock);
|
||||
mtx_unlock(&watches_lock);
|
||||
|
||||
err = xs_watch(watch->node, token);
|
||||
|
||||
/* Ignore errors due to multiple registration. */
|
||||
if ((err != 0) && (err != -EEXIST)) {
|
||||
spin_lock(&watches_lock);
|
||||
mtx_lock(&watches_lock);
|
||||
LIST_REMOVE(watch, list);
|
||||
spin_unlock(&watches_lock);
|
||||
mtx_unlock(&watches_lock);
|
||||
}
|
||||
|
||||
up_read(&xs_state.suspend_mutex);
|
||||
@ -671,10 +692,10 @@ void unregister_xenbus_watch(struct xenbus_watch *watch)
|
||||
|
||||
down_read(&xs_state.suspend_mutex);
|
||||
|
||||
spin_lock(&watches_lock);
|
||||
mtx_lock(&watches_lock);
|
||||
BUG_ON(!find_watch(token));
|
||||
LIST_REMOVE(watch, list);
|
||||
spin_unlock(&watches_lock);
|
||||
mtx_unlock(&watches_lock);
|
||||
|
||||
err = xs_unwatch(watch->node, token);
|
||||
if (err)
|
||||
@ -684,7 +705,7 @@ void unregister_xenbus_watch(struct xenbus_watch *watch)
|
||||
up_read(&xs_state.suspend_mutex);
|
||||
|
||||
/* Cancel pending watch events. */
|
||||
spin_lock(&watch_events_lock);
|
||||
mtx_lock(&watch_events_lock);
|
||||
TAILQ_FOREACH_SAFE(msg, &watch_events, list, tmp) {
|
||||
if (msg->u.watch.handle != watch)
|
||||
continue;
|
||||
@ -692,7 +713,7 @@ void unregister_xenbus_watch(struct xenbus_watch *watch)
|
||||
kfree(msg->u.watch.vec);
|
||||
kfree(msg);
|
||||
}
|
||||
spin_unlock(&watch_events_lock);
|
||||
mtx_unlock(&watch_events_lock);
|
||||
|
||||
/* Flush any currently-executing callback, unless we are it. :-) */
|
||||
if (curproc->p_pid != xenwatch_pid) {
|
||||
@ -728,17 +749,19 @@ static void xenwatch_thread(void *unused)
|
||||
{
|
||||
struct xs_stored_msg *msg;
|
||||
|
||||
xenwatch_running = 1;
|
||||
for (;;) {
|
||||
wait_event_interruptible(&watch_events_waitq,
|
||||
!list_empty(&watch_events));
|
||||
|
||||
while (list_empty(&watch_events))
|
||||
pause("xenwatch", hz/10);
|
||||
|
||||
sx_xlock(&xenwatch_mutex);
|
||||
|
||||
spin_lock(&watch_events_lock);
|
||||
mtx_lock(&watch_events_lock);
|
||||
msg = TAILQ_FIRST(&watch_events);
|
||||
if (msg)
|
||||
list_del(&watch_events, msg);
|
||||
spin_unlock(&watch_events_lock);
|
||||
mtx_unlock(&watch_events_lock);
|
||||
|
||||
if (msg != NULL) {
|
||||
|
||||
@ -754,7 +777,7 @@ static void xenwatch_thread(void *unused)
|
||||
}
|
||||
}
|
||||
|
||||
int xs_process_msg(void)
|
||||
static int xs_process_msg(enum xsd_sockmsg_type *type)
|
||||
{
|
||||
struct xs_stored_msg *msg;
|
||||
char *body;
|
||||
@ -783,7 +806,8 @@ int xs_process_msg(void)
|
||||
return err;
|
||||
}
|
||||
body[msg->hdr.len] = '\0';
|
||||
|
||||
|
||||
*type = msg->hdr.type;
|
||||
if (msg->hdr.type == XS_WATCH_EVENT) {
|
||||
msg->u.watch.vec = split(body, msg->hdr.len,
|
||||
&msg->u.watch.vec_size);
|
||||
@ -792,26 +816,31 @@ int xs_process_msg(void)
|
||||
return PTR_ERR(msg->u.watch.vec);
|
||||
}
|
||||
|
||||
spin_lock(&watches_lock);
|
||||
mtx_lock(&watches_lock);
|
||||
msg->u.watch.handle = find_watch(
|
||||
msg->u.watch.vec[XS_WATCH_TOKEN]);
|
||||
if (msg->u.watch.handle != NULL) {
|
||||
spin_lock(&watch_events_lock);
|
||||
mtx_lock(&watch_events_lock);
|
||||
TAILQ_INSERT_TAIL(&watch_events, msg, list);
|
||||
wakeup(&watch_events_waitq);
|
||||
spin_unlock(&watch_events_lock);
|
||||
if (xenwatch_running)
|
||||
wakeup(&watch_events_waitq);
|
||||
mtx_unlock(&watch_events_lock);
|
||||
} else {
|
||||
kfree(msg->u.watch.vec);
|
||||
kfree(msg);
|
||||
}
|
||||
spin_unlock(&watches_lock);
|
||||
mtx_unlock(&watches_lock);
|
||||
} else {
|
||||
printf("event=%d ", *type);
|
||||
msg->u.reply.body = body;
|
||||
spin_lock(&xs_state.reply_lock);
|
||||
mtx_lock(&xs_state.reply_lock);
|
||||
TAILQ_INSERT_TAIL(&xs_state.reply_list, msg, list);
|
||||
spin_unlock(&xs_state.reply_lock);
|
||||
wakeup(&xs_state.reply_waitq);
|
||||
mtx_unlock(&xs_state.reply_lock);
|
||||
if (xenbus_running)
|
||||
wakeup(&xs_state.reply_waitq);
|
||||
}
|
||||
if (*type == XS_WATCH_EVENT)
|
||||
printf("\n");
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -819,12 +848,17 @@ int xs_process_msg(void)
|
||||
static void xenbus_thread(void *unused)
|
||||
{
|
||||
int err;
|
||||
enum xsd_sockmsg_type type;
|
||||
|
||||
xenbus_running = 1;
|
||||
pause("xenbus", hz/10);
|
||||
|
||||
for (;;) {
|
||||
err = xs_process_msg();
|
||||
if (err)
|
||||
err = xs_process_msg(&type);
|
||||
if (err)
|
||||
printf("XENBUS error %d while reading "
|
||||
"message\n", err);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -835,10 +869,13 @@ int xs_init(void)
|
||||
|
||||
TAILQ_INIT(&xs_state.reply_list);
|
||||
TAILQ_INIT(&watch_events);
|
||||
mtx_init(&xs_state.reply_lock, "state reply", NULL, MTX_DEF);
|
||||
sema_init(&xs_state.suspend_mutex, 1, "xenstore suspend");
|
||||
sx_init(&xenwatch_mutex, "xenwatch");
|
||||
|
||||
|
||||
mtx_init(&xs_state.reply_lock, "state reply", NULL, MTX_DEF);
|
||||
sx_init(&xs_state.request_mutex, "xenstore request");
|
||||
sema_init(&xs_state.suspend_mutex, 1, "xenstore suspend");
|
||||
|
||||
|
||||
#if 0
|
||||
mtx_init(&xs_state.suspend_mutex, "xenstore suspend", NULL, MTX_DEF);
|
||||
|
Loading…
x
Reference in New Issue
Block a user