From a3281a3dc60e3e883655489e78bb30dd7c4cc74a Mon Sep 17 00:00:00 2001 From: Brian Tierney Date: Fri, 6 Nov 2009 07:25:10 +0000 Subject: [PATCH] parallel streams now mostly working. Still some issues with repeated connections --- src/iperf.h | 3 +- src/iperf_api.c | 9 ++- src/iperf_api.h | 8 -- src/iperf_server_api.c | 173 ++++++++++++++++++++++++----------------- src/iperf_server_api.h | 2 +- src/iperf_tcp.c | 1 + src/iperf_udp.c | 3 +- src/main.c | 3 +- src/net.c | 2 +- 9 files changed, 115 insertions(+), 89 deletions(-) diff --git a/src/iperf.h b/src/iperf.h index 7b4f0f9..a83fe66 100644 --- a/src/iperf.h +++ b/src/iperf.h @@ -58,7 +58,8 @@ struct iperf_stream /* configurable members */ int local_port; int remote_port; - /* XXX: I think settings should be one per test, not one per stream -blt */ + /* XXX: is this just a pointer to the same struct in iperf_test? if not, + should it be? */ struct iperf_settings *settings; /* pointer to structure settings */ /* non configurable members */ diff --git a/src/iperf_api.c b/src/iperf_api.c index c58231b..4bc1d21 100644 --- a/src/iperf_api.c +++ b/src/iperf_api.c @@ -449,7 +449,7 @@ iperf_stats_callback(struct iperf_test *test) struct iperf_stream_result *rp = test->streams->result; struct iperf_interval_results *ip, temp; - printf("in stats_callback: num_streams = %d \n", test->num_streams); + //printf("in stats_callback: num_streams = %d \n", test->num_streams); for (i = 0; i < test->num_streams; i++) { rp = sp->result; @@ -527,7 +527,7 @@ iperf_reporter_callback(struct iperf_test *test) sp = test->streams; curr_state = sp->settings->state; - printf("in iperf_reporter_callback: state = %d \n", curr_state); + //printf("in iperf_reporter_callback: state = %d \n", curr_state); if (curr_state == TEST_RUNNING) { /* print interval results */ @@ -700,7 +700,7 @@ iperf_new_stream(struct iperf_test *testp) int i = 0; struct iperf_stream *sp; - printf("in iperf_new_stream \n"); + //printf("in iperf_new_stream \n"); sp = (struct iperf_stream *)malloc(sizeof(struct iperf_stream)); if (!sp) { perror("malloc"); @@ -708,9 +708,10 @@ iperf_new_stream(struct iperf_test *testp) } memset(sp, 0, sizeof(struct iperf_stream)); - printf("Allocating new stream buffer: size = %d \n", testp->default_settings->blksize); + printf("iperf_new_stream: Allocating new stream buffer: size = %d \n", testp->default_settings->blksize); sp->buffer = (char *)malloc(testp->default_settings->blksize); sp->settings = (struct iperf_settings *)malloc(sizeof(struct iperf_settings)); + /* make a per stream copy of default_settings in each stream structure */ memcpy(sp->settings, testp->default_settings, sizeof(struct iperf_settings)); sp->result = (struct iperf_stream_result *)malloc(sizeof(struct iperf_stream_result)); diff --git a/src/iperf_api.h b/src/iperf_api.h index 13615a3..f430a17 100644 --- a/src/iperf_api.h +++ b/src/iperf_api.h @@ -16,14 +16,6 @@ */ void exchange_parameters(struct iperf_test * test); -/** - * param_received - handles the param_Exchange part for server - * returns state on success, -1 on failure - * - */ -int param_received(struct iperf_stream * sp, struct param_exchange * param); - - /** * add_interval_list -- adds new interval to the interval_list * diff --git a/src/iperf_server_api.c b/src/iperf_server_api.c index 29848f6..bffdcaa 100644 --- a/src/iperf_server_api.c +++ b/src/iperf_server_api.c @@ -43,6 +43,8 @@ #include "uuid.h" #include "locale.h" +void handle_message(struct iperf_test * test, int m, struct iperf_stream * sp); + /*********************************************************************/ /** * param_received - handles the param_Exchange part for server @@ -119,9 +121,9 @@ void iperf_run_server(struct iperf_test * test) { struct timeval tv; - struct iperf_stream *np, *sp; - int j, result=0, message, cnt=0; - char *results_string = NULL; + struct iperf_stream *np; + int j = 0, result = 0, message = 0; + int nfd = 0; printf("in iperf_run_server \n"); @@ -161,6 +163,8 @@ iperf_run_server(struct iperf_test * test) { test->protocol = Ptcp; test->accept = iperf_tcp_accept; + if (test->accept < 0) + return; test->new_stream = iperf_new_tcp_stream; test->accept(test); test->default_settings->state = TEST_RUNNING; @@ -172,91 +176,118 @@ iperf_run_server(struct iperf_test * test) { test->protocol = Pudp; test->accept = iperf_udp_accept; + if (test->accept < 0) + return; test->new_stream = iperf_new_udp_stream; test->accept(test); test->default_settings->state = TEST_RUNNING; FD_CLR(test->listener_sock_udp, &test->temp_set); } /* Process the sockets for read operation */ - /* XXX: Need to try to read equal amounts from each socket, so keep - track of last socket read from, and always start with the next - socket - */ - for (j = 0; j < test->max_fd + 1; j++) + /* + * XXX: Need to try to read equal amounts from each socket, so + * keep track of last socket read from, and always start with the + * next socket + */ + nfd = test->max_fd + 1; + for (j = 0; j <= test->max_fd; j++) { + //printf("Checking socket %d \n", j); if (FD_ISSET(j, &test->temp_set)) { + //printf("iperf_run_server: data ready on socket %d \n", j); /* find the correct stream - possibly time consuming */ np = find_stream_by_socket(test, j); message = np->rcv(np); /* get data from client using * receiver callback */ - //printf("iperf_run_server: iperf_tcp_recv returned %d \n", message); - if (message < 0) - { - printf("Error reading data from client \n"); - close(np->socket); - return; - } - np->settings->state = message; - - if (message == PARAM_EXCHANGE) - { - /* copy the received settings into test */ - memcpy(test->default_settings, test->streams->settings, sizeof(struct iperf_settings)); - } - if (message == ACCESS_DENIED) /* this might get set by - * PARAM_EXCHANGE */ - { - /* XXX: test this! */ - close(np->socket); - FD_CLR(np->socket, &test->read_set); - iperf_free_stream(np); - } - if (message == STREAM_END) - { - gettimeofday(&np->result->end_time, NULL); - } - if (message == RESULT_REQUEST) - { - np->settings->state = RESULT_RESPOND; - results_string = test->reporter_callback(test); - np->data = results_string; - send_result_to_client(np); - } - if (message == ALL_STREAMS_END) - { - printf("client send ALL_STREAMS_END message, printing results \n"); - /* print server results */ - results_string = test->reporter_callback(test); - puts(results_string); /* send to stdio */ - } + if (message < 0) + goto done; + handle_message(test, message, np); if (message == TEST_END) - { - /* FREE ALL STREAMS */ - np = test->streams; - do - { - sp = np; - //printf(" closing socket: %d \n", sp->socket); - close(sp->socket); - FD_CLR(sp->socket, &test->read_set); - np = sp->next; /* get next pointer before - * freeing */ - iperf_free_stream(sp); - } while (np != NULL); - - test->default_settings->state = TEST_END; - /* reset cookie when client is finished */ - printf("got TEST_END message, reseting cookie \n"); - memset(test->default_settings->cookie, '\0', COOKIE_SIZE); - break; /* always break out of for loop on TEST_END message */ - } + break; /* test done, so break out of loop */ } /* end if (FD_ISSET(j, &temp_set)) */ } /* end for (j=0;...) */ - } /* end else (result>0) */ } /* end while */ - printf("Test Complete. \n\n"); - return; +done: + printf("Test Complete. \n\n"); + /* reset cookie when client is finished */ + memset(test->streams->settings->cookie, '\0', COOKIE_SIZE); + return; +} + +/********************************************************/ + +void +handle_message(struct iperf_test * test, int message, struct iperf_stream * sp) +{ + char *results_string = NULL; + struct iperf_stream *tp1 = NULL; + struct iperf_stream *tp2 = NULL; + + //printf("handle_message: %d \n", message); + if (message < 0) + { + printf("Error reading data from client \n"); + close(sp->socket); + return; + } + sp->settings->state = message; + + if (message == PARAM_EXCHANGE) + { + /* copy the received settings into test */ + memcpy(test->default_settings, test->streams->settings, sizeof(struct iperf_settings)); + } + if (message == ACCESS_DENIED) /* this might get set by + * PARAM_EXCHANGE */ + { + /* XXX: test this! */ + close(sp->socket); + FD_CLR(sp->socket, &test->read_set); + iperf_free_stream(sp); + } + if (message == STREAM_END) + { + /* get final timestamp for all streams */ + tp1 = test->streams; + do + { + gettimeofday(&tp1->result->end_time, NULL); + tp1 = tp1->next; + } while (tp1 != NULL); + } + if (message == RESULT_REQUEST) + { + sp->settings->state = RESULT_RESPOND; + results_string = test->reporter_callback(test); + sp->data = results_string; + send_result_to_client(sp); + } + if (message == ALL_STREAMS_END) + { + printf("client sent ALL_STREAMS_END message, printing results \n"); + /* print server results */ + results_string = test->reporter_callback(test); + puts(results_string); /* send to stdio */ + } + if (message == TEST_END) + { + printf("client sent TEST_END message, shuting down sockets.. \n"); + /* FREE ALL STREAMS */ + tp1 = test->streams; + do + { + tp2 = tp1; + printf(" closing socket: %d \n", tp2->socket); + close(tp2->socket); + FD_CLR(tp2->socket, &test->read_set); + tp1 = tp2->next; /* get next pointer before freeing */ + iperf_free_stream(tp2); + } while (tp1 != NULL); + + test->default_settings->state = TEST_END; + memset(test->default_settings->cookie, '\0', COOKIE_SIZE); + } } diff --git a/src/iperf_server_api.h b/src/iperf_server_api.h index 3908a7e..40d10b4 100644 --- a/src/iperf_server_api.h +++ b/src/iperf_server_api.h @@ -1,5 +1,5 @@ -int param_received(struct iperf_stream * sp, struct param_exchange * param); +int param_received(struct iperf_stream *sp, struct param_exchange * param); void send_result_to_client(struct iperf_stream * sp); void iperf_run_server(struct iperf_test * test); diff --git a/src/iperf_tcp.c b/src/iperf_tcp.c index 49fbd15..cffcca9 100644 --- a/src/iperf_tcp.c +++ b/src/iperf_tcp.c @@ -30,6 +30,7 @@ #include "iperf.h" #include "iperf_api.h" +#include "iperf_server_api.h" #include "iperf_tcp.h" #include "timer.h" #include "net.h" diff --git a/src/iperf_udp.c b/src/iperf_udp.c index e7b8c72..9330732 100644 --- a/src/iperf_udp.c +++ b/src/iperf_udp.c @@ -275,7 +275,8 @@ iperf_udp_accept(struct iperf_test * test) if (connect(test->listener_sock_udp, (struct sockaddr *) & sa_peer, len) < 0) { - perror("connect"); + perror("iperf_udp_accept: connect error"); + exit(-1); /* XXX: for debugging */ return -1; } sp = test->new_stream(test); diff --git a/src/main.c b/src/main.c index 2dc4d22..e6b1bae 100644 --- a/src/main.c +++ b/src/main.c @@ -84,8 +84,7 @@ main(int argc, char **argv) char ch, role; struct iperf_test *test; - int port = PORT, cnt = 0; - struct iperf_stream *np; + int port = PORT; #ifdef TEST_PROC_AFFINITY /* didnt seem to work.... */ diff --git a/src/net.c b/src/net.c index a28286b..5cb4ed5 100644 --- a/src/net.c +++ b/src/net.c @@ -43,7 +43,7 @@ netdial(int proto, char *client, int port) if (connect(s, (struct sockaddr *) & sa, sizeof sa) < 0 && errno != EINPROGRESS) { - perror("connect error"); + perror("netdial: connect error"); return (-1); } sn = sizeof sa;