From 982c704a8a6a5e9a4f9dc8a76f0dd05e8cdb4411 Mon Sep 17 00:00:00 2001 From: sethdelliott Date: Fri, 18 Jun 2010 21:08:50 +0000 Subject: [PATCH] NOTE: This is not a working revision. I'm in the process of redesigning how the client and server communicate. It will be fixed soon --- src/iperf.h | 53 +++--- src/iperf_api.c | 228 +++++++++++++------------ src/iperf_api.h | 11 +- src/iperf_server_api.c | 374 ++++++++++++++++++++++++++++------------- src/iperf_server_api.h | 17 +- src/iperf_tcp.c | 73 ++++---- src/main.c | 15 +- src/net.c | 72 ++++---- 8 files changed, 496 insertions(+), 347 deletions(-) diff --git a/src/iperf.h b/src/iperf.h index cb4027e..4247413 100644 --- a/src/iperf.h +++ b/src/iperf.h @@ -4,8 +4,8 @@ approvals from the U.S. Dept. of Energy). All rights reserved. */ -#ifndef IPERF_H -#define IPERF_H +#ifndef __IPERF_H +#define __IPERF_H #include #include @@ -64,13 +64,13 @@ struct iperf_stream /* configurable members */ int local_port; int remote_port; + int socket; /* XXX: is settings just a pointer to the same struct in iperf_test? if not, should it be? */ struct iperf_settings *settings; /* pointer to structure settings */ /* non configurable members */ struct iperf_stream_result *result; /* structure pointer to result */ - int socket; struct timer *send_timer; char *buffer; /* data to send */ @@ -100,28 +100,37 @@ struct iperf_stream struct iperf_test { - char role; /* c' lient or 's' erver */ + char role; /* c' lient or 's' erver */ int protocol; - char *server_hostname; /* -c option */ + int state; + char *server_hostname; /* -c option */ int server_port; - int duration; /* total duration of test (-t flag) */ + int duration; /* total duration of test (-t flag) */ + + /* The following two members should be replaced by a single TCP control socket */ int listener_sock_tcp; int listener_sock_udp; + int ctrl_sck; + // Server is the only one that needs these + int listener; + int prot_listener; + + /* boolen variables for Options */ - int daemon; /* -D option */ - int no_delay; /* -N option */ - int print_mss; /* -m option */ - int v6domain; /* -6 option */ - int output_format; /* -O option */ - int verbose; /* -V (verbose) option */ - int debug; /* debug mode */ + int daemon; /* -D option */ + int no_delay; /* -N option */ + int print_mss; /* -m option */ + int v6domain; /* -6 option */ + int output_format; /* -O option */ + int verbose; /* -V (verbose) option */ + int debug; /* debug mode */ /* Select related parameters */ int max_fd; - fd_set read_set; /* set of read sockets */ - fd_set temp_set; /* temp set for select */ - fd_set write_set; /* set of write sockets */ + fd_set read_set; /* set of read sockets */ + fd_set temp_set; /* temp set for select */ + fd_set write_set; /* set of write sockets */ int (*accept) (struct iperf_test *); struct iperf_stream *(*new_stream) (struct iperf_test *); @@ -131,9 +140,9 @@ struct iperf_test void (*stats_callback) (struct iperf_test *); void (*reporter_callback) (struct iperf_test *); - int reporter_fd; /* file descriptor for reporter */ - int num_streams; /* total streams in the test (-P) */ - int tcp_info; /* display getsockopt(TCP_INFO) results */ + int reporter_fd; /* file descriptor for reporter */ + int num_streams; /* total streams in the test (-P) */ + int tcp_info; /* display getsockopt(TCP_INFO) results. Should this be moved to Options? */ /* iperf error reporting * - errtype: (0,1,2) @@ -144,7 +153,7 @@ struct iperf_test //int errtype; //int errno; - struct iperf_stream *streams; /* pointer to list of struct stream */ + struct iperf_stream *streams; /* pointer to list of struct stream */ struct iperf_settings *default_settings; }; @@ -159,6 +168,7 @@ struct udp_datagram struct param_exchange { int state; + int protocol; int stream_id; int blksize; int recv_window; @@ -193,6 +203,7 @@ enum ALL_STREAMS_END = 9, PARAM_EXCHANGE = 10, PARAM_EXCHANGE_ACK = 11, + CREATE_STREAMS = 12, ACCESS_DENIED = -1, }; @@ -209,5 +220,5 @@ enum #define MAX_MSS 9 * 1024 #define MAX_STREAMS 128 -#endif /* IPERF_API_H */ +#endif diff --git a/src/iperf_api.c b/src/iperf_api.c index ca41d2b..bc411a0 100644 --- a/src/iperf_api.c +++ b/src/iperf_api.c @@ -87,38 +87,57 @@ all_data_sent(struct iperf_test * test) int iperf_exchange_parameters(struct iperf_test * test) { - int result; - struct iperf_stream *sp; - struct param_exchange *param; + struct param_exchange param; - sp = test->streams; - sp->settings->state = PARAM_EXCHANGE; - param = (struct param_exchange *) sp->buffer; + if (test->role == 'c') { - get_uuid(test->default_settings->cookie); - strncpy(param->cookie, test->default_settings->cookie, COOKIE_SIZE); + // XXX: Probably should get the cookie at the start of iperf rather than + // waiting till here + get_uuid(test->default_settings->cookie); + strncpy(param->cookie, test->default_settings->cookie, COOKIE_SIZE); - /* setting up exchange parameters */ - param->state = PARAM_EXCHANGE; - param->blksize = test->default_settings->blksize; - param->recv_window = test->default_settings->socket_bufsize; - param->send_window = test->default_settings->socket_bufsize; - param->format = test->default_settings->unit_format; + /* setting up exchange parameters */ + param->state = PARAM_EXCHANGE; + param->protocol = test->protocol; + param->blksize = test->default_settings->blksize; + param->recv_window = test->default_settings->socket_bufsize; + param->send_window = test->default_settings->socket_bufsize; + param->format = test->default_settings->unit_format; - if (sp->snd(sp) < 0) { - perror("Error sending exchange params to server"); - return -1; - } + if (write(test->ctrl_sck, ¶m, sizeof(struct param_exchange)) < 0) { + perror("write param_exchange"); + return -1; + } - result = Nread(sp->socket, sp->buffer, sizeof(struct param_exchange), Ptcp); - if (result < 0) { - perror("Error getting exchange params ack from server"); - return -1; - } + // This code needs to be moved to the server rejection part of the server code + /* + if (result > 0 && sp->buffer[0] == ACCESS_DENIED) { + fprintf(stderr, "Busy server Detected. Try again later. Exiting.\n"); + return -1; + } + */ - if (result > 0 && sp->buffer[0] == ACCESS_DENIED) { - fprintf(stderr, "Busy server Detected. Try again later. Exiting.\n"); - return -1; + } else { + + if (read(ctrl_sck, ¶m, sizeof(struct param_exchange)) < 0) { + perror("read param_exchange"); + return -1; + } + + // set test parameters + test->default_settings->cookie = param->cookie; + test->protocol = param->protocol; + test->default_settings->blksize = param->blksize; + test->default_settings->socket_bufsize = param->recv_window; + // need to add support for send_window + test->default_settings->unit_format = param->format; + + // Send the control message to create streams and start the test + test->state = CREATE_STREAMS; + if (write(ctrl_sck, &test->state, sizeof(int)) < 0) { + perror("write CREATE_STREAMS"); + return -1; + } } return 0; @@ -320,87 +339,53 @@ iperf_defaults(struct iperf_test * testp) /**************************************************************************/ -void -iperf_init_test(struct iperf_test * test) +int +iperf_create_streams(struct iperf_test *test) { - char ubuf[UNIT_LEN]; struct iperf_stream *sp; - int i, s = 0; + int i, s; - if (test->role == 's') - { /* server */ - if (test->protocol == Pudp) - { - test->listener_sock_udp = netannounce(Pudp, NULL, test->server_port); - if (test->listener_sock_udp < 0) - exit(0); - } - /* always create TCP connection for control messages */ - test->listener_sock_tcp = netannounce(Ptcp, NULL, test->server_port); - if (test->listener_sock_tcp < 0) - exit(0); + for (i = 0; i < test->num_streams; ++i) { + s = netdial(test->protocol, test->server_hostname, test->server_port); + if (s < 0) { + perror("netdial stream"); + return -1; + } + FD_SET(s, &test->read_set); + FD_SET(s, &test->write_set); + test->max_fd = (test->max_fd < s) ? s : test->max_fd; - if (test->protocol == Ptcp) - { - if (set_tcp_windowsize(test->listener_sock_tcp, test->default_settings->socket_bufsize, SO_RCVBUF) < 0) - perror("unable to set TCP window"); - } - /* make sure that accept call does not block */ - setnonblocking(test->listener_sock_tcp); - setnonblocking(test->listener_sock_udp); - - printf("-----------------------------------------------------------\n"); - printf("Server listening on %d\n", test->server_port); - int x; - - /* make sure we got what we asked for */ - if ((x = get_tcp_windowsize(test->listener_sock_tcp, SO_RCVBUF)) < 0) - perror("SO_RCVBUF"); - - if (test->protocol == Ptcp) - { - { - if (test->default_settings->socket_bufsize > 0) - { - unit_snprintf(ubuf, UNIT_LEN, (double) x, 'A'); - printf("TCP window size: %s\n", ubuf); - } else - { - printf("Using TCP Autotuning \n"); - } - } - } - printf("-----------------------------------------------------------\n"); + // XXX: This doesn't fit our API model! + sp = test->new_stream(test); + sp->socket = s; + iperf_init_stream(test, sp); + iperf_add_stream(sp, test); + // XXX: This line probably needs to be replaced + connect_msg(sp); } - /* This code is being removed. Commented out until removal - else if (test->role == 'c') - { // Client - FD_ZERO(&test->write_set); - FD_SET(s, &test->write_set); - // XXX: I think we need to create a TCP control socket here too for - // UDP mode -blt - for (i = 0; i < test->num_streams; i++) - { - s = netdial(test->protocol, test->server_hostname, test->server_port); - if (s < 0) - { - fprintf(stderr, "netdial failed\n"); - exit(0); - } - FD_SET(s, &test->write_set); - test->max_fd = (test->max_fd < s) ? s : test->max_fd; + return 0; +} - sp = test->new_stream(test); - sp->socket = s; - iperf_init_stream(sp, test); - iperf_add_stream(test, sp); - - connect_msg(sp); // print connection established message - } +int +iperf_handle_message_client(struct iperf_test *test) +{ + if (read(test->ctrl_sck, &test->state, sizeof(int)) < 0) { + // indicate error on read + return -1; } - */ + + switch (test->state) { + case CREATE_STREAMS: + iperf_create_streams(test); + break; + default: + printf("How did you get here? test->state = %d\n", test->state); + break; + } + + return 0; } /* iperf_connect -- client to server connection function */ @@ -412,9 +397,30 @@ iperf_connect(struct iperf_test *test) printf("Connecting to host %s, port %d\n", test->server_hostname, test->server_port); - /* For Select: Set the test->write_set select set to zero, then set the s fd */ + FD_ZERO(&test->read_set); FD_ZERO(&test->write_set); + /* Create and connect the control channel */ + test->ctrl_sck = netdial(test->protocol, test->server_hostname, test->server_port); + + FD_SET(test->ctrl_sck, &test->read_set); + FD_SET(test->ctrl_sck, &test->write_set); + + /* Exchange parameters */ + test->state = PARAM_EXCHANGE; + if (write(test->ctrl_sck, &test->state, sizeof(int)) < 0) { + perror("write PARAM_EXCHANGE"); + return -1; + } + if (iperf_exchange_parameters(test) < 0) { + fprintf(stderr, "iperf_exchange_parameters failed\n"); + return -1; + } + + + /* Create and connect the individual streams */ + // This code has been moved to iperf_create_streams +/* for (i = 0; i < test->num_streams; i++) { s = netdial(test->protocol, test->server_hostname, test->server_port); if (s < 0) { @@ -432,8 +438,9 @@ iperf_connect(struct iperf_test *test) connect_msg(sp); } +*/ - return 1; + return 0; } /**************************************************************************/ @@ -461,8 +468,7 @@ iperf_free_test(struct iperf_test * test) /** * iperf_stats_callback -- handles the statistic gathering for both the client and server * - *returns void * - * + * XXX: This function needs to be updated to reflect the new code */ @@ -508,7 +514,7 @@ iperf_stats_callback(struct iperf_test * test) * iperf_reporter_callback -- handles the report printing * *returns report - * + * XXX: This function needs to be updated to reflect the new code */ void @@ -654,15 +660,13 @@ print_interval_results(struct iperf_test * test, struct iperf_stream * sp) void safe_strcat(char *s1, char *s2) { - //printf(" adding string %s to end of string %s \n", s1, s1); - if (strlen(s1) + strlen(s2) < MAX_RESULT_STRING) - strcat(s1, s2); - else - { - printf("Error: results string too long \n"); - exit(-1); /* XXX: should return an error instead! */ - /* but code that calls this needs to check for error first */ - //return -1; + if (strlen(s1) + strlen(s2) < MAX_RESULT_STRING) { + strcat(s1, s2); + } else { + printf("Error: results string too long \n"); + exit(-1); /* XXX: should return an error instead! */ + /* but code that calls this needs to check for error first */ + //return -1; } } diff --git a/src/iperf_api.h b/src/iperf_api.h index 7d631b3..1aa6820 100644 --- a/src/iperf_api.h +++ b/src/iperf_api.h @@ -5,8 +5,8 @@ approvals from the U.S. Dept. of Energy). All rights reserved. */ -#ifndef IPERF_API_H -#define IPERF_API_H +#ifndef __IPERF_API_H +#define __IPERF_API_H #include "iperf.h" @@ -88,11 +88,6 @@ struct iperf_test *iperf_new_test(); void iperf_defaults(struct iperf_test * testp); -/** - * iperf_init_test -- perform pretest initialization (listen on sockets, etc) - * - */ -void iperf_init_test(struct iperf_test * testp); /** * iperf_free_test -- free resources used by test, calls iperf_free_stream to @@ -140,5 +135,5 @@ int iperf_connect(struct iperf_test *); int iperf_client_start(struct iperf_test *); int iperf_client_end(struct iperf_test *); -#endif /* IPERF_API_H */ +#endif diff --git a/src/iperf_server_api.c b/src/iperf_server_api.c index 7db3a19..c5e7ac8 100644 --- a/src/iperf_server_api.c +++ b/src/iperf_server_api.c @@ -43,7 +43,7 @@ #include "uuid.h" #include "locale.h" -void handle_message(struct iperf_test * test, int m, struct iperf_stream * sp); +void handle_message(struct iperf_test * test, int m, struct iperf_stream * sp); /*********************************************************************/ /** @@ -116,139 +116,277 @@ send_result_to_client(struct iperf_stream * sp) free(buf); } -/**************************************************************************/ -void -iperf_run_server(struct iperf_test * test) +int +iperf_server_listen(struct iperf_test *test) { - struct timeval tv; - struct iperf_stream *np; - struct timer *stats_interval, *reporter_interval; - char *result_string = NULL; - int j = 0, result = 0, message = 0; - int nfd = 0; + char ubuf[UNIT_LEN]; + int x; - //printf("in iperf_run_server \n"); +/* + if (test->protocol == Pudp) { + test->listener_sock_udp = netannounce(Pudp, NULL, test->server_port); + if (test->listener_sock_udp < 0) { + // Needs to set some sort of error number/message + return -1; + } + } + + test->listener_sock_tcp = netannounce(Ptcp, NULL, test->server_port); + if (test->listener_sock_tcp < 0) { + // Needs to set some sort of error number/message + return -1; + } + + if (test->protocol == Ptcp) { + if (set_tcp_windowsize(test->listener_sock_tcp, test->default_settings->socket_bufsize, SO_RCVBUF) < 0) { + // Needs to set some sort of error number/message + perror("unable to set TCP window"); + return -1; + } + } + + // make sure that accept call does not block + setnonblocking(test->listener_sock_tcp); + setnonblocking(test->listener_sock_udp); +*/ + if((test->listener = netannounce(Ptcp, NULL, test->server_port)) < 0) { + // Needs to set some sort of error number/message + return -1; + } + setnonblocking(test->listener); + + printf("-----------------------------------------------------------\n"); + printf("Server listening on %d\n", test->server_port); + + // This needs to be changed to reflect if client has different window size + // make sure we got what we asked for + if ((x = get_tcp_windowsize(test->listener_sock_tcp, SO_RCVBUF)) < 0) { + // Needs to set some sort of error number/message + perror("SO_RCVBUF"); + return -1; + } + + // This code needs to be moved to after parameter exhange + if (test->protocol == Ptcp) { + if (test->default_settings->socket_bufsize > 0) { + unit_snprintf(ubuf, UNIT_LEN, (double) x, 'A'); + printf("TCP window size: %s\n", ubuf); + } else { + printf("Using TCP Autotuning\n"); + } + } + printf("-----------------------------------------------------------\n"); FD_ZERO(&test->read_set); FD_ZERO(&test->temp_set); - if (test->protocol == Ptcp) - { - /* add listener to the master set */ - FD_SET(test->listener_sock_tcp, &test->read_set); - test->max_fd = test->listener_sock_tcp; - } else - { - FD_SET(test->listener_sock_udp, &test->read_set); - test->max_fd = test->listener_sock_udp; + FD_SET(test->listener, &test->read_set); + test->max_fd = test->ctrl_sck; + + return 0; +} + +int +iperf_accept(struct iperf_test *test) +{ + int s; + char ipl[512], ipr[512]; + socklen_t len; + struct sockaddr_in addr; + + if (test->ctrl_sck == 0) { + if ((s = accept(test->listener, (struct sockaddr *) &addr, &len)) < 0) { + perror("accept"); + return -1; + } + + inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) & addr.local_addr)->sin_addr), + (void *) ipl, sizeof(ipl)); + inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) & addr.remote_addr)->sin_addr), + (void *) ipr, sizeof(ipr)); + printf(report_peer, s, + ipl, ntohs(((struct sockaddr_in *) & addr.local_addr)->sin_port), + ipr, ntohs(((struct sockaddr_in *) & addr.remote_addr)->sin_port)); + + return s; + } else { + // This message needs to be sent to the client + printf("The server is busy running a test. Try again later.\n"); + return 0; + } +} + +/**************************************************************************/ +int +iperf_handle_message(struct iperf_test *test) +{ + if (read(test->ctrl_sck, &test->state, sizeof(int)) < 0) { + // indicate error on read + return -1; } - //printf("iperf_run_server: max_fd set to %d \n", test->max_fd); + switch(test->state) { + case PARAM_EXCHANGE: + iperf_exchange_parameters(test); + break; + } + + return 0; +} + +void +iperf_run_server(struct iperf_test *test) +{ + struct timeval tv; + //struct iperf_stream *np; + struct iperf_stream *sp; + struct timer *stats_interval, *reporter_interval; + char *result_string = NULL; + int j = 0, result = 0, message = 0; + int nfd = 0; + + // Open socket and listen + if (iperf_server_listen(test) < 0) { + // This needs to be replaced by more formal error handling + fprintf(stderr, "An error occurred. Exiting.\n"); + exit(1); + } test->num_streams = 0; test->default_settings->state = TEST_RUNNING; - printf("iperf_run_server: Waiting for client connect.... \n"); - while (test->default_settings->state != TEST_END) - { - memcpy(&test->temp_set, &test->read_set, sizeof(test->read_set)); - tv.tv_sec = 15; - tv.tv_usec = 0; + while (test->default_settings != TEST_END) { - /* using select to check on multiple descriptors. */ - //printf("calling select.. sock = %d \n", test->max_fd + 1); - result = select(test->max_fd + 1, &test->temp_set, NULL, NULL, &tv); - if (result == 0) - { - //printf("SERVER IDLE : %d sec\n", (int) tv.tv_sec); - continue; - } else if (result < 0 && errno != EINTR) - { - printf("Error in select(): %s, socket = %d\n", strerror(errno), test->max_fd + 1); - exit(0); - } else if (result > 0) - { - if (test->protocol == Ptcp) - { - /* Accept a new TCP connection */ - if (FD_ISSET(test->listener_sock_tcp, &test->temp_set)) - { - test->protocol = Ptcp; - test->accept = iperf_tcp_accept; - if (test->accept < 0) - return; - test->new_stream = iperf_new_tcp_stream; - test->accept(test); - test->default_settings->state = TEST_RUNNING; - FD_CLR(test->listener_sock_tcp, &test->temp_set); - //printf("iperf_run_server: accepted TCP connection \n"); - test->num_streams++; - } - } else - { - /* Accept a new UDP connection */ - if (FD_ISSET(test->listener_sock_udp, &test->temp_set)) - { - test->protocol = Pudp; - test->accept = iperf_udp_accept; - if (test->accept < 0) - return; - test->new_stream = iperf_new_udp_stream; - test->accept(test); - test->default_settings->state = TEST_RUNNING; - FD_CLR(test->listener_sock_udp, &test->temp_set); - printf("iperf_run_server: accepted UDP connection \n"); - } - } - /* Process the sockets for read operation */ - nfd = test->max_fd + 1; - for (j = 0; j <= test->max_fd; j++) - { - //printf("Checking socket %d \n", j); - if (FD_ISSET(j, &test->temp_set)) - { - //printf("iperf_run_server: data ready on socket %d \n", j); - /* find the correct stream - possibly time consuming? */ - np = find_stream_by_socket(test, j); - message = np->rcv(np); /* get data from client using - * receiver callback */ - if (message < 0) - goto done; - handle_message(test, message, np); - if (message == TEST_END) - break; /* test done, so break out of loop */ + // Copy select set and renew timers + FD_COPY(&test->read_set, &test->temp_set); + tv.tv_sec = 15; + tv.tv_usec = 0; - } /* end if (FD_ISSET(j, &temp_set)) */ - } /* end for (j=0;...) */ + result = select(test->max_fd + 1, &test->temp_set, NULL, NULL, &tv); + if (result < 0 && errno != EINTR) { + // Change the way this handles errors + perror("select"); + exit(1); + } else if (result > 0) { + if (FD_ISSET(test->listener, &test->temp_set)) { + test->ctrl_sck = iperf_accept(test); + if (test->ctrl_sck < 0) { + fprintf(stderr, "error: could not open control socket. exiting.\n"); + exit(1); + } else if (test->ctrl_sck > 0) { + // Accepted! exchange parameters / setup - if (message == PARAM_EXCHANGE) - { - /* start timer at end of PARAM_EXCHANGE */ - if (test->stats_interval != 0) - stats_interval = new_timer(test->stats_interval, 0); - if (test->reporter_interval != 0) - reporter_interval = new_timer(test->reporter_interval, 0); - } - if ((message == STREAM_BEGIN) || (message == STREAM_RUNNING)) - { - /* - * XXX: is this right? Might there be cases where we want - * stats for while in another state? - */ - if ((test->stats_interval != 0) && stats_interval->expired(stats_interval)) - { - test->stats_callback(test); - update_timer(stats_interval, test->stats_interval, 0); - } - if ((test->reporter_interval != 0) && reporter_interval->expired(reporter_interval)) - { - test->reporter_callback(test); - update_timer(reporter_interval, test->reporter_interval, 0); - } - } - } /* end else (result>0) */ - } /* end while */ + } + FD_CLR(test->listener, &test->temp_set); + } + if (FD_ISSET(test->ctrl_sck, &test->temp_set)) { + // Handle control messages + + FD_CLR(test->ctrl_sck, &test->temp_set); + } + if (FD_ISSET(test->prot_listener, &test->temp_set)) { + // Spawn new streams + + FD_CLR(test->ctrl_sck, &test->temp_set); + } + // Iterate through the streams to see if their socket FD_ISSET + for (sp = test->streams; sp != NULL; sp = sp->next) { + if (FD_ISSET(sp->socket, &test->temp_set)) { + + + } + } + } + } + +/* + while (test->default_settings->state != TEST_END) { + memcpy(&test->temp_set, &test->read_set, sizeof(test->read_set)); + tv.tv_sec = 15; + tv.tv_usec = 0; + + // using select to check on multiple descriptors. + result = select(test->max_fd + 1, &test->temp_set, NULL, NULL, &tv); + if (result == 0) { + continue; + } else if (result < 0 && errno != EINTR) { + printf("Error in select(): %s, socket = %d\n", strerror(errno), test->max_fd + 1); + exit(0); + } else if (result > 0) { + if (test->protocol == Ptcp) { + // Accept a new TCP connection + if (FD_ISSET(test->ctrl_sck, &test->temp_set)) { + test->protocol = Ptcp; + + // The following line needs to be moved to test initialization + test->accept = iperf_tcp_accept; + if (test->accept < 0) // .. really??! + return; + + // Again, needs to be moved to test initialization + test->new_stream = iperf_new_tcp_stream; + test->accept(test); + test->default_settings->state = TEST_RUNNING; + FD_CLR(test->listener_sock_tcp, &test->temp_set); + //printf("iperf_run_server: accepted TCP connection \n"); + test->num_streams++; + } + } else { + // Accept a new UDP connection + if (FD_ISSET(test->listener_sock_udp, &test->temp_set)) { + test->protocol = Pudp; + test->accept = iperf_udp_accept; + if (test->accept < 0) + return; + test->new_stream = iperf_new_udp_stream; + test->accept(test); + test->default_settings->state = TEST_RUNNING; + FD_CLR(test->listener_sock_udp, &test->temp_set); + printf("iperf_run_server: accepted UDP connection \n"); + } + } + // Process the sockets for read operation + nfd = test->max_fd + 1; + for (j = 0; j <= test->max_fd; j++) { + if (FD_ISSET(j, &test->temp_set)) { + // find the correct stream - possibly time consuming? + np = find_stream_by_socket(test, j); + message = np->rcv(np); // get data from client using receiver callback + // This code needs to be fixed to work without goto + if (message < 0) + goto done; + handle_message(test, message, np); + if (message == TEST_END) + break; + } + } + + if (message == PARAM_EXCHANGE) { + // start timer at end of PARAM_EXCHANGE + if (test->stats_interval != 0) + stats_interval = new_timer(test->stats_interval, 0); + if (test->reporter_interval != 0) + reporter_interval = new_timer(test->reporter_interval, 0); + } + + if ((message == STREAM_BEGIN) || (message == STREAM_RUNNING)) { + // + // XXX: is this right? Might there be cases where we want + // stats for while in another state? + // + if ((test->stats_interval != 0) && stats_interval->expired(stats_interval)) { + test->stats_callback(test); + update_timer(stats_interval, test->stats_interval, 0); + } + if ((test->reporter_interval != 0) && reporter_interval->expired(reporter_interval)) { + test->reporter_callback(test); + update_timer(reporter_interval, test->reporter_interval, 0); + } + } + } + } +*/ // End of the WHILE done: printf("Test Complete. \n\n"); diff --git a/src/iperf_server_api.h b/src/iperf_server_api.h index 7d8ce85..6f613df 100644 --- a/src/iperf_server_api.h +++ b/src/iperf_server_api.h @@ -1,7 +1,16 @@ +#ifndef __IPERF_SERVER_API_H +#define __IPERF_SERVER_API_H -int param_received(struct iperf_stream *sp, struct param_exchange * param); -void send_result_to_client(struct iperf_stream * sp); -void iperf_run_server(struct iperf_test * test); -struct iperf_stream *find_stream_by_socket(struct iperf_test * test, int sock); +#include "iperf.h" +int param_received(struct iperf_stream *, struct param_exchange *); +void send_result_to_client(struct iperf_stream *); + +void iperf_run_server(struct iperf_test *); + +struct iperf_stream *find_stream_by_socket(struct iperf_test *, int); + +int iperf_server_listen(struct iperf_test *); + +#endif diff --git a/src/iperf_tcp.c b/src/iperf_tcp.c index dd0ff5f..87b402e 100644 --- a/src/iperf_tcp.c +++ b/src/iperf_tcp.c @@ -173,45 +173,43 @@ iperf_tcp_recv(struct iperf_stream * sp) int iperf_tcp_send(struct iperf_stream * sp) { - int result; - int size = sp->settings->blksize; + int result; + int size = sp->settings->blksize; - if (!sp->buffer) - { - perror("transmit buffer not allocated"); - return -1; + if (!sp->buffer) { + perror("transmit buffer not allocated"); + return -1; } //printf("iperf_tcp_send: state = %d \n", sp->settings->state); memcpy(sp->buffer, &(sp->settings->state), sizeof(int));; /* set read size based on message type */ - switch (sp->settings->state) - { - case PARAM_EXCHANGE: - size = sizeof(struct param_exchange); - break; - case STREAM_BEGIN: - size = sp->settings->blksize; - break; - case STREAM_END: - size = sizeof(struct param_exchange); - break; - case RESULT_REQUEST: - size = MAX_RESULT_STRING; - break; - case ALL_STREAMS_END: - size = sizeof(struct param_exchange); - break; - case TEST_END: - size = sizeof(struct param_exchange); - break; - case STREAM_RUNNING: - size = sp->settings->blksize; - break; - default: - printf("State of the stream can't be determined\n"); - return -1; + switch (sp->settings->state) { + case PARAM_EXCHANGE: + size = sizeof(struct param_exchange); + break; + case STREAM_BEGIN: + size = sp->settings->blksize; + break; + case STREAM_END: + size = sizeof(struct param_exchange); + break; + case RESULT_REQUEST: + size = MAX_RESULT_STRING; + break; + case ALL_STREAMS_END: + size = sizeof(struct param_exchange); + break; + case TEST_END: + size = sizeof(struct param_exchange); + break; + case STREAM_RUNNING: + size = sp->settings->blksize; + break; + default: + printf("State of the stream can't be determined\n"); + return -1; } //if(sp->settings->state != STREAM_RUNNING) @@ -223,20 +221,19 @@ iperf_tcp_send(struct iperf_stream * sp) result = Nwrite(sp->socket, sp->buffer, size, Ptcp); #endif if (result < 0) - perror("Write error"); + perror("Write error"); //printf(" iperf_tcp_send: %d bytes sent \n", result); - if (sp->settings->state == STREAM_BEGIN || sp->settings->state == STREAM_RUNNING) - { - sp->result->bytes_sent += result; - sp->result->bytes_sent_this_interval += result; + if (sp->settings->state == STREAM_BEGIN || sp->settings->state == STREAM_RUNNING) { + sp->result->bytes_sent += result; + sp->result->bytes_sent_this_interval += result; } //printf("iperf_tcp_send: number bytes sent so far = %u \n", (uint64_t) sp->result->bytes_sent); /* change state after 1st send */ if (sp->settings->state == STREAM_BEGIN) - sp->settings->state = STREAM_RUNNING; + sp->settings->state = STREAM_RUNNING; return result; } diff --git a/src/main.c b/src/main.c index 7be51d7..46d9f4c 100644 --- a/src/main.c +++ b/src/main.c @@ -228,12 +228,10 @@ main(int argc, char **argv) /* exit until this is done.... */ if (test->protocol == Pudp) { printf("UDP mode not yet supported. Exiting. \n"); - exit(0); + iperf_free_test(test); + return 0; } - iperf_init_test(test); - - //printf("in main: calling iperf_run \n"); iperf_run(test); iperf_free_test(test); @@ -251,11 +249,12 @@ iperf_run(struct iperf_test * test) switch (test->role) { case 's': - while (1) { +// The following lines are commented out until I fix the bind issue with porting +// while (1) { iperf_run_server(test); - test->streams = NULL; - sleep(1); - } +// test->streams = NULL; +// sleep(1); +// } return 0; case 'c': return iperf_run_client(test); diff --git a/src/net.c b/src/net.c index 75da7ac..254637d 100644 --- a/src/net.c +++ b/src/net.c @@ -21,40 +21,39 @@ int netdial(int proto, char *client, int port) { - int s; + int s; struct hostent *hent; struct sockaddr_in sa; socklen_t sn; - /* XXX: This is not working for non-fully qualified host names - use getaddrinfo() instead? - */ - if ((hent = gethostbyname(client)) == 0) - { - perror("gethostbyname"); - return (-1); + /* XXX: This is not working for non-fully qualified host names use getaddrinfo() instead? */ + if ((hent = gethostbyname(client)) == 0) { + perror("gethostbyname"); + return (-1); } + s = socket(AF_INET, proto, 0); - if (s < 0) - { - perror("socket"); - return (-1); + if (s < 0) { + perror("socket"); + return (-1); } + memset(&sa, 0, sizeof sa); memmove(&sa.sin_addr, hent->h_addr, 4); sa.sin_port = htons(port); sa.sin_family = AF_INET; - if (connect(s, (struct sockaddr *) & sa, sizeof sa) < 0 && errno != EINPROGRESS) - { - perror("netdial: connect error"); - return (-1); + if (connect(s, (struct sockaddr *) & sa, sizeof sa) < 0 && errno != EINPROGRESS) { + perror("netdial: connect error"); + return (-1); } + sn = sizeof sa; - if (getpeername(s, (struct sockaddr *) & sa, &sn) >= 0) - { - return (s); + + if (getpeername(s, (struct sockaddr *) & sa, &sn) >= 0) { + return (s); } + perror("getpeername error"); return (-1); } @@ -64,33 +63,31 @@ netdial(int proto, char *client, int port) int netannounce(int proto, char *local, int port) { - int s; + int s; struct sockaddr_in sa; /* XXX: implement binding to a local address rather than * */ - //printf("in netannounce: port = %d \n", port); memset((void *) &sa, 0, sizeof sa); s = socket(AF_INET, proto, 0); - if (s < 0) - { - perror("socket"); - return (-1); + if (s < 0) { + perror("socket"); + return (-1); } - int opt = 1; + int opt = 1; setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, sizeof(opt)); sa.sin_port = htons(port); sa.sin_family = AF_INET; - if (bind(s, (struct sockaddr *) & sa, sizeof(struct sockaddr_in)) < 0) - { - close(s); - perror("bind"); - return (-1); + if (bind(s, (struct sockaddr *) & sa, sizeof(struct sockaddr_in)) < 0) { + close(s); + perror("bind"); + return (-1); } + if (proto == SOCK_STREAM) - listen(s, 5); + listen(s, 5); return s; } @@ -106,12 +103,11 @@ Nread(int fd, char *buf, int count, int prot) struct sockaddr from; socklen_t len = sizeof(from); register int cnt; - if (prot == SOCK_DGRAM) - { - cnt = recvfrom(fd, buf, count, 0, &from, &len); - } else - { - cnt = mread(fd, buf, count); + + if (prot == SOCK_DGRAM) { + cnt = recvfrom(fd, buf, count, 0, &from, &len); + } else { + cnt = mread(fd, buf, count); } return (cnt); }