added messege communication betweenn client and serverin iperf_api.c, moved state to iperf_settings in iperf_api.h

This commit is contained in:
kaustubhprabhu 2009-06-24 00:52:45 +00:00
parent d7f56e1528
commit 16975b3bba
2 changed files with 150 additions and 74 deletions

View File

@ -34,11 +34,41 @@ static struct option longopts[] =
{ "bandwidth", required_argument, NULL, 'b' }, { "bandwidth", required_argument, NULL, 'b' },
{ "length", required_argument, NULL, 'l' }, { "length", required_argument, NULL, 'l' },
{ "window", required_argument, NULL, 'w' }, { "window", required_argument, NULL, 'w' },
{"interval", required_argument, NULL, 'i'}, {"interval", required_argument, NULL, 'i'},
{"NoDelay", no_argument, NULL, 'N'},
{"Print-MSS", no_argument, NULL, 'm'},
{"Set-MSS", required_argument, NULL, 'M'},
{ NULL, 0, NULL, 0 } { NULL, 0, NULL, 0 }
}; };
char *fill_data(char *buf, int state)
{
int i;
switch(state)
{
case STREAM_BEGIN:
buf[0]= (char) STREAM_BEGIN;
break;
case STREAM_END:
buf[0]= (char) STREAM_END;
break;
case RESULT_REQUEST:
buf[0]= (char) RESULT_REQUEST;
break;
default:
buf[0]= 0;
break;
}
for(i=1; i < sizeof(buf); i++)
buf[i] = i % 37;
return buf;
}
int getsock_tcp_mss( int inSock ) int getsock_tcp_mss( int inSock )
{ {
int theMSS = 0; int theMSS = 0;
@ -143,50 +173,66 @@ void Display(struct iperf_test *test)
int iperf_tcp_recv(struct iperf_stream *sp) int iperf_tcp_recv(struct iperf_stream *sp)
{ {
int result; int result, messege;
int size = sp->settings->blksize; int size = sp->settings->blksize;
char *buf = (char *) malloc(size); char *buf = (char *) malloc(size);
if(!buf) if(!buf)
{ {
perror("malloc: unable to allocate receive buffer"); perror("malloc: unable to allocate receive buffer");
} }
do{ do{
result = recv(sp->socket, buf, size, 0); result = recv(sp->socket, buf, size, 0);
} while (result == -1 && errno == EINTR); } while (result == -1 && errno == EINTR);
sp->result->bytes_received+= result; //interprete the type of messege in packet
return result; if(result > 0)
{
messege = (int) buf[0];
}
sp->result->bytes_received+= result;
if(messege == STREAM_END)
printf("end received \n");
free(buf);
return messege;
} }
int iperf_udp_recv(struct iperf_stream *sp) int iperf_udp_recv(struct iperf_stream *sp)
{ {
int result, messege;
int result;
int size = sp->settings->blksize; int size = sp->settings->blksize;
char *buf = (char *) malloc(size); char *buf = (char *) malloc(size);
if(!buf) if(!buf)
{ {
perror("malloc: unable to allocate receive buffer"); perror("malloc: unable to allocate receive buffer");
} }
do{ do{
result = recv(sp->socket, buf, size, 0); result = recv(sp->socket, buf, size, 0);
} while (result == -1 && errno == EINTR); } while (result == -1 && errno == EINTR);
sp->result->bytes_received+= result;
//interprete the type of messege in packet
messege = (int) buf[0];
return result; if(messege!= STREAM_END)
sp->result->bytes_received+= result;
if(messege == STREAM_END)
printf("end received \n");
//free(buf);
return messege;
} }
int iperf_tcp_send(struct iperf_stream *sp) int iperf_tcp_send(struct iperf_stream *sp)
{ {
int result,i; int result;
int size = sp->settings->blksize; int size = sp->settings->blksize;
char *buf = (char *) malloc(size); char *buf = (char *) malloc(size);
@ -194,8 +240,14 @@ int iperf_tcp_send(struct iperf_stream *sp)
{ {
perror("malloc: unable to allocate transmit buffer"); perror("malloc: unable to allocate transmit buffer");
} }
for(i=0; i < size; i++)
buf[i] = i % 37; //applicable for 1st packet sent
if(sp->settings->state == STREAM_BEGIN)
{
sp->settings->state = TEST_RUNNING;
}
buf = fill_data(buf, sp->settings->state);
result = send(sp->socket, buf, size , 0); result = send(sp->socket, buf, size , 0);
sp->result->bytes_sent+= size; sp->result->bytes_sent+= size;
@ -205,7 +257,7 @@ int iperf_tcp_send(struct iperf_stream *sp)
int iperf_udp_send(struct iperf_stream *sp) int iperf_udp_send(struct iperf_stream *sp)
{ {
int result,i; int result;
struct timeval before, after; struct timeval before, after;
int64_t delayus, adjustus, dtargus; int64_t delayus, adjustus, dtargus;
int size = sp->settings->blksize; int size = sp->settings->blksize;
@ -220,11 +272,22 @@ int iperf_udp_send(struct iperf_stream *sp)
assert(dtargus != 0); assert(dtargus != 0);
for(i=0; i < size; i++) // applicable for 1st packet sent
buf[i] = i % 37; if(sp->settings->state == STREAM_BEGIN)
{
buf = fill_data(buf, sp->settings->state);
sp->settings->state = TEST_RUNNING;
}
else
{
buf = fill_data(buf, sp->settings->state);
}
if(((struct timer *) sp->data)->expired((struct timer *) sp->data)) if(((struct timer *) sp->data)->expired((struct timer *) sp->data))
{ {
// applicable for 1s packet sent
if(sp->settings->state == STREAM_BEGIN)
sp->settings->state = TEST_RUNNING;
if(gettimeofday(&before, 0) < 0) if(gettimeofday(&before, 0) < 0)
perror("gettimeofday"); perror("gettimeofday");
@ -239,10 +302,10 @@ int iperf_udp_send(struct iperf_stream *sp)
adjustus += (before.tv_sec - after.tv_sec) * SEC_TO_US ; adjustus += (before.tv_sec - after.tv_sec) * SEC_TO_US ;
adjustus += (before.tv_usec - after.tv_usec); adjustus += (before.tv_usec - after.tv_usec);
printf(" the adjust time = %lld \n",dtargus- adjustus); //printf(" the adjust time = %lld \n",dtargus- adjustus);
// if( adjustus > 0) { if( adjustus > 0) {
dtargus = adjustus; dtargus = adjustus;
//} }
memcpy(&before, &after, sizeof before); memcpy(&before, &after, sizeof before);
// RESET THE TIMER // RESET THE TIMER
@ -282,7 +345,6 @@ void iperf_defaults(struct iperf_test *testp)
testp->role = 's'; testp->role = 's';
testp->duration = 10; testp->duration = 10;
testp->server_port = 5001; testp->server_port = 5001;
testp->state = TEST_START;
testp->mFormat = 'a'; testp->mFormat = 'a';
@ -300,6 +362,8 @@ void iperf_init_test(struct iperf_test *test)
struct iperf_stream *sp; struct iperf_stream *sp;
int i, s=0; int i, s=0;
test->default_settings->state = TEST_START;
if(test->role == 's') if(test->role == 's')
{ {
test->listener_sock = netannounce(test->protocol, NULL, test->server_port); test->listener_sock = netannounce(test->protocol, NULL, test->server_port);
@ -396,7 +460,7 @@ char *iperf_reporter_callback(struct iperf_test *test)
char *messege = (char *) malloc(200); char *messege = (char *) malloc(200);
char *messege_final = (char *) malloc(test->num_streams * 50 + 200); char *messege_final = (char *) malloc(test->num_streams * 50 + 200);
if(test->state == TEST_RUNNING) if(test->default_settings->state == TEST_RUNNING)
{ {
printf("\n----------------INTERVAL [%d to %d]----------------\n", sp->result->interval_results->interval_duration - test->stats_interval, printf("\n----------------INTERVAL [%d to %d]----------------\n", sp->result->interval_results->interval_duration - test->stats_interval,
@ -429,7 +493,7 @@ char *iperf_reporter_callback(struct iperf_test *test)
} }
// PRINT TOTAL // PRINT TOTAL
if(test->state == TEST_END) if(test->default_settings->state == TEST_END)
{ {
bytes =0; bytes =0;
sp= test->streams; sp= test->streams;
@ -441,8 +505,7 @@ char *iperf_reporter_callback(struct iperf_test *test)
{ {
if(test->role == 'c') if(test->role == 'c')
{ {
bytes+= sp->result->bytes_sent; bytes+= sp->result->bytes_sent;
gettimeofday( &sp->result->end_time, NULL); gettimeofday( &sp->result->end_time, NULL);
unit_snprintf(ubuf, UNIT_LEN, (double) (sp->result->bytes_sent /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec)), test->mFormat); unit_snprintf(ubuf, UNIT_LEN, (double) (sp->result->bytes_sent /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec)), test->mFormat);
@ -451,8 +514,7 @@ char *iperf_reporter_callback(struct iperf_test *test)
} }
else if(test->role == 's') else if(test->role == 's')
{ {
bytes+= sp->result->bytes_received; bytes+= sp->result->bytes_received;
gettimeofday( &sp->result->end_time, NULL);
unit_snprintf(ubuf, UNIT_LEN, (double) sp->result->bytes_received /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec), test->mFormat); unit_snprintf(ubuf, UNIT_LEN, (double) sp->result->bytes_received /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec), test->mFormat);
printf("[%d]\t %llu bytes received %s per sec \n",sp->socket, sp->result->bytes_received, ubuf); printf("[%d]\t %llu bytes received %s per sec \n",sp->socket, sp->result->bytes_received, ubuf);
@ -557,7 +619,9 @@ struct iperf_stream *iperf_new_stream(struct iperf_test *testp)
gettimeofday(&sp->result->start_time, NULL); gettimeofday(&sp->result->start_time, NULL);
sp->result->interval_results->bytes_transferred = 0; sp->result->interval_results->bytes_transferred = 0;
sp->result->interval_results->interval_duration = 0; sp->result->interval_results->interval_duration = 0;
sp->settings->state = STREAM_BEGIN;
return sp; return sp;
} }
@ -759,7 +823,7 @@ void iperf_run_server(struct iperf_test *test)
{ {
struct timeval tv; struct timeval tv;
struct iperf_stream *n; struct iperf_stream *n;
int j, result,count= 0; int j,result, messege,count= 0;
char *read = NULL; char *read = NULL;
FD_ZERO(&test->read_set); FD_ZERO(&test->read_set);
@ -782,20 +846,18 @@ void iperf_run_server(struct iperf_test *test)
else if (result < 0 && errno != EINTR) else if (result < 0 && errno != EINTR)
printf("Error in select(): %s\n", strerror(errno)); printf("Error in select(): %s\n", strerror(errno));
// Accept a new connection
else if(result >0) else if(result >0)
{ {
// Accept a new connection
if (FD_ISSET(test->listener_sock, &test->temp_set)) if (FD_ISSET(test->listener_sock, &test->temp_set))
{ {
test->accept(test); test->accept(test);
count++; count++;
FD_CLR(test->listener_sock, &test->temp_set); FD_CLR(test->listener_sock, &test->temp_set);
} }
// test end message ?
//result_request message ?
//Process the sockets for read operation //Process the sockets for read operation
for (j=0; j< test->max_fd+1; j++) for (j=0; j< test->max_fd+1; j++)
{ {
@ -803,39 +865,39 @@ void iperf_run_server(struct iperf_test *test)
{ {
// find the correct stream // find the correct stream
n = find_stream_by_socket(test,j); n = find_stream_by_socket(test,j);
result = n->rcv(n); messege = n->rcv(n);
if(result == 0) if(messege == STREAM_END)
{ {
// stream shutdown message printf("result requested");
gettimeofday(&n->result->end_time, NULL);
count --; count --;
FD_CLR(j, &test->read_set); FD_CLR(j, &test->read_set);
} }
else if(result < 0)
{
printf("Error in recv(): %s\n", strerror(errno));
}
} // end if (FD_ISSET(j, &temp_set)) } // end if (FD_ISSET(j, &temp_set))
}// end for (j=0;...) }// end for (j=0;...)
// Detect if ALL streams have finished // Detect if ALL streams have finished
if(count == 0) if(count == 0)
{ {
test->state = TEST_END; test->default_settings->state = TEST_END;
read = test->reporter_callback(test);
read = test->reporter_callback(test);
puts(read); puts(read);
// FREE ALL STREAMS // FREE ALL STREAMS
n = test->streams; n = test->streams;
while(n) while(n)
{ {
close(n->socket); close(n->socket);
printf("socket closed");
iperf_free_stream(test, n); iperf_free_stream(test, n);
n= n->next; n= n->next;
} }
printf(" TEST ENDED\n");
printf(" TEST ENDED\n");
break;
} }
}// end else (result>0) }// end else (result>0)
@ -874,8 +936,7 @@ void iperf_run_client(struct iperf_test *test)
{ {
sp->data = new_timer(0, dtargus); sp->data = new_timer(0, dtargus);
sp= sp->next; sp= sp->next;
} }
} }
timer = new_timer(test->duration, 0); timer = new_timer(test->duration, 0);
@ -920,29 +981,39 @@ void iperf_run_client(struct iperf_test *test)
reporter_interval = new_timer(test->reporter_interval,0); reporter_interval = new_timer(test->reporter_interval,0);
} }
}// while outer timer }// while outer timer
// sending STREAM_END packets
test->state = TEST_END;
test->reporter_callback(test);
// Send the EOF - 0 buffer packets
sp = test->streams; sp = test->streams;
np = sp; np = sp;
do { do
{
sp = np;
sp->settings->state = STREAM_END;
sp->snd(sp);
printf("sent last packet\n");
np = sp->next;
} while (np);
test->default_settings->state = TEST_END;
test->reporter_callback(test);
// Deleting all streams
sp = test->streams;
np = sp;
do
{
sp = np; sp = np;
send(sp->socket, buf, 0, 0);
np = sp->next;
close(sp->socket); close(sp->socket);
iperf_free_stream(test, sp); iperf_free_stream(test, sp);
np = sp->next;
} while (np); } while (np);
} }
int iperf_run(struct iperf_test *test) int iperf_run(struct iperf_test *test)
{ {
test->state = TEST_RUNNING; test->default_settings->state = TEST_RUNNING;
if(test->role == 's') if(test->role == 's')
{ {
@ -993,13 +1064,13 @@ main(int argc, char **argv)
test->num_streams = atoi(optarg); test->num_streams = atoi(optarg);
break; break;
case 'b': case 'b':
test->default_settings->rate = atoi(optarg); test->default_settings->rate = unit_atof(optarg);
break; break;
case 'l': case 'l':
test->default_settings->blksize = atol(optarg); test->default_settings->blksize = atol(optarg);
break; break;
case 'w': case 'w':
test->default_settings->socket_bufsize = atoi(optarg); test->default_settings->socket_bufsize = unit_atof(optarg);
case 'i': case 'i':
test->stats_interval = atoi(optarg); test->stats_interval = atoi(optarg);
test->reporter_interval = atoi(optarg); test->reporter_interval = atoi(optarg);

View File

@ -1,4 +1,4 @@
typedef uint64_t iperf_size_t; typedef uint64_t iperf_size_t;
struct iperf_interval_results struct iperf_interval_results
{ {
@ -29,6 +29,7 @@ struct iperf_settings
int MSS; //for TCP MSS int MSS; //for TCP MSS
int ttl; int ttl;
int tos; int tos;
int state; // This is state of a stream/test - can use Union for this
}; };
struct iperf_stream struct iperf_stream
@ -64,8 +65,8 @@ struct iperf_test
char *server_hostname; // arg of -c char *server_hostname; // arg of -c
int server_port; // arg of -p int server_port; // arg of -p
int duration; // total duration of test -t int duration; // total duration of test -t
int listener_sock; int listener_sock;
int state;
/*boolen variables for Options */ /*boolen variables for Options */
int mDaemon; // -D int mDaemon; // -D
@ -130,7 +131,11 @@ enum {
#define TEST_START 1 #define TEST_START 1
#define TEST_RUNNING 2 #define TEST_RUNNING 2
#define TEST_END 3 #define RESULT_REQUEST 3
#define RESULT_RESPOND 4
#define TEST_END 5
#define STREAM_BEGIN 6
#define STREAM_END 7
/** /**
* iperf_new_test -- return a new iperf_test with default values * iperf_new_test -- return a new iperf_test with default values