New control socket design now works. Now working on finishing up the reporting.

This commit is contained in:
sethdelliott 2010-06-23 19:13:37 +00:00
parent ba2672a209
commit 465b565c50
7 changed files with 390 additions and 652 deletions

View File

@ -65,6 +65,7 @@ struct iperf_stream
int local_port;
int remote_port;
int socket;
int id;
/* XXX: is settings just a pointer to the same struct in iperf_test? if not,
should it be? */
struct iperf_settings *settings; /* pointer to structure settings */
@ -118,6 +119,7 @@ struct iperf_test
/* boolen variables for Options */
int reverse; /* -R option */
int daemon; /* -D option */
int no_delay; /* -N option */
int print_mss; /* -m option */
@ -171,6 +173,7 @@ struct param_exchange
int protocol;
int stream_id;
int num_streams;
int reverse;
int blksize;
int recv_window;
int send_window;
@ -205,6 +208,8 @@ enum
PARAM_EXCHANGE = 10,
PARAM_EXCHANGE_ACK = 11,
CREATE_STREAMS = 12,
SERVER_TERMINATE = 13,
CLIENT_TERMINATE = 14,
ACCESS_DENIED = -1,
};

View File

@ -39,7 +39,7 @@
#include "uuid.h"
#include "locale.h"
jmp_buf env; /* to handle longjmp on signal */
jmp_buf env; /* to handle longjmp on signal */
/*************************************************************/
@ -77,6 +77,146 @@ all_data_sent(struct iperf_test * test)
}
int
iperf_send(struct iperf_test *test)
{
int result;
char *prot;
fd_set temp_write_set;
struct timeval tv;
int64_t delayus, adjustus, dtargus;
struct iperf_stream *sp;
static struct timer *timer, *stats_interval, *reporter_interval;
if (test->state == TEST_START) {
timer = stats_interval = reporter_interval = NULL;
// Not sure what the following code does yet. It's UDP, so I'll get to fixing it eventually
if (test->protocol == Pudp) {
prot = "UDP";
dtargus = (int64_t) (test->default_settings->blksize) * SEC_TO_US * 8;
dtargus /= test->default_settings->rate;
assert(dtargus != 0);
delayus = dtargus;
adjustus = 0;
printf("iperf_run_client: adjustus: %lld, delayus %lld \n", adjustus, delayus);
for (sp = test->streams; sp != NULL; sp = sp->next)
sp->send_timer = new_timer(0, dtargus);
} else {
prot = "TCP";
}
/* if -n specified, set zero timer */
// Set timers and print usage message
if (test->default_settings->bytes == 0) {
timer = new_timer(test->duration, 0);
printf("Starting Test: protocol: %s, %d streams, %d byte blocks, %d second test \n",
prot, test->num_streams, test->default_settings->blksize, test->duration);
} else {
timer = new_timer(0, 0);
printf("Starting Test: protocol: %s, %d streams, %d byte blocks, %d bytes to send\n",
prot, test->num_streams, test->default_settings->blksize, (int) test->default_settings->bytes);
}
if (test->stats_interval != 0)
stats_interval = new_timer(test->stats_interval, 0);
if (test->reporter_interval != 0)
reporter_interval = new_timer(test->reporter_interval, 0);
test->state = TEST_RUNNING;
if (write(test->ctrl_sck, &test->state, sizeof(int)) < 0) {
perror("write TEST_RUNNING");
exit(1);
}
} else if (test->state == TEST_RUNNING) {
FD_COPY(&test->write_set, &temp_write_set);
tv.tv_sec = 15;
tv.tv_usec = 0;
result = select(test->max_fd + 1, NULL, &temp_write_set, NULL, &tv);
if (result < 0 && errno != EINTR) {
perror("select iperf_send");
return -1;
}
if (result > 0) {
if (!all_data_sent(test) && !timer->expired(timer)) {
for (sp = test->streams; sp != NULL; sp = sp->next) {
if (FD_ISSET(sp->socket, &temp_write_set)) {
if (sp->snd(sp) < 0) {
// XXX: Do better error handling
perror("iperf_client_start: snd");
}
FD_CLR(sp->socket, &temp_write_set);
}
}
// Perform callbacks
if ((test->stats_interval != 0) && stats_interval->expired(stats_interval)) {
test->stats_callback(test);
update_timer(stats_interval, test->stats_interval, 0);
}
if ((test->reporter_interval != 0) && reporter_interval->expired(reporter_interval)) {
test->reporter_callback(test);
update_timer(reporter_interval, test->reporter_interval, 0);
}
} else {
free(timer);
free(stats_interval);
free(reporter_interval);
// Send TEST_DONE (ALL_STREAMS_END) message
test->state = ALL_STREAMS_END;
if (write(test->ctrl_sck, &test->state, sizeof(int)) < 0) {
perror("write ALL_STREAMS_END");
return -1;
}
}
}
}
return 0;
}
int
iperf_recv(struct iperf_test *test)
{
int result;
fd_set temp_read_set;
struct timeval tv;
struct iperf_stream *sp;
if (test->state == TEST_RUNNING) {
FD_COPY(&test->read_set, &temp_read_set);
tv.tv_sec = 15;
tv.tv_usec = 0;
result = select(test->max_fd + 1, &temp_read_set, NULL, NULL, &tv);
if (result < 0) {
perror("select iperf_recv");
return -1;
}
if (result > 0) {
for (sp = test->streams; sp != NULL; sp = sp->next) {
if (FD_ISSET(sp->socket, &temp_read_set)) {
if (sp->rcv(sp) < 0) {
// XXX: Do better error handling
perror("sp->rcv(sp)");
}
FD_CLR(sp->socket, &temp_read_set);
}
}
}
}
return 0;
}
/*********************************************************/
/**
@ -100,6 +240,7 @@ iperf_exchange_parameters(struct iperf_test * test)
param.state = PARAM_EXCHANGE;
param.protocol = test->protocol;
param.num_streams = test->num_streams;
param.reverse = test->reverse;
param.blksize = test->default_settings->blksize;
param.recv_window = test->default_settings->socket_bufsize;
param.send_window = test->default_settings->socket_bufsize;
@ -110,14 +251,6 @@ iperf_exchange_parameters(struct iperf_test * test)
return -1;
}
// This code needs to be moved to the server rejection part of the server code
/*
if (result > 0 && sp->buffer[0] == ACCESS_DENIED) {
fprintf(stderr, "Busy server Detected. Try again later. Exiting.\n");
return -1;
}
*/
} else {
if (read(test->ctrl_sck, &param, sizeof(struct param_exchange)) < 0) {
@ -129,12 +262,16 @@ iperf_exchange_parameters(struct iperf_test * test)
strncpy(test->default_settings->cookie, param.cookie, COOKIE_SIZE);
test->protocol = param.protocol;
test->num_streams = param.num_streams;
test->reverse = param.reverse;
test->default_settings->blksize = param.blksize;
test->default_settings->socket_bufsize = param.recv_window;
// need to add support for send_window
// XXX: need to add support for send_window
test->default_settings->unit_format = param.format;
test->prot_listener = netannounce(test->protocol, NULL, 5202);
FD_SET(test->prot_listener, &test->read_set);
FD_SET(test->prot_listener, &test->write_set);
test->max_fd = (test->prot_listener > test->max_fd) ? test->prot_listener : test->max_fd;
// Send the control message to create streams and start the test
test->state = CREATE_STREAMS;
@ -142,6 +279,7 @@ iperf_exchange_parameters(struct iperf_test * test)
perror("write CREATE_STREAMS");
return -1;
}
}
return 0;
@ -162,15 +300,12 @@ add_to_interval_list(struct iperf_stream_result * rp, struct iperf_interval_resu
memcpy(ip, new, sizeof(struct iperf_interval_results));
ip->next = NULL;
if (rp->interval_results == NULL) /* if 1st interval */
{
rp->interval_results = ip;
rp->last_interval_results = ip; /* pointer to last element in list */
} else
{
/* add to end of list */
rp->last_interval_results->next = ip;
rp->last_interval_results = ip;
if (rp->interval_results == NULL) { /* if 1st interval */
rp->interval_results = ip;
rp->last_interval_results = ip; /* pointer to last element in list */
} else { /* add to end of list */
rp->last_interval_results->next = ip;
rp->last_interval_results = ip;
}
}
@ -185,13 +320,12 @@ display_interval_list(struct iperf_stream_result * rp, int tflag)
n = rp->interval_results;
printf("----------------------------------------\n");
while (n!=NULL)
{
gb = (float)n->bytes_transferred / (1024. * 1024. * 1024.);
printf("Interval = %f\tGBytes transferred = %.3f\n", n->interval_duration, gb);
if (tflag)
print_tcpinfo(n);
n = n->next;
while (n != NULL) {
gb = (float) n->bytes_transferred / (1024. * 1024. * 1024.);
printf("Interval = %f\tGBytes transferred = %.3f\n", n->interval_duration, gb);
if (tflag)
print_tcpinfo(n);
n = n->next;
}
}
@ -204,10 +338,10 @@ display_interval_list(struct iperf_stream_result * rp, int tflag)
void
receive_result_from_server(struct iperf_test * test)
{
int result;
int result;
struct iperf_stream *sp;
int size = 0;
char *buf = NULL;
int size = 0;
char *buf = NULL;
printf("in receive_result_from_server \n");
sp = test->streams;
@ -226,9 +360,8 @@ receive_result_from_server(struct iperf_test * test)
/* receive from server */
printf("reading results (size=%d) back from server \n", size);
do
{
result = recv(sp->socket, buf, size, 0);
do {
result = recv(sp->socket, buf, size, 0);
} while (result == -1 && errno == EINTR);
printf("Got size of results from server: %d \n", result);
@ -249,15 +382,15 @@ receive_result_from_server(struct iperf_test * test)
void
connect_msg(struct iperf_stream * sp)
{
char ipl[512], ipr[512];
char ipl[512], ipr[512];
inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) & sp->local_addr)->sin_addr), (void *) ipl, sizeof(ipl));
inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) & sp->remote_addr)->sin_addr), (void *) ipr, sizeof(ipr));
printf("[%3d] local %s port %d connected to %s port %d\n",
sp->socket,
ipl, ntohs(((struct sockaddr_in *) & sp->local_addr)->sin_port),
ipr, ntohs(((struct sockaddr_in *) & sp->remote_addr)->sin_port));
sp->socket,
ipl, ntohs(((struct sockaddr_in *) & sp->local_addr)->sin_port),
ipr, ntohs(((struct sockaddr_in *) & sp->remote_addr)->sin_port));
}
/*************************************************************/
@ -270,21 +403,22 @@ connect_msg(struct iperf_stream * sp)
void
Display(struct iperf_test * test)
{
int count = 1;
struct iperf_stream *n;
n = test->streams;
int count = 1;
printf("===============DISPLAY==================\n");
while (n != NULL)
{
if (test->role == 'c')
printf("position-%d\tsp=%d\tsocket=%d\tMbytes sent=%u\n", count++, (int) n, n->socket, (uint) (n->result->bytes_sent / (float)MB));
else
printf("position-%d\tsp=%d\tsocket=%d\tMbytes received=%u\n", count++, (int) n, n->socket, (uint) (n->result->bytes_received / (float)MB));
n = n->next;
while (n != NULL) {
if (test->role == 'c') {
printf("position-%d\tsp=%d\tsocket=%d\tMbytes sent=%u\n",
count++, (int) n, n->socket, (uint) (n->result->bytes_sent / (float)MB));
} else {
printf("position-%d\tsp=%d\tsocket=%d\tMbytes received=%u\n",
count++, (int) n, n->socket, (uint) (n->result->bytes_received / (float)MB));
}
n = n->next;
}
printf("=================END====================\n");
fflush(stdout);
@ -297,12 +431,10 @@ iperf_new_test()
{
struct iperf_test *testp;
//printf("in iperf_new_test: reinit default settings \n");
testp = (struct iperf_test *) malloc(sizeof(struct iperf_test));
if (!testp)
{
perror("malloc");
return (NULL);
if (!testp) {
perror("malloc");
return (NULL);
}
/* initialise everything to zero */
memset(testp, 0, sizeof(struct iperf_test));
@ -322,6 +454,7 @@ iperf_defaults(struct iperf_test * testp)
testp->role = 's';
testp->duration = DURATION;
testp->server_port = PORT;
testp->ctrl_sck = -1;
testp->new_stream = iperf_new_tcp_stream;
testp->stats_callback = iperf_stats_callback;
@ -355,6 +488,7 @@ iperf_create_streams(struct iperf_test *test)
perror("netdial stream");
return -1;
}
// printf("File descriptor for stream %d: %d\n", i, s);
FD_SET(s, &test->read_set);
FD_SET(s, &test->write_set);
test->max_fd = (test->max_fd < s) ? s : test->max_fd;
@ -362,10 +496,9 @@ iperf_create_streams(struct iperf_test *test)
// XXX: This doesn't fit our API model!
sp = test->new_stream(test);
sp->socket = s;
iperf_init_stream(test, sp);
iperf_add_stream(sp, test);
iperf_init_stream(sp, test);
iperf_add_stream(test, sp);
// XXX: This line probably needs to be replaced
connect_msg(sp);
}
@ -384,6 +517,18 @@ iperf_handle_message_client(struct iperf_test *test)
case CREATE_STREAMS:
iperf_create_streams(test);
break;
case TEST_START:
break;
case TEST_RUNNING:
break;
case TEST_END:
break;
case SERVER_TERMINATE:
fprintf(stderr, "The server has terminated. Exiting...\n");
exit(1);
case ACCESS_DENIED:
fprintf(stderr, "The server is busy running a test. Try again later.\n");
exit(0);
default:
// XXX: This needs to be replaced
printf("How did you get here? test->state = %d\n", test->state);
@ -407,9 +552,13 @@ iperf_connect(struct iperf_test *test)
/* Create and connect the control channel */
test->ctrl_sck = netdial(test->protocol, test->server_hostname, test->server_port);
if (test->ctrl_sck < 0) {
return -1;
}
FD_SET(test->ctrl_sck, &test->read_set);
FD_SET(test->ctrl_sck, &test->write_set);
test->max_fd = (test->ctrl_sck > test->max_fd) ? test->ctrl_sck : test->max_fd;
/* Exchange parameters */
test->state = PARAM_EXCHANGE;
@ -422,29 +571,6 @@ iperf_connect(struct iperf_test *test)
return -1;
}
/* Create and connect the individual streams */
// This code has been moved to iperf_create_streams
/*
for (i = 0; i < test->num_streams; i++) {
s = netdial(test->protocol, test->server_hostname, test->server_port);
if (s < 0) {
// Change to new error handling mode
fprintf(stderr, "error: netdial failed\n");
exit(1);
}
FD_SET(s, &test->write_set);
test->max_fd = (test->max_fd < s) ? s : test->max_fd;
sp = test->new_stream(test);
sp->socket = s;
iperf_init_stream(sp, test);
iperf_add_stream(test, sp);
connect_msg(sp);
}
*/
return 0;
}
@ -689,36 +815,38 @@ iperf_free_stream(struct iperf_stream * sp)
/**************************************************************************/
struct iperf_stream *
iperf_new_stream(struct iperf_test * testp)
iperf_new_stream(struct iperf_test *testp)
{
int i = 0;
int i;
struct iperf_stream *sp;
//printf("in iperf_new_stream \n");
sp = (struct iperf_stream *) malloc(sizeof(struct iperf_stream));
if (!sp)
{
perror("malloc");
return (NULL);
if (!sp) {
perror("malloc");
return (NULL);
}
memset(sp, 0, sizeof(struct iperf_stream));
//printf("iperf_new_stream: Allocating new stream buffer: size = %d \n", testp->default_settings->blksize);
sp->buffer = (char *) malloc(testp->default_settings->blksize);
sp->settings = (struct iperf_settings *) malloc(sizeof(struct iperf_settings));
/* make a per stream copy of default_settings in each stream structure */
memcpy(sp->settings, testp->default_settings, sizeof(struct iperf_settings));
sp->result = (struct iperf_stream_result *) malloc(sizeof(struct iperf_stream_result));
/* Make a per stream copy of default_settings in each stream structure */
// XXX: These settings need to be moved to the test struct
memcpy(sp->settings, testp->default_settings, sizeof(struct iperf_settings));
/* fill in buffer with random stuff */
/* Randomize the buffer */
srandom(time(0));
for (i = 0; i < testp->default_settings->blksize; i++)
sp->buffer[i] = random();
for (i = 0; i < testp->default_settings->blksize; ++i)
sp->buffer[i] = random();
sp->socket = -1;
sp->packet_count = 0;
// XXX: Not entirely sure what this does
sp->stream_id = (int) sp;
/* XXX: None of this commented code is needed since everything is set to zero anyways.
sp->packet_count = 0;
sp->jitter = 0.0;
sp->prev_transit = 0.0;
sp->outoforder_packets = 0;
@ -734,6 +862,7 @@ iperf_new_stream(struct iperf_test * testp)
sp->result->bytes_received_this_interval = 0;
sp->result->bytes_sent_this_interval = 0;
gettimeofday(&sp->result->start_time, NULL);
*/
sp->settings->state = STREAM_BEGIN;
return sp;
@ -747,10 +876,10 @@ iperf_init_stream(struct iperf_stream * sp, struct iperf_test * testp)
len = sizeof(struct sockaddr_in);
if (getsockname(sp->socket, (struct sockaddr *) & sp->local_addr, &len) < 0) {
if (getsockname(sp->socket, (struct sockaddr *) &sp->local_addr, &len) < 0) {
perror("getsockname");
}
if (getpeername(sp->socket, (struct sockaddr *) & sp->remote_addr, &len) < 0) {
if (getpeername(sp->socket, (struct sockaddr *) &sp->remote_addr, &len) < 0) {
perror("getpeername");
}
if (testp->protocol == Ptcp) {
@ -761,6 +890,7 @@ iperf_init_stream(struct iperf_stream * sp, struct iperf_test * testp)
/* set TCP_NODELAY and TCP_MAXSEG if requested */
set_tcp_options(sp->socket, testp->no_delay, testp->default_settings->mss);
}
}
/**************************************************************************/
@ -785,111 +915,6 @@ iperf_add_stream(struct iperf_test * test, struct iperf_stream * sp)
/**************************************************************************/
void
catcher(int sig)
{
longjmp(env, sig);
}
/**************************************************************************/
int
iperf_client_start(struct iperf_test *test)
{
int i;
char *prot, *result_string;
int64_t delayus, adjustus, dtargus;
struct iperf_stream *sp, *np;
struct timer *timer, *stats_interval, *reporter_interval;
struct sigaction sact;
test->streams->settings->state = STREAM_BEGIN;
timer = stats_interval = reporter_interval = NULL;
// Set signal handling
sigemptyset(&sact.sa_mask);
sact.sa_flags = 0;
sact.sa_handler = catcher;
sigaction(SIGINT, &sact, NULL);
// Not sure what the following code does yet. It's UDP, so I'll get to fixing it eventually
if (test->protocol == Pudp) {
prot = "UDP";
dtargus = (int64_t) (test->default_settings->blksize) * SEC_TO_US * 8;
dtargus /= test->default_settings->rate;
assert(dtargus != 0);
delayus = dtargus;
adjustus = 0;
printf("iperf_run_client: adjustus: %lld, delayus %lld \n", adjustus, delayus);
// New method for iterating through streams
for (sp = test->streams; sp != NULL; sp = sp->next)
sp->send_timer = new_timer(0, dtargus);
} else {
prot = "TCP";
}
/* if -n specified, set zero timer */
// Set timers and print usage message
if (test->default_settings->bytes == 0) {
timer = new_timer(test->duration, 0);
printf("Starting Test: protocol: %s, %d streams, %d byte blocks, %d second test \n",
prot, test->num_streams, test->default_settings->blksize, test->duration);
} else {
timer = new_timer(0, 0);
printf("Starting Test: protocol: %s, %d streams, %d byte blocks, %d bytes to send\n",
prot, test->num_streams, test->default_settings->blksize, (int) test->default_settings->bytes);
}
if (test->stats_interval != 0)
stats_interval = new_timer(test->stats_interval, 0);
if (test->reporter_interval != 0)
reporter_interval = new_timer(test->reporter_interval, 0);
// Begin testing...
while (!all_data_sent(test) && !timer->expired(timer)) {
// Send data
for (sp = test->streams; sp != NULL; sp = sp->next) {
if (sp->snd(sp) < 0) {
perror("iperf_client_start: snd");
// XXX: needs to indicate an iperf error on stream send
}
}
// Perform callbacks
if ((test->stats_interval != 0) && stats_interval->expired(stats_interval)) {
test->stats_callback(test);
update_timer(stats_interval, test->stats_interval, 0);
}
if ((test->reporter_interval != 0) && reporter_interval->expired(reporter_interval)) {
test->reporter_callback(test);
update_timer(reporter_interval, test->reporter_interval, 0);
}
/* detecting Ctrl+C */
// Why is the setjmp inside of the while??
if (setjmp(env))
break;
}
// Free timers
free(timer);
free(stats_interval);
free(reporter_interval);
// Send TEST_DONE (ALL_STREAMS_END) message
test->state = ALL_STREAMS_END;
if (write(test->ctrl_sck, &test->state, sizeof(int)) < 0) {
perror("write ALL_STREAMS_END");
return -1;
}
return 0;
}
int
iperf_client_end(struct iperf_test *test)
@ -946,59 +971,72 @@ iperf_client_end(struct iperf_test *test)
}
int
sig_handler(int sig)
{
longjmp(env, 1);
}
int
iperf_run_client(struct iperf_test * test)
{
int result;
char *prot;
int64_t delayus, adjustus, dtargus;
fd_set temp_read_set, temp_write_set;
struct timeval tv;
struct iperf_stream *sp;
fd_set temp_set;
int result;
struct timer *timer, *stats_interval, *reporter_interval;
/* Start the client and connect to the server */
if (iperf_connect(test) < 0) {
// set error and return
return -1;
}
signal(SIGINT, sig_handler);
if (setjmp(env)) {
fprintf(stderr, "Interrupt received. Exiting...\n");
test->state = CLIENT_TERMINATE;
if (write(test->ctrl_sck, &test->state, sizeof(int)) < 0) {
fprintf(stderr, "Unable to send CLIENT_TERMINATE message to serer\n");
}
exit(1);
}
while (test->state != TEST_END) {
FD_COPY(&test->read_set, &temp_set);
FD_COPY(&test->read_set, &temp_read_set);
FD_COPY(&test->write_set, &temp_write_set);
tv.tv_sec = 15;
tv.tv_usec = 0;
result = select(test->max_fd + 1, &temp_set, NULL, NULL, &tv);
result = select(test->max_fd + 1, &temp_read_set, &temp_write_set, NULL, &tv);
if (result < 0 && errno != EINTR) {
perror("select");
exit(1);
} else if (result > 0) {
if (FD_ISSET(test->ctrl_sck, &temp_set)) {
if (FD_ISSET(test->ctrl_sck, &temp_read_set)) {
iperf_handle_message_client(test);
FD_CLR(test->ctrl_sck, &temp_set);
FD_CLR(test->ctrl_sck, &temp_read_set);
}
if (test->reverse) {
// Reverse mode. Client receives.
iperf_recv(test);
} else {
// Regular mode. Client sends.
iperf_send(test);
}
}
}
/* Exchange parameters with the server */
/* Moved to iperf_connect
if (iperf_exchange_parameters(test) < 0) {
// This needs to set error
return -1;
}
*/
/* Start the iperf test */
/* Moved to while above
if (iperf_client_start(test) < 0) {
return -1;
}
*/
/* End the iperf test and clean up client specific memory */
/* XXX: Commented out until fixed. Probably should be controlled by ctrl_sck
if (iperf_client_end(test) < 0) {
return -1;
}
*/
return 0;
}

View File

@ -132,8 +132,10 @@ void build_tcpinfo_message(struct iperf_interval_results *r, char *message);
void safe_strcat(char *s1, char *s2);
void print_interval_results(struct iperf_test * test, struct iperf_stream *sp);
int iperf_connect(struct iperf_test *);
int iperf_client_start(struct iperf_test *);
int iperf_client_end(struct iperf_test *);
int iperf_send(struct iperf_test *);
int iperf_recv(struct iperf_test *);
int sig_handler(int);
#endif

View File

@ -28,7 +28,6 @@
#include <sys/time.h>
#include <sys/resource.h>
#include <sched.h>
#include <signal.h>
#include <setjmp.h>
#include "iperf.h"
@ -43,55 +42,9 @@
#include "uuid.h"
#include "locale.h"
void handle_message(struct iperf_test * test, int m, struct iperf_stream * sp);
/*********************************************************************/
/**
* param_received - handles the param_Exchange part for server
* returns state on success, -1 on failure
*
*/
int
param_received(struct iperf_stream * sp, struct param_exchange * param)
{
int result;
char *buf = (char *) calloc(1, sizeof(struct param_exchange));
if (sp->settings->cookie[0] == '\0' ||
(strncmp(param->cookie, sp->settings->cookie, COOKIE_SIZE) == 0))
{
strncpy(sp->settings->cookie, param->cookie, COOKIE_SIZE);
sp->settings->blksize = param->blksize;
sp->settings->socket_bufsize = param->recv_window;
sp->settings->unit_format = param->format;
printf("Got params from client: block size = %d, recv_window = %d cookie = %s\n",
sp->settings->blksize, sp->settings->socket_bufsize, sp->settings->cookie);
param->state = TEST_START;
buf[0] = TEST_START;
} else
{
fprintf(stderr, "Connection from new client denied.\n");
printf("cookie still set to %s \n", sp->settings->cookie);
param->state = ACCESS_DENIED;
buf[0] = ACCESS_DENIED;
}
memcpy(sp->buffer, buf, sizeof(struct param_exchange));;
//printf("param_received: Sending message (%d) back to client \n", sp->buffer[0]);
result = Nwrite(sp->socket, sp->buffer, sizeof(struct param_exchange), Ptcp);
if (result < 0)
perror("param_received: Error sending param ack to client");
free(buf);
return param->state;
}
/*************************************************************/
/**
* send_result_to_client - sends result string from server to client
*/
jmp_buf env;
/* send_result_to_client - sends result string from server to client */
void
send_result_to_client(struct iperf_stream * sp)
{
@ -122,35 +75,9 @@ iperf_server_listen(struct iperf_test *test)
char ubuf[UNIT_LEN];
int x;
/*
if (test->protocol == Pudp) {
test->listener_sock_udp = netannounce(Pudp, NULL, test->server_port);
if (test->listener_sock_udp < 0) {
// Needs to set some sort of error number/message
return -1;
}
}
test->listener_sock_tcp = netannounce(Ptcp, NULL, test->server_port);
if (test->listener_sock_tcp < 0) {
// Needs to set some sort of error number/message
return -1;
}
if (test->protocol == Ptcp) {
if (set_tcp_windowsize(test->listener_sock_tcp, test->default_settings->socket_bufsize, SO_RCVBUF) < 0) {
// Needs to set some sort of error number/message
perror("unable to set TCP window");
return -1;
}
}
// make sure that accept call does not block
setnonblocking(test->listener_sock_tcp);
setnonblocking(test->listener_sock_udp);
*/
if((test->listener = netannounce(Ptcp, NULL, test->server_port)) < 0) {
// Needs to set some sort of error number/message
perror("netannounce test->listener");
return -1;
}
setnonblocking(test->listener);
@ -183,7 +110,7 @@ iperf_server_listen(struct iperf_test *test)
FD_ZERO(&test->read_set);
FD_ZERO(&test->temp_set);
FD_SET(test->listener, &test->read_set);
test->max_fd = test->ctrl_sck;
test->max_fd = (test->listener > test->max_fd) ? test->listener : test->max_fd;
return 0;
}
@ -192,38 +119,52 @@ int
iperf_accept(struct iperf_test *test)
{
int s;
int rbuf = ACCESS_DENIED;
char ipl[512], ipr[512];
socklen_t len;
struct sockaddr_in addr;
struct sockaddr_in temp1, temp2;
if (test->ctrl_sck == 0) {
if (test->ctrl_sck == -1) {
if ((s = accept(test->listener, (struct sockaddr *) &addr, &len)) < 0) {
perror("accept");
return -1;
}
FD_SET(s, &test->read_set);
FD_SET(s, &test->write_set);
test->max_fd = (s > test->max_fd) ? s : test->max_fd;
test->ctrl_sck = s;
/*
inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) & addr.local_addr)->sin_addr),
(void *) ipl, sizeof(ipl));
inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) & addr.remote_addr)->sin_addr),
(void *) ipr, sizeof(ipr));
printf(report_peer, s,
ipl, ntohs(((struct sockaddr_in *) & addr.local_addr)->sin_port),
ipr, ntohs(((struct sockaddr_in *) & addr.remote_addr)->sin_port));
*/
printf("just accepted a control connection from somebody.. need to change this message\n");
len = sizeof(struct sockaddr_in);
if (getsockname(s, (struct sockaddr *) &temp1, &len) < 0)
perror("getsockname");
if (getpeername(s, (struct sockaddr *) &temp2, &len) < 0)
perror("getpeername");
inet_ntop(AF_INET, (void *) &temp1.sin_addr, ipl, sizeof(ipl));
inet_ntop(AF_INET, (void *) &temp2.sin_addr, ipr, sizeof(ipr));
printf(report_peer, s, ipl, ntohs(temp1.sin_port), ipr, ntohs(temp2.sin_port));
return 0;
return s;
} else {
// This message needs to be sent to the client
printf("The server is busy running a test. Try again later.\n");
if ((s = accept(test->listener, (struct sockaddr *) &addr, &len)) < 0) {
perror("accept");
return -1;
}
if (write(s, &rbuf, sizeof(int)) < 0) {
perror("write");
return -1;
}
close(s);
return 0;
}
}
/**************************************************************************/
int
iperf_handle_message(struct iperf_test *test)
iperf_handle_message_server(struct iperf_test *test)
{
if (read(test->ctrl_sck, &test->state, sizeof(int)) < 0) {
// XXX: Needs to indicate read error
@ -236,9 +177,20 @@ iperf_handle_message(struct iperf_test *test)
break;
case TEST_START:
break;
case TEST_RUNNING:
break;
case TEST_END:
break;
case ALL_STREAMS_END:
// close the streams
printf("made it to ALL_STREAMS_END!\n");
// Send TEST_END!
test->state = TEST_END;
if (write(test->ctrl_sck, &test->state, sizeof(int)) < 0) {
perror("write TEST_END");
exit(1);
}
break;
case CLIENT_TERMINATE:
fprintf(stderr, "The client has terminated. Exiting...\n");
exit(1);
default:
// XXX: This needs to be replaced by actual error handling
@ -253,12 +205,12 @@ void
iperf_run_server(struct iperf_test *test)
{
struct timeval tv;
//struct iperf_stream *np;
struct iperf_stream *sp;
struct timer *stats_interval, *reporter_interval;
char *result_string = NULL;
int j = 0, result = 0, message = 0;
int nfd = 0;
int streams_accepted = 0;
// Open socket and listen
if (iperf_server_listen(test) < 0) {
@ -267,14 +219,24 @@ iperf_run_server(struct iperf_test *test)
exit(1);
}
test->num_streams = 0;
signal(SIGINT, sig_handler);
if (setjmp(env)) {
fprintf(stderr, "Interrupt received. Exiting...\n");
test->state = SERVER_TERMINATE;
if (test->ctrl_sck >= 0) {
if (write(test->ctrl_sck, &test->state, sizeof(int)) < 0) {
fprintf(stderr, "Unable to send SERVER_TERMINATE message to client\n");
}
}
exit(1);
}
test->default_settings->state = TEST_RUNNING;
printf("iperf_run_server: Waiting for client connect.... \n");
while (test->state != TEST_END) {
// Copy select set and renew timers
FD_COPY(&test->read_set, &test->temp_set);
tv.tv_sec = 15;
tv.tv_usec = 0;
@ -286,236 +248,56 @@ iperf_run_server(struct iperf_test *test)
exit(1);
} else if (result > 0) {
if (FD_ISSET(test->listener, &test->temp_set)) {
test->ctrl_sck = iperf_accept(test);
if (test->ctrl_sck < 0) {
fprintf(stderr, "error: could not open control socket. exiting.\n");
if (iperf_accept(test) < 0) {
fprintf(stderr, "iperf_accept: error accepting control socket. exiting...\n");
exit(1);
} else if (test->ctrl_sck > 0) {
// Accepted! exchange parameters / setup
}
}
FD_CLR(test->listener, &test->temp_set);
}
if (FD_ISSET(test->ctrl_sck, &test->temp_set)) {
// Handle control messages
iperf_handle_message(test);
iperf_handle_message_server(test);
FD_CLR(test->ctrl_sck, &test->temp_set);
}
if (FD_ISSET(test->prot_listener, &test->temp_set)) {
// Spawn new streams
// XXX: Fix this!
iperf_tcp_accept(test);
FD_CLR(test->ctrl_sck, &test->temp_set);
}
// Iterate through the streams to see if their socket FD_ISSET
for (sp = test->streams; sp != NULL; sp = sp->next) {
if (FD_ISSET(sp->socket, &test->temp_set)) {
iperf_tcp_recv(sp);
if (test->state == CREATE_STREAMS) {
if (FD_ISSET(test->prot_listener, &test->temp_set)) {
// Spawn new streams
// XXX: This works, but should it check cookies for authenticity
// Also, currently it uses 5202 for stream connections.
// Should this be fixed to use 5201 instead?
iperf_tcp_accept(test);
++streams_accepted;
FD_CLR(test->prot_listener, &test->temp_set);
}
if (streams_accepted == test->num_streams) {
FD_CLR(test->prot_listener, &test->read_set);
close(test->prot_listener);
test->state = TEST_START;
if (write(test->ctrl_sck, &test->state, sizeof(int)) < 0) {
perror("write TEST_START");
return -1;
}
}
}
}
}
/*
while (test->default_settings->state != TEST_END) {
memcpy(&test->temp_set, &test->read_set, sizeof(test->read_set));
tv.tv_sec = 15;
tv.tv_usec = 0;
// using select to check on multiple descriptors.
result = select(test->max_fd + 1, &test->temp_set, NULL, NULL, &tv);
if (result == 0) {
continue;
} else if (result < 0 && errno != EINTR) {
printf("Error in select(): %s, socket = %d\n", strerror(errno), test->max_fd + 1);
exit(0);
} else if (result > 0) {
if (test->protocol == Ptcp) {
// Accept a new TCP connection
if (FD_ISSET(test->ctrl_sck, &test->temp_set)) {
test->protocol = Ptcp;
// The following line needs to be moved to test initialization
test->accept = iperf_tcp_accept;
if (test->accept < 0) // .. really??!
return;
// Again, needs to be moved to test initialization
test->new_stream = iperf_new_tcp_stream;
test->accept(test);
test->default_settings->state = TEST_RUNNING;
FD_CLR(test->listener_sock_tcp, &test->temp_set);
//printf("iperf_run_server: accepted TCP connection \n");
test->num_streams++;
}
if (test->reverse) {
// Reverse mode. Server sends.
iperf_send(test);
} else {
// Accept a new UDP connection
if (FD_ISSET(test->listener_sock_udp, &test->temp_set)) {
test->protocol = Pudp;
test->accept = iperf_udp_accept;
if (test->accept < 0)
return;
test->new_stream = iperf_new_udp_stream;
test->accept(test);
test->default_settings->state = TEST_RUNNING;
FD_CLR(test->listener_sock_udp, &test->temp_set);
printf("iperf_run_server: accepted UDP connection \n");
}
}
// Process the sockets for read operation
nfd = test->max_fd + 1;
for (j = 0; j <= test->max_fd; j++) {
if (FD_ISSET(j, &test->temp_set)) {
// find the correct stream - possibly time consuming?
np = find_stream_by_socket(test, j);
message = np->rcv(np); // get data from client using receiver callback
// This code needs to be fixed to work without goto
if (message < 0)
goto done;
handle_message(test, message, np);
if (message == TEST_END)
break;
}
}
if (message == PARAM_EXCHANGE) {
// start timer at end of PARAM_EXCHANGE
if (test->stats_interval != 0)
stats_interval = new_timer(test->stats_interval, 0);
if (test->reporter_interval != 0)
reporter_interval = new_timer(test->reporter_interval, 0);
}
if ((message == STREAM_BEGIN) || (message == STREAM_RUNNING)) {
//
// XXX: is this right? Might there be cases where we want
// stats for while in another state?
//
if ((test->stats_interval != 0) && stats_interval->expired(stats_interval)) {
test->stats_callback(test);
update_timer(stats_interval, test->stats_interval, 0);
}
if ((test->reporter_interval != 0) && reporter_interval->expired(reporter_interval)) {
test->reporter_callback(test);
update_timer(reporter_interval, test->reporter_interval, 0);
}
// Regular mode. Server receives.
iperf_recv(test);
}
}
}
*/ // End of the WHILE
done:
printf("Test Complete. \n\n");
/* reset cookie when client is finished */
/* XXX: which cookie to reset, and why is it stored to 2 places? */
//memset(test->streams->settings->cookie, '\0', COOKIE_SIZE);
/* All memory for the previous run needs to be freed here */
memset(test->default_settings->cookie, '\0', COOKIE_SIZE);
return;
}
/********************************************************/
void
handle_message(struct iperf_test * test, int message, struct iperf_stream * sp)
{
char *results_string = NULL;
struct iperf_stream *tp1 = NULL;
struct iperf_stream *tp2 = NULL;
//printf("handle_message: %d \n", message);
if (message < 0)
{
printf("Error reading data from client \n");
close(sp->socket);
return;
}
sp->settings->state = message;
if (message == PARAM_EXCHANGE)
{
/* copy the received settings into test */
memcpy(test->default_settings, test->streams->settings, sizeof(struct iperf_settings));
}
if (message == ACCESS_DENIED) /* this might get set by
* PARAM_EXCHANGE */
{
/* XXX: test this! */
close(sp->socket);
FD_CLR(sp->socket, &test->read_set);
iperf_free_stream(sp);
}
if (message == STREAM_END)
{
/* get final timestamp for all streams */
tp1 = test->streams;
do
{
gettimeofday(&tp1->result->end_time, NULL);
tp1 = tp1->next;
} while (tp1 != NULL);
}
if (message == RESULT_REQUEST)
{
sp->settings->state = RESULT_RESPOND;
test->stats_callback(test);
test->reporter_callback(test);
// results_string = test->reporter_callback(test);
// sp->data = results_string;
// send_result_to_client(sp);
}
if (message == ALL_STREAMS_END)
{
printf("Client done sending data. Printing final results. \n");
/* print server results */
test->stats_callback(test);
test->reporter_callback(test);
}
if (message == TEST_END)
{
//printf("client sent TEST_END message, shuting down sockets.. \n");
/* FREE ALL STREAMS */
tp1 = test->streams;
do
{
tp2 = tp1;
//printf(" closing socket: %d \n", tp2->socket);
close(tp2->socket);
FD_CLR(tp2->socket, &test->read_set);
tp1 = tp2->next; /* get next pointer before freeing */
iperf_free_stream(tp2);
} while (tp1 != NULL);
test->default_settings->state = TEST_END;
memset(test->default_settings->cookie, '\0', COOKIE_SIZE);
}
}
/**************************************************************************/
/**
* find_stream_by_socket -- finds the stream based on socket ID
* simple sequential scan: not more effiecient, but should be fine
* for a small number of streams.
*
*returns stream
*
*/
struct iperf_stream *
find_stream_by_socket(struct iperf_test * test, int sock)
{
struct iperf_stream *n;
n = test->streams;
while (1)
{
if (n->socket == sock)
break;
if (n->next == NULL)
break;
n = n->next;
}
return n;
}

View File

@ -3,14 +3,10 @@
#include "iperf.h"
int param_received(struct iperf_stream *, struct param_exchange *);
void send_result_to_client(struct iperf_stream *);
void iperf_run_server(struct iperf_test *);
struct iperf_stream *find_stream_by_socket(struct iperf_test *, int);
int iperf_server_listen(struct iperf_test *);
#endif

View File

@ -52,85 +52,32 @@ jmp_buf env; /* to handle longjmp on signal */
int
iperf_tcp_recv(struct iperf_stream * sp)
{
int result = 0, message = 0;
int result = 0;
int size = sp->settings->blksize;
char *final_message = NULL;
errno = 0;
struct param_exchange *param = (struct param_exchange *) sp->buffer;
if (!sp->buffer) {
fprintf(stderr, "receive buffer not allocated \n");
fprintf(stderr, "iperf_tcp_recv: receive buffer not allocated\n");
return -1;
}
/* get the 1st byte: then based on that, decide how much to read */
if ((result = recv(sp->socket, &message, sizeof(int), MSG_PEEK)) != sizeof(int)) {
if (result == 0)
printf("Client Disconnected. \n");
else
perror("iperf_tcp_recv: recv error: MSG_PEEK");
return -1;
}
sp->settings->state = message;
#ifdef DEBUG
if (message != STREAM_RUNNING) // tell me about non STREAM_RUNNING messages
// for debugging
printf("iperf_tcp_recv: got message type %d \n", message);
#endif
switch (message) {
case TEST_START:
case STREAM_BEGIN:
case STREAM_RUNNING:
size = sp->settings->blksize;
#ifdef USE_RECV
/*
* NOTE: Nwrite/Nread seems to be 10-15% faster than send/recv for
* localhost on OSX. More testing needed on other OSes to be sure.
*/
do {
result = recv(sp->socket, sp->buffer, size, MSG_WAITALL);
} while (result == -1 && errno == EINTR);
do {
result = recv(sp->socket, sp->buffer, size, MSG_WAITALL);
} while (result == -1 && errno == EINTR);
#else
result = Nread(sp->socket, sp->buffer, size, Ptcp);
result = Nread(sp->socket, sp->buffer, size, Ptcp);
#endif
if (result == -1) {
perror("Read error");
return -1;
}
sp->result->bytes_received += result;
sp->result->bytes_received_this_interval += result;
break;
case STREAM_END: // What's the purpose of reading this?
size = sizeof(struct param_exchange);
result = Nread(sp->socket, sp->buffer, size, Ptcp);
break;
case ALL_STREAMS_END:
size = sizeof(struct param_exchange);
result = Nread(sp->socket, sp->buffer, size, Ptcp);
break;
case TEST_END:
size = sizeof(struct param_exchange);
result = Nread(sp->socket, sp->buffer, size, Ptcp);
break;
case RESULT_REQUEST:
/* XXX: not working yet */
//final_message = iperf_reporter_callback(test);
//memcpy(sp->buffer, final_message, strlen(final_message));
//result = send(sp->socket, sp->buffer, MAX_RESULT_STRING, 0);
//if (result < 0)
// perror("Error sending results back to client");
break;
default:
printf("unexpected state encountered: %d \n", message);
return -1;
if (result == -1) {
perror("Read error");
return -1;
}
sp->result->bytes_received += result;
sp->result->bytes_received_this_interval += result;
return message;
return 0;
}
/**************************************************************************/
@ -148,41 +95,10 @@ iperf_tcp_send(struct iperf_stream * sp)
int size = sp->settings->blksize;
if (!sp->buffer) {
perror("transmit buffer not allocated");
fprintf(stderr, "iperf_tcp_send: transmit buffer not allocated\n");
return -1;
}
//printf("iperf_tcp_send: state = %d \n", sp->settings->state);
memcpy(sp->buffer, &(sp->settings->state), sizeof(int));;
/* set read size based on message type */
switch (sp->settings->state) {
case STREAM_BEGIN:
size = sp->settings->blksize;
break;
case STREAM_END:
size = sizeof(struct param_exchange);
break;
case RESULT_REQUEST:
size = MAX_RESULT_STRING;
break;
case ALL_STREAMS_END:
size = sizeof(struct param_exchange);
break;
case TEST_END:
size = sizeof(struct param_exchange);
break;
case STREAM_RUNNING:
size = sp->settings->blksize;
break;
default:
printf("State of the stream can't be determined\n");
return -1;
}
//if(sp->settings->state != STREAM_RUNNING)
// printf(" in iperf_tcp_send, message type = %d (total = %d bytes) \n", sp->settings->state, size);
#ifdef USE_SEND
result = send(sp->socket, sp->buffer, size, 0);
#else
@ -190,18 +106,9 @@ iperf_tcp_send(struct iperf_stream * sp)
#endif
if (result < 0)
perror("Write error");
//printf(" iperf_tcp_send: %d bytes sent \n", result);
if (sp->settings->state == STREAM_BEGIN || sp->settings->state == STREAM_RUNNING) {
sp->result->bytes_sent += result;
sp->result->bytes_sent_this_interval += result;
}
//printf("iperf_tcp_send: number bytes sent so far = %u \n", (uint64_t) sp->result->bytes_sent);
/* change state after 1st send */
if (sp->settings->state == STREAM_BEGIN)
sp->settings->state = STREAM_RUNNING;
sp->result->bytes_sent += result;
sp->result->bytes_sent_this_interval += result;
return result;
}
@ -213,10 +120,9 @@ iperf_new_tcp_stream(struct iperf_test * testp)
struct iperf_stream *sp;
sp = (struct iperf_stream *) iperf_new_stream(testp);
if (!sp)
{
perror("malloc");
return (NULL);
if (!sp) {
perror("malloc");
return (NULL);
}
sp->rcv = iperf_tcp_recv; /* pointer to receive function */
sp->snd = iperf_tcp_send; /* pointer to send function */
@ -242,28 +148,29 @@ iperf_tcp_accept(struct iperf_test * test)
{
socklen_t len;
struct sockaddr_in addr;
int peersock;
int peersock;
struct iperf_stream *sp;
len = sizeof(addr);
peersock = accept(test->prot_listener, (struct sockaddr *) & addr, &len);
if (peersock < 0) {
// XXX: Needs to implement better error handling
printf("Error in accept(): %s\n", strerror(errno));
return -1;
}
sp = test->new_stream(test);
setnonblocking(peersock);
FD_SET(peersock, &test->read_set); /* add new socket to master set */
test->max_fd = (test->max_fd < peersock) ? peersock : test->max_fd;
sp = test->new_stream(test);
sp->socket = peersock;
iperf_init_stream(sp, test);
iperf_add_stream(test, sp);
if (test->default_settings->state != RESULT_REQUEST)
connect_msg(sp); /* print connect message */
FD_SET(peersock, &test->read_set); /* add new socket to master set */
FD_SET(peersock, &test->write_set);
test->max_fd = (test->max_fd < peersock) ? peersock : test->max_fd;
connect_msg(sp); /* print connect message */
return 0;
}

View File

@ -56,6 +56,7 @@ static struct option longopts[] =
{"help", no_argument, NULL, 'h'},
{"daemon", no_argument, NULL, 'D'},
{"format", required_argument, NULL, 'f'},
{"reverse", no_argument, NULL, 'R'},
/* The following ifdef needs to be split up. linux-congestion is not necessarily supported
* by systems that support tos.
@ -111,7 +112,7 @@ main(int argc, char **argv)
test = iperf_new_test();
iperf_defaults(test); /* sets defaults */
while ((ch = getopt_long(argc, argv, "c:p:st:uP:b:l:w:i:n:mNTvhVdM:f:", longopts, NULL)) != -1) {
while ((ch = getopt_long(argc, argv, "c:p:st:uP:b:l:w:i:n:mRNTvhVdM:f:", longopts, NULL)) != -1) {
switch (ch) {
case 'c':
test->role = 'c';
@ -214,6 +215,9 @@ main(int argc, char **argv)
case 'd':
test->debug = 1;
break;
case 'R':
test->reverse = 1;
break;
case 'v': // print version and exit
fprintf( stderr, version );
exit(1);
@ -232,7 +236,11 @@ main(int argc, char **argv)
return 0;
}
iperf_run(test);
if (iperf_run(test) < 0) {
fprintf(stderr, "An error occurred. Exiting...\n");
return -1;
}
iperf_free_test(test);
printf("\niperf Done.\n");