add io abstraction layer

This commit is contained in:
oscar 2023-01-17 16:27:40 -05:00
parent 5d1fae3c4a
commit e574269457
6 changed files with 79 additions and 23 deletions

1
.gitignore vendored
View File

@ -2,6 +2,7 @@
# Prerequisites
*.d
build
.cache
# Object files
*.o

44
bsock.c
View File

@ -3,8 +3,43 @@
#include "bsock.h"
#include "ringbuf.h"
static ssize_t
bsock_posix_io_read(void * ctx, void *buf, size_t nbytes)
{
return read((int)(uintptr_t)ctx, buf, nbytes);
}
static ssize_t
bsock_posix_io_write(void * ctx, void *buf, size_t nbytes)
{
return write((int)(uintptr_t)ctx, buf, nbytes);
}
static ssize_t
bsock_posix_io_readv(void * ctx, const struct iovec *iovec, int nvec)
{
return readv((int)(uintptr_t)ctx, iovec, nvec);
}
static ssize_t
bsock_posix_io_writev(void * ctx, const struct iovec *iovec, int nvec)
{
return writev((int)(uintptr_t)ctx, iovec, nvec);
}
struct bsock_ringbuf_io
bsock_io_posix()
{
struct bsock_ringbuf_io ret;
ret.read = &bsock_posix_io_read;
ret.write = &bsock_posix_io_write;
ret.readv = &bsock_posix_io_readv;
ret.writev = &bsock_posix_io_writev;
return ret;
}
struct bsock *
bsock_create(int fd, size_t rbuf_sz, size_t wbuf_sz)
bsock_create(void * io_ctx, struct bsock_ringbuf_io *io, size_t rbuf_sz, size_t wbuf_sz)
{
struct bsock *bsock = malloc(sizeof(struct bsock));
if (bsock == NULL) {
@ -24,7 +59,7 @@ bsock_create(int fd, size_t rbuf_sz, size_t wbuf_sz)
}
bsock_ringbuf_init(&wbuf, buf, wbuf_sz);
bsock_init(bsock, fd, &rbuf, &wbuf);
bsock_init(bsock, io_ctx, io, &rbuf, &wbuf);
return bsock;
fail:
@ -69,14 +104,13 @@ bsock_peek(struct bsock *bsock, char *buf, size_t len)
int
bsock_flush(struct bsock *bsock)
{
return bsock_ringbuf_flush_fd(&bsock->wbuf, bsock->fd, bsock_ringbuf_size(&bsock->wbuf));
return bsock_ringbuf_flush(&bsock->wbuf, bsock->io_ctx, &bsock->io, bsock_ringbuf_size(&bsock->wbuf));
}
int
bsock_poll(struct bsock *bsock)
{
return bsock_ringbuf_poll_fd(&bsock->rbuf, bsock->fd,
bsock_ringbuf_free_size(&bsock->rbuf));
return bsock_ringbuf_poll(&bsock->rbuf, bsock->io_ctx, &bsock->io, bsock_ringbuf_free_size(&bsock->rbuf));
}
int

13
bsock.h
View File

@ -9,19 +9,23 @@ extern "C" {
#endif
struct bsock {
int fd;
struct bsock_ringbuf rbuf;
struct bsock_ringbuf wbuf;
struct bsock_ringbuf_io io;
void * io_ctx;
};
struct bsock *bsock_create(int sockfd, size_t rbuf_sz, size_t wbuf_sz);
struct bsock_ringbuf_io bsock_io_posix();
struct bsock *bsock_create(void * io_ctx, struct bsock_ringbuf_io *io, size_t rbuf_sz, size_t wbuf_sz);
void bsock_free(struct bsock *bsock);
static inline void
bsock_init(struct bsock *bsock, int fd, struct bsock_ringbuf *rbuf, struct bsock_ringbuf *wbuf)
bsock_init(struct bsock *bsock, void * io_ctx, struct bsock_ringbuf_io *io, struct bsock_ringbuf *rbuf, struct bsock_ringbuf *wbuf)
{
bsock->fd = fd;
bsock->io_ctx = io_ctx;
memcpy(&bsock->io, io, sizeof(struct bsock_ringbuf_io));
memcpy(&bsock->rbuf, rbuf, sizeof(struct bsock_ringbuf));
memcpy(&bsock->wbuf, wbuf, sizeof(struct bsock_ringbuf));
}
@ -73,6 +77,7 @@ int bsock_poll(struct bsock *sock);
*/
int bsock_flush(struct bsock *sock);
#ifdef __cplusplus
}
#endif

View File

@ -99,7 +99,7 @@ bsock_ringbuf_write(struct bsock_ringbuf *rb, char *buf, size_t len)
}
int
bsock_ringbuf_poll_fd(struct bsock_ringbuf *rb, int fd, size_t len)
bsock_ringbuf_poll(struct bsock_ringbuf *rb, void * ctx, struct bsock_ringbuf_io * io, size_t len)
{
size_t free_sz = rb->max_sz - rb->sz;
if (len > free_sz) {
@ -119,12 +119,12 @@ bsock_ringbuf_poll_fd(struct bsock_ringbuf *rb, int fd, size_t len)
iov[1].iov_base = rb->buf;
iov[1].iov_len = len - rem_size;
ret = readv(fd, iov, 2);
ret = io->readv(ctx, iov, 2);
} else {
ret = read(fd, rb->end, len);
ret = io->read(ctx, rb->end, len);
}
} else {
ret = read(fd, rb->end, len);
ret = io->read(ctx, rb->end, len);
}
if (ret > 0) {
@ -135,7 +135,7 @@ bsock_ringbuf_poll_fd(struct bsock_ringbuf *rb, int fd, size_t len)
}
int
bsock_ringbuf_flush_fd(struct bsock_ringbuf *rb, int fd, size_t len)
bsock_ringbuf_flush(struct bsock_ringbuf *rb, void * ctx, struct bsock_ringbuf_io * io, size_t len)
{
if (len > rb->sz) {
errno = EOVERFLOW;
@ -145,7 +145,7 @@ bsock_ringbuf_flush_fd(struct bsock_ringbuf *rb, int fd, size_t len)
int ret = 0;
if (rb->end > rb->start) {
// no wrapping
ret = write(fd, rb->start, len);
ret = io->write(ctx, rb->start, len);
} else {
// there is wrapping
size_t rem_size = rb->buf + rb->max_sz - rb->start;
@ -156,9 +156,9 @@ bsock_ringbuf_flush_fd(struct bsock_ringbuf *rb, int fd, size_t len)
iov[1].iov_base = rb->buf;
iov[1].iov_len = len - rem_size;
ret = writev(fd, iov, 2);
ret = io->writev(ctx, iov, 2);
} else {
ret = write(fd, rb->start, len);
ret = io->write(ctx, rb->start, len);
}
}

View File

@ -5,6 +5,17 @@
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/uio.h>
typedef ssize_t (*bsock_ringbuf_io_fn) (void * ctx, void * buf, size_t nbytes);
typedef ssize_t (*bsock_ringbuf_iov_fn) (void * ctx, const struct iovec *iov, int iovcnt);
struct bsock_ringbuf_io {
bsock_ringbuf_io_fn read;
bsock_ringbuf_io_fn write;
bsock_ringbuf_iov_fn readv;
bsock_ringbuf_iov_fn writev;
};
struct bsock_ringbuf {
char *buf;
@ -14,6 +25,7 @@ struct bsock_ringbuf {
size_t max_sz;
};
static inline size_t
bsock_ringbuf_size(struct bsock_ringbuf *rb)
{
@ -68,11 +80,11 @@ int bsock_ringbuf_write(struct bsock_ringbuf *rb, char *buf, size_t len);
* success: # of bytes written
* failure: -1 + errno
*/
int bsock_ringbuf_poll_fd(struct bsock_ringbuf *rb, int fd, size_t len);
int bsock_ringbuf_poll(struct bsock_ringbuf *rb, void * ctx, struct bsock_ringbuf_io *io, size_t len);
/**
* returns:
* success: # of bytes written
* failure: -1 + errno
*/
int bsock_ringbuf_flush_fd(struct bsock_ringbuf *rb, int fd, size_t len);
int bsock_ringbuf_flush(struct bsock_ringbuf *rb, void * ctx, struct bsock_ringbuf_io *io, size_t len);

12
test.c
View File

@ -59,6 +59,7 @@ void
ringbuf_test(void)
{
struct bsock_ringbuf rbuf;
struct bsock_ringbuf_io io = bsock_io_posix();
char *buf = malloc(1024);
char *sbuf = malloc(1024);
for (int i = 0; i < 1024; i++) {
@ -120,9 +121,9 @@ ringbuf_test(void)
assert(pipe(pipes) == 0);
assert(write(pipes[0], sbuf, 512) == 512);
bsock_ringbuf_init(&rbuf, buf, 1024);
assert(bsock_ringbuf_poll_fd(&rbuf, pipes[1], 566) == 512);
assert(bsock_ringbuf_poll(&rbuf, (void *)(uintptr_t)pipes[1], &io, 566) == 512);
ringbuf_verify(&rbuf, buf, 512);
assert(bsock_ringbuf_flush_fd(&rbuf, pipes[1], 500) == 500);
assert(bsock_ringbuf_flush(&rbuf, (void *)(uintptr_t)pipes[1], &io, 500) == 500);
ringbuf_verify(&rbuf, buf + 500, 12);
assert(read(pipes[0], buf3, 500) == 500);
assert(memcmp(sbuf, buf3, 500) == 0);
@ -132,7 +133,7 @@ ringbuf_test(void)
assert(write(pipes[0], sbuf, 512) == 512);
bsock_ringbuf_init(&rbuf, buf, 1024);
rbuf_set(&rbuf, buf + 768, 0);
assert(bsock_ringbuf_poll_fd(&rbuf, pipes[1], 512) == 512);
assert(bsock_ringbuf_poll(&rbuf, (void *)(uintptr_t)pipes[1], &io, 512) == 512);
ringbuf_verify(&rbuf, buf + 768, 512);
assert_read(&rbuf, buf3, 512);
assert(memcmp(sbuf, buf3, 512) == 0);
@ -143,7 +144,7 @@ ringbuf_test(void)
bsock_ringbuf_init(&rbuf, buf, 1024);
rbuf_set(&rbuf, buf + 768, 0);
assert_write(&rbuf, sbuf, 512);
assert(bsock_ringbuf_flush_fd(&rbuf, pipes[1], 512) == 512);
assert(bsock_ringbuf_flush(&rbuf, (void *)(uintptr_t)pipes[1], &io, 512) == 512);
ringbuf_verify(&rbuf, buf + 256, 0);
assert(read(pipes[0], buf3, 512) == 512);
assert(memcmp(sbuf, buf3, 512) == 0);
@ -152,6 +153,9 @@ ringbuf_test(void)
int
main(void)
{
printf("Starting tests...\n");
ringbuf_test();
printf("Tests done!\n");
return 0;
}