added ALL_STREAMS_END message from client to server, changed message communication code for TCP

This commit is contained in:
kaustubhprabhu 2009-06-25 19:40:48 +00:00
parent 368cc7bc7c
commit b890e75d5e
2 changed files with 92 additions and 94 deletions

View File

@ -44,25 +44,19 @@ static struct option longopts[] =
void send_result_to_client(struct iperf_stream *sp) void send_result_to_client(struct iperf_stream *sp)
{ {
int result,i; int result;
int size = sp->settings->blksize; int size = sp->settings->blksize;
char *buf = (char *) malloc(size); char *buf = (char *) malloc(size);
if(!buf) if(!buf)
{ {
perror("malloc: unable to allocate transmit buffer"); perror("malloc: unable to allocate transmit buffer");
} }
printf("sending the result to Client\n%s\n", (char *) sp->data);
memcpy(buf, sp->data, strlen((char *)sp->data)); memcpy(buf, sp->data, strlen((char *)sp->data));
result = send(sp->socket, buf, size , 0); result = send(sp->socket, buf, size , 0);
} }
void receive_result_from_server(struct iperf_test *test) void receive_result_from_server(struct iperf_test *test)
@ -78,9 +72,11 @@ void receive_result_from_server(struct iperf_test *test)
test->num_streams= 1; test->num_streams= 1;
iperf_init_test(test); iperf_init_test(test);
sp = test->streams; sp = test->streams;
sp->settings->state = RESULT_REQUEST;
// send the request sp->settings->state = ALL_STREAMS_END;
sp->snd(sp);
sp->settings->state = RESULT_REQUEST;
sp->snd(sp); sp->snd(sp);
// receive from server // receive from server
@ -92,11 +88,10 @@ void receive_result_from_server(struct iperf_test *test)
} while (result == -1 && errno == EINTR); } while (result == -1 && errno == EINTR);
// buf[size] = '\0'; printf("RESULT FROM SERVER -\n");
puts(buf); puts(buf);
} }
int getsock_tcp_mss( int inSock ) int getsock_tcp_mss( int inSock )
{ {
int theMSS = 0; int theMSS = 0;
@ -164,7 +159,7 @@ void connect_msg(struct iperf_stream *sp)
inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) &sp->local_addr)->sin_addr), (void *) ipl, sizeof(ipl)); inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) &sp->local_addr)->sin_addr), (void *) ipl, sizeof(ipl));
inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) &sp->remote_addr)->sin_addr), (void *) ipr, sizeof(ipr)); inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) &sp->remote_addr)->sin_addr), (void *) ipr, sizeof(ipr));
printf("[%3d] local %s port %d connected with %s port %d\n", printf("[%3d] local %s port %d connected with %s port %d\n",
sp->socket, sp->socket,
ipl, ntohs(((struct sockaddr_in *) &sp->local_addr)->sin_port), ipl, ntohs(((struct sockaddr_in *) &sp->local_addr)->sin_port),
@ -220,7 +215,12 @@ int iperf_tcp_recv(struct iperf_stream *sp)
{ {
ch = buf[0]; ch = buf[0];
message = (int) ch; message = (int) ch;
} }
if(message == 3 || message == 8)
printf(" message is %d \n", message);
sp->result->bytes_received+= result; sp->result->bytes_received+= result;
free(buf); free(buf);
@ -275,11 +275,13 @@ int iperf_tcp_send(struct iperf_stream *sp)
case STREAM_END: case STREAM_END:
buf[0]= STREAM_END; buf[0]= STREAM_END;
break; break;
case RESULT_REQUEST: case RESULT_REQUEST:
buf[0]= RESULT_REQUEST; buf[0]= RESULT_REQUEST;
break; break;
case ALL_STREAMS_END:
buf[0]= ALL_STREAMS_END;
break;
default: default:
buf[0]= 0; buf[0]= 0;
break; break;
@ -327,15 +329,15 @@ int iperf_udp_send(struct iperf_stream *sp)
{ {
case STREAM_BEGIN: case STREAM_BEGIN:
buf[0]= STREAM_BEGIN; buf[0]= STREAM_BEGIN;
break; break;
case STREAM_END: case STREAM_END:
buf[0]= STREAM_END; buf[0]= STREAM_END;
break; break;
case RESULT_REQUEST: case RESULT_REQUEST:
buf[0]= RESULT_REQUEST; buf[0]= RESULT_REQUEST;
break; break;
case ALL_STREAMS_END:
buf[0]= ALL_STREAMS_END;
default: default:
buf[0]= 0; buf[0]= 0;
break; break;
@ -518,7 +520,7 @@ char *iperf_reporter_callback(struct iperf_test *test)
struct iperf_stream *sp = test->streams; struct iperf_stream *sp = test->streams;
iperf_size_t bytes=0; iperf_size_t bytes=0;
// char *message = (char *) malloc((sizeof("[%d]\t %llu bytes received %s per sec \n") * test->num_streams + 1) + 20 ); // char *message = (char *) malloc((sizeof("[%d]\t %llu bytes received %s per sec \n") * test->num_streams + 1) + 20 );
char *message = (char *) malloc(200); char *message = (char *) malloc(300);
char *message_final = (char *) malloc(test->num_streams * 50 + 200); char *message_final = (char *) malloc(test->num_streams * 50 + 200);
if(test->default_settings->state == TEST_RUNNING) if(test->default_settings->state == TEST_RUNNING)
@ -535,15 +537,15 @@ char *iperf_reporter_callback(struct iperf_test *test)
bytes+= sp->result->interval_results->bytes_transferred; bytes+= sp->result->interval_results->bytes_transferred;
unit_snprintf(ubuf, UNIT_LEN, (double) ( sp->result->interval_results->bytes_transferred / test->stats_interval), test->mFormat); unit_snprintf(ubuf, UNIT_LEN, (double) ( sp->result->interval_results->bytes_transferred / test->stats_interval), test->mFormat);
sprintf(message,"[%d]\t %llu bytes sent \t %s per sec \n",sp->socket, sp->result->interval_results->bytes_transferred , ubuf); sprintf(message,"[%d]\t %llu bytes sent \t %s per sec \n",sp->socket, sp->result->interval_results->bytes_transferred , ubuf);
strcat(message_final, message); strcat(message_final, message);
} }
else if(test->role == 's') else if(test->role == 's')
{ {
bytes+= sp->result->interval_results->bytes_transferred; bytes+= sp->result->interval_results->bytes_transferred;
unit_snprintf(ubuf, UNIT_LEN, (double) ( sp->result->interval_results->bytes_transferred / test->stats_interval), test->mFormat); unit_snprintf(ubuf, UNIT_LEN, (double) ( sp->result->interval_results->bytes_transferred / test->stats_interval), test->mFormat);
sprintf(message,"[%d]\t %llu bytes received \t %s per sec \n",sp->socket, sp->result->interval_results->bytes_transferred , ubuf); sprintf(message,"[%d]\t %llu bytes received \t %s per sec \n",sp->socket, sp->result->interval_results->bytes_transferred , ubuf);
strcat(message_final, message); strcat(message_final, message);
} }
sp = sp->next; sp = sp->next;
@ -554,7 +556,7 @@ char *iperf_reporter_callback(struct iperf_test *test)
unit_snprintf(ubuf, UNIT_LEN, (double) ( bytes / test->stats_interval), test->mFormat); unit_snprintf(ubuf, UNIT_LEN, (double) ( bytes / test->stats_interval), test->mFormat);
sprintf(message,"SUM\t %llu bytes COUNT \t %s per sec \n", bytes , ubuf); sprintf(message,"SUM\t %llu bytes COUNT \t %s per sec \n", bytes , ubuf);
strcat(message_final, message); strcat(message_final, message);
printf(message,"---------------------------------------------------\n"); sprintf(message,"---------------------------------------------------\n");
strcat(message_final, message); strcat(message_final, message);
} }
@ -570,22 +572,27 @@ char *iperf_reporter_callback(struct iperf_test *test)
{ {
if(test->role == 'c') if(test->role == 'c')
{ {
bytes+= sp->result->bytes_sent; if(sp->settings->state == STREAM_END)
gettimeofday( &sp->result->end_time, NULL); {
bytes+= sp->result->bytes_sent;
gettimeofday( &sp->result->end_time, NULL);
unit_snprintf(ubuf, UNIT_LEN, (double) (sp->result->bytes_sent /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec)), test->mFormat); unit_snprintf(ubuf, UNIT_LEN, (double) (sp->result->bytes_sent /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec)), test->mFormat);
sprintf(message,"[%d]\t %llu bytes sent %s per sec\n", sp->socket, sp->result->bytes_sent, ubuf); sprintf(message,"[%d]\t %llu bytes sent %s per sec\n", sp->socket, sp->result->bytes_sent, ubuf);
strcat(message_final, message); strcat(message_final, message);
}
} }
else if(test->role == 's') else if(test->role == 's')
{ {
bytes+= sp->result->bytes_received; if(sp->settings->state == STREAM_END)
{
bytes+= sp->result->bytes_received;
unit_snprintf(ubuf, UNIT_LEN, (double) sp->result->bytes_received /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec), test->mFormat); unit_snprintf(ubuf, UNIT_LEN, (double) sp->result->bytes_received /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec), test->mFormat);
sprintf(message,"[%d]\t %llu bytes received %s per sec\n", sp->socket, sp->result->bytes_received, ubuf); sprintf(message,"[%d]\t %llu bytes received %s per sec\n", sp->socket, sp->result->bytes_received, ubuf);
strcat(message_final, message); strcat(message_final, message);
}
} }
sp = sp->next; sp = sp->next;
} }
@ -907,8 +914,10 @@ void iperf_run_server(struct iperf_test *test)
printf("SERVER IDLE : %d sec\n", (int)tv.tv_sec); printf("SERVER IDLE : %d sec\n", (int)tv.tv_sec);
else if (result < 0 && errno != EINTR) else if (result < 0 && errno != EINTR)
{
printf("Error in select(): %s\n", strerror(errno)); printf("Error in select(): %s\n", strerror(errno));
exit(0);
}
else if(result >0) else if(result >0)
{ {
@ -916,9 +925,7 @@ void iperf_run_server(struct iperf_test *test)
if (FD_ISSET(test->listener_sock, &test->temp_set)) if (FD_ISSET(test->listener_sock, &test->temp_set))
{ {
test->accept(test); test->accept(test);
test->default_settings->state = TEST_RUNNING; test->default_settings->state = TEST_RUNNING;
test->num_streams++;
printf(" ACCEPT :count = %d \n", test->num_streams);
FD_CLR(test->listener_sock, &test->temp_set); FD_CLR(test->listener_sock, &test->temp_set);
} }
@ -933,10 +940,9 @@ void iperf_run_server(struct iperf_test *test)
message = n->rcv(n); message = n->rcv(n);
if(message == STREAM_END) if(message == STREAM_END)
{ {
n->settings->state = STREAM_END; n->settings->state = STREAM_END;
gettimeofday(&n->result->end_time, NULL); gettimeofday(&n->result->end_time, NULL);
test->num_streams--;
FD_CLR(j, &test->read_set); FD_CLR(j, &test->read_set);
} }
@ -946,60 +952,50 @@ void iperf_run_server(struct iperf_test *test)
n->data = read; n->data = read;
send_result_to_client(n); send_result_to_client(n);
close(n->socket); // FREE ALL STREAMS
test->num_streams--; n = test->streams;
printf(" count = %d \n", test->num_streams); while(n)
iperf_free_stream(test, n); {
printf("TEST_END\n"); close(n->socket);
iperf_free_stream(test, n);
// reset params n= n->next;
}
printf("TEST_END\n\n\n");
// reset TEST params
iperf_defaults(test); iperf_defaults(test);
test->max_fd = test->listener_sock; test->max_fd = test->listener_sock;
test->num_streams = 0;
} }
if( message == ALL_STREAMS_END )
{
n->settings->state = ALL_STREAMS_END;
test->default_settings->state = RESULT_REQUEST;
read = test->reporter_callback(test);
printf("Reporter has been called\n");
printf("ALL_STREAMS_END\n");
//change UDP listening socket to TCP listening socket for
//accepting the result request
if(test->protocol == Pudp)
{
close(test->listener_sock);
test->protocol = Ptcp;
test->accept = iperf_tcp_accept;
iperf_defaults(test);
iperf_init_test(test);
test->max_fd = test->listener_sock;
}
}
}// end if (FD_ISSET(j, &temp_set)) }// end if (FD_ISSET(j, &temp_set))
}// end for (j=0;...) }// end for (j=0;...)
// Detect if ALL streams have finished
if( test->num_streams == 0 && test->default_settings->state == TEST_RUNNING)
{
test->default_settings->state = RESULT_REQUEST;
read = test->reporter_callback(test);
puts(read);
// FREE ALL STREAMS
n = test->streams;
while(n)
{
close(n->socket);
fflush(stdout);
iperf_free_stream(test, n);
n= n->next;
}
printf("STREAM_END\n");
//change UDP listening socket to TCP listening socket for
//accepting the result request
if(test->protocol == Pudp)
{
close(test->listener_sock);
test->protocol = Ptcp;
test->accept = iperf_tcp_accept;
iperf_defaults(test);
iperf_init_test(test);
test->max_fd = test->listener_sock;
test->num_streams = 0;
}
}
}// end else (result>0) }// end else (result>0)
}// end while }// end while
@ -1108,7 +1104,8 @@ void iperf_run_client(struct iperf_test *test)
close(sp->socket); close(sp->socket);
iperf_free_stream(test, sp); iperf_free_stream(test, sp);
np = sp->next; np = sp->next;
} while (np); } while (np);
// Requesting for result from Server // Requesting for result from Server
receive_result_from_server(test); receive_result_from_server(test);

View File

@ -136,6 +136,7 @@ enum {
#define TEST_END 5 #define TEST_END 5
#define STREAM_BEGIN 6 #define STREAM_BEGIN 6
#define STREAM_END 7 #define STREAM_END 7
#define ALL_STREAMS_END 8
/** /**
* iperf_new_test -- return a new iperf_test with default values * iperf_new_test -- return a new iperf_test with default values