From 16975b3bbae5dcd95deb5c194c03fec863bb6f60 Mon Sep 17 00:00:00 2001 From: kaustubhprabhu Date: Wed, 24 Jun 2009 00:52:45 +0000 Subject: [PATCH] added messege communication betweenn client and serverin iperf_api.c, moved state to iperf_settings in iperf_api.h --- src/iperf_api.c | 211 ++++++++++++++++++++++++++++++++---------------- src/iperf_api.h | 13 ++- 2 files changed, 150 insertions(+), 74 deletions(-) diff --git a/src/iperf_api.c b/src/iperf_api.c index 7df9438..7b139af 100644 --- a/src/iperf_api.c +++ b/src/iperf_api.c @@ -34,11 +34,41 @@ static struct option longopts[] = { "bandwidth", required_argument, NULL, 'b' }, { "length", required_argument, NULL, 'l' }, { "window", required_argument, NULL, 'w' }, -{"interval", required_argument, NULL, 'i'}, +{"interval", required_argument, NULL, 'i'}, +{"NoDelay", no_argument, NULL, 'N'}, +{"Print-MSS", no_argument, NULL, 'm'}, +{"Set-MSS", required_argument, NULL, 'M'}, { NULL, 0, NULL, 0 } }; +char *fill_data(char *buf, int state) +{ + int i; + switch(state) + { + case STREAM_BEGIN: + buf[0]= (char) STREAM_BEGIN; + break; + + case STREAM_END: + buf[0]= (char) STREAM_END; + break; + + case RESULT_REQUEST: + buf[0]= (char) RESULT_REQUEST; + break; + default: + buf[0]= 0; + break; + } + + for(i=1; i < sizeof(buf); i++) + buf[i] = i % 37; + + return buf; +} + int getsock_tcp_mss( int inSock ) { int theMSS = 0; @@ -143,50 +173,66 @@ void Display(struct iperf_test *test) int iperf_tcp_recv(struct iperf_stream *sp) { - int result; + int result, messege; int size = sp->settings->blksize; char *buf = (char *) malloc(size); if(!buf) { perror("malloc: unable to allocate receive buffer"); } - + do{ - result = recv(sp->socket, buf, size, 0); - + result = recv(sp->socket, buf, size, 0); } while (result == -1 && errno == EINTR); - sp->result->bytes_received+= result; - return result; + //interprete the type of messege in packet + if(result > 0) + { + messege = (int) buf[0]; + } + + sp->result->bytes_received+= result; + + if(messege == STREAM_END) + printf("end received \n"); + + + free(buf); + return messege; } int iperf_udp_recv(struct iperf_stream *sp) { - - int result; + int result, messege; int size = sp->settings->blksize; char *buf = (char *) malloc(size); - if(!buf) { perror("malloc: unable to allocate receive buffer"); } do{ - result = recv(sp->socket, buf, size, 0); + result = recv(sp->socket, buf, size, 0); } while (result == -1 && errno == EINTR); - - sp->result->bytes_received+= result; + //interprete the type of messege in packet + messege = (int) buf[0]; - return result; + if(messege!= STREAM_END) + sp->result->bytes_received+= result; + + if(messege == STREAM_END) + printf("end received \n"); + + //free(buf); + return messege; } int iperf_tcp_send(struct iperf_stream *sp) { - int result,i; + int result; int size = sp->settings->blksize; char *buf = (char *) malloc(size); @@ -194,8 +240,14 @@ int iperf_tcp_send(struct iperf_stream *sp) { perror("malloc: unable to allocate transmit buffer"); } - for(i=0; i < size; i++) - buf[i] = i % 37; + + //applicable for 1st packet sent + if(sp->settings->state == STREAM_BEGIN) + { + sp->settings->state = TEST_RUNNING; + } + + buf = fill_data(buf, sp->settings->state); result = send(sp->socket, buf, size , 0); sp->result->bytes_sent+= size; @@ -205,7 +257,7 @@ int iperf_tcp_send(struct iperf_stream *sp) int iperf_udp_send(struct iperf_stream *sp) { - int result,i; + int result; struct timeval before, after; int64_t delayus, adjustus, dtargus; int size = sp->settings->blksize; @@ -220,11 +272,22 @@ int iperf_udp_send(struct iperf_stream *sp) assert(dtargus != 0); - for(i=0; i < size; i++) - buf[i] = i % 37; - + // applicable for 1st packet sent + if(sp->settings->state == STREAM_BEGIN) + { + buf = fill_data(buf, sp->settings->state); + sp->settings->state = TEST_RUNNING; + } + else + { + buf = fill_data(buf, sp->settings->state); + } + if(((struct timer *) sp->data)->expired((struct timer *) sp->data)) - { + { + // applicable for 1s packet sent + if(sp->settings->state == STREAM_BEGIN) + sp->settings->state = TEST_RUNNING; if(gettimeofday(&before, 0) < 0) perror("gettimeofday"); @@ -239,10 +302,10 @@ int iperf_udp_send(struct iperf_stream *sp) adjustus += (before.tv_sec - after.tv_sec) * SEC_TO_US ; adjustus += (before.tv_usec - after.tv_usec); - printf(" the adjust time = %lld \n",dtargus- adjustus); - // if( adjustus > 0) { + //printf(" the adjust time = %lld \n",dtargus- adjustus); + if( adjustus > 0) { dtargus = adjustus; - //} + } memcpy(&before, &after, sizeof before); // RESET THE TIMER @@ -282,7 +345,6 @@ void iperf_defaults(struct iperf_test *testp) testp->role = 's'; testp->duration = 10; testp->server_port = 5001; - testp->state = TEST_START; testp->mFormat = 'a'; @@ -300,6 +362,8 @@ void iperf_init_test(struct iperf_test *test) struct iperf_stream *sp; int i, s=0; + test->default_settings->state = TEST_START; + if(test->role == 's') { test->listener_sock = netannounce(test->protocol, NULL, test->server_port); @@ -396,7 +460,7 @@ char *iperf_reporter_callback(struct iperf_test *test) char *messege = (char *) malloc(200); char *messege_final = (char *) malloc(test->num_streams * 50 + 200); - if(test->state == TEST_RUNNING) + if(test->default_settings->state == TEST_RUNNING) { printf("\n----------------INTERVAL [%d to %d]----------------\n", sp->result->interval_results->interval_duration - test->stats_interval, @@ -429,7 +493,7 @@ char *iperf_reporter_callback(struct iperf_test *test) } // PRINT TOTAL - if(test->state == TEST_END) + if(test->default_settings->state == TEST_END) { bytes =0; sp= test->streams; @@ -441,8 +505,7 @@ char *iperf_reporter_callback(struct iperf_test *test) { if(test->role == 'c') { - bytes+= sp->result->bytes_sent; - + bytes+= sp->result->bytes_sent; gettimeofday( &sp->result->end_time, NULL); unit_snprintf(ubuf, UNIT_LEN, (double) (sp->result->bytes_sent /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec)), test->mFormat); @@ -451,8 +514,7 @@ char *iperf_reporter_callback(struct iperf_test *test) } else if(test->role == 's') { - bytes+= sp->result->bytes_received; - gettimeofday( &sp->result->end_time, NULL); + bytes+= sp->result->bytes_received; unit_snprintf(ubuf, UNIT_LEN, (double) sp->result->bytes_received /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec), test->mFormat); printf("[%d]\t %llu bytes received %s per sec \n",sp->socket, sp->result->bytes_received, ubuf); @@ -557,7 +619,9 @@ struct iperf_stream *iperf_new_stream(struct iperf_test *testp) gettimeofday(&sp->result->start_time, NULL); sp->result->interval_results->bytes_transferred = 0; - sp->result->interval_results->interval_duration = 0; + sp->result->interval_results->interval_duration = 0; + + sp->settings->state = STREAM_BEGIN; return sp; } @@ -759,7 +823,7 @@ void iperf_run_server(struct iperf_test *test) { struct timeval tv; struct iperf_stream *n; - int j, result,count= 0; + int j,result, messege,count= 0; char *read = NULL; FD_ZERO(&test->read_set); @@ -782,20 +846,18 @@ void iperf_run_server(struct iperf_test *test) else if (result < 0 && errno != EINTR) printf("Error in select(): %s\n", strerror(errno)); - // Accept a new connection + else if(result >0) - { + { + // Accept a new connection if (FD_ISSET(test->listener_sock, &test->temp_set)) { test->accept(test); count++; FD_CLR(test->listener_sock, &test->temp_set); - } + } + - // test end message ? - - //result_request message ? - //Process the sockets for read operation for (j=0; j< test->max_fd+1; j++) { @@ -803,39 +865,39 @@ void iperf_run_server(struct iperf_test *test) { // find the correct stream n = find_stream_by_socket(test,j); - result = n->rcv(n); + messege = n->rcv(n); - if(result == 0) + if(messege == STREAM_END) { - // stream shutdown message + printf("result requested"); + gettimeofday(&n->result->end_time, NULL); count --; FD_CLR(j, &test->read_set); } - else if(result < 0) - { - printf("Error in recv(): %s\n", strerror(errno)); - } + } // end if (FD_ISSET(j, &temp_set)) }// end for (j=0;...) // Detect if ALL streams have finished if(count == 0) - { - test->state = TEST_END; - - read = test->reporter_callback(test); + { + test->default_settings->state = TEST_END; + read = test->reporter_callback(test); puts(read); // FREE ALL STREAMS n = test->streams; while(n) { - close(n->socket); + close(n->socket); + printf("socket closed"); iperf_free_stream(test, n); n= n->next; } - printf(" TEST ENDED\n"); + + printf(" TEST ENDED\n"); + break; } }// end else (result>0) @@ -874,8 +936,7 @@ void iperf_run_client(struct iperf_test *test) { sp->data = new_timer(0, dtargus); sp= sp->next; - } - + } } timer = new_timer(test->duration, 0); @@ -920,29 +981,39 @@ void iperf_run_client(struct iperf_test *test) reporter_interval = new_timer(test->reporter_interval,0); } - }// while outer timer + }// while outer timer - - test->state = TEST_END; - test->reporter_callback(test); - - // Send the EOF - 0 buffer packets + // sending STREAM_END packets sp = test->streams; np = sp; - do { + do + { + sp = np; + sp->settings->state = STREAM_END; + sp->snd(sp); + printf("sent last packet\n"); + np = sp->next; + } while (np); + + test->default_settings->state = TEST_END; + test->reporter_callback(test); + + // Deleting all streams + sp = test->streams; + np = sp; + do + { sp = np; - send(sp->socket, buf, 0, 0); - np = sp->next; close(sp->socket); iperf_free_stream(test, sp); - - } while (np); + np = sp->next; + } while (np); } int iperf_run(struct iperf_test *test) { - test->state = TEST_RUNNING; + test->default_settings->state = TEST_RUNNING; if(test->role == 's') { @@ -993,13 +1064,13 @@ main(int argc, char **argv) test->num_streams = atoi(optarg); break; case 'b': - test->default_settings->rate = atoi(optarg); + test->default_settings->rate = unit_atof(optarg); break; case 'l': test->default_settings->blksize = atol(optarg); break; case 'w': - test->default_settings->socket_bufsize = atoi(optarg); + test->default_settings->socket_bufsize = unit_atof(optarg); case 'i': test->stats_interval = atoi(optarg); test->reporter_interval = atoi(optarg); diff --git a/src/iperf_api.h b/src/iperf_api.h index b1fee27..05c549f 100644 --- a/src/iperf_api.h +++ b/src/iperf_api.h @@ -1,4 +1,4 @@ -typedef uint64_t iperf_size_t; + typedef uint64_t iperf_size_t; struct iperf_interval_results { @@ -29,6 +29,7 @@ struct iperf_settings int MSS; //for TCP MSS int ttl; int tos; + int state; // This is state of a stream/test - can use Union for this }; struct iperf_stream @@ -64,8 +65,8 @@ struct iperf_test char *server_hostname; // arg of -c int server_port; // arg of -p int duration; // total duration of test -t - int listener_sock; - int state; + int listener_sock; + /*boolen variables for Options */ int mDaemon; // -D @@ -130,7 +131,11 @@ enum { #define TEST_START 1 #define TEST_RUNNING 2 -#define TEST_END 3 +#define RESULT_REQUEST 3 +#define RESULT_RESPOND 4 +#define TEST_END 5 +#define STREAM_BEGIN 6 +#define STREAM_END 7 /** * iperf_new_test -- return a new iperf_test with default values