Corrected message communication memory leak, added result exchange messages for TCP

This commit is contained in:
kaustubhprabhu 2009-06-25 00:59:56 +00:00
parent 16975b3bba
commit 368cc7bc7c
2 changed files with 252 additions and 148 deletions

View File

@ -42,33 +42,61 @@ static struct option longopts[] =
};
char *fill_data(char *buf, int state)
{
int i;
switch(state)
{
case STREAM_BEGIN:
buf[0]= (char) STREAM_BEGIN;
break;
case STREAM_END:
buf[0]= (char) STREAM_END;
break;
case RESULT_REQUEST:
buf[0]= (char) RESULT_REQUEST;
break;
default:
buf[0]= 0;
break;
}
void send_result_to_client(struct iperf_stream *sp)
{
int result,i;
int size = sp->settings->blksize;
char *buf = (char *) malloc(size);
if(!buf)
{
perror("malloc: unable to allocate transmit buffer");
}
printf("sending the result to Client\n%s\n", (char *) sp->data);
memcpy(buf, sp->data, strlen((char *)sp->data));
result = send(sp->socket, buf, size , 0);
for(i=1; i < sizeof(buf); i++)
buf[i] = i % 37;
return buf;
}
void receive_result_from_server(struct iperf_test *test)
{
int result;
struct iperf_stream *sp;
int size =0;
char *buf = NULL;
test->protocol = Ptcp;
test->new_stream = iperf_new_tcp_stream;
test->num_streams= 1;
iperf_init_test(test);
sp = test->streams;
sp->settings->state = RESULT_REQUEST;
// send the request
sp->snd(sp);
// receive from server
size = sp->settings->blksize;
buf = (char *) malloc(size);
do{
result = recv(sp->socket, buf, size, 0);
} while (result == -1 && errno == EINTR);
// buf[size] = '\0';
puts(buf);
}
int getsock_tcp_mss( int inSock )
{
int theMSS = 0;
@ -173,7 +201,8 @@ void Display(struct iperf_test *test)
int iperf_tcp_recv(struct iperf_stream *sp)
{
int result, messege;
int result, message;
char ch;
int size = sp->settings->blksize;
char *buf = (char *) malloc(size);
if(!buf)
@ -186,25 +215,22 @@ int iperf_tcp_recv(struct iperf_stream *sp)
} while (result == -1 && errno == EINTR);
//interprete the type of messege in packet
//interprete the type of message in packet
if(result > 0)
{
messege = (int) buf[0];
}
{
ch = buf[0];
message = (int) ch;
}
sp->result->bytes_received+= result;
if(messege == STREAM_END)
printf("end received \n");
free(buf);
return messege;
free(buf);
return message;
}
int iperf_udp_recv(struct iperf_stream *sp)
{
int result, messege;
{
int result, message;
char ch;
int size = sp->settings->blksize;
char *buf = (char *) malloc(size);
if(!buf)
@ -213,26 +239,26 @@ int iperf_udp_recv(struct iperf_stream *sp)
}
do{
result = recv(sp->socket, buf, size, 0);
result = recv(sp->socket, buf, size, 0);
} while (result == -1 && errno == EINTR);
//interprete the type of messege in packet
messege = (int) buf[0];
//interprete the type of message in packet
if(result > 0)
{
ch = buf[0];
message = (int) ch;
}
if(messege!= STREAM_END)
sp->result->bytes_received+= result;
sp->result->bytes_received+= result;
if(messege == STREAM_END)
printf("end received \n");
//free(buf);
return messege;
free(buf);
return message;
}
int iperf_tcp_send(struct iperf_stream *sp)
{
int result;
int result,i;
int size = sp->settings->blksize;
char *buf = (char *) malloc(size);
@ -241,53 +267,89 @@ int iperf_tcp_send(struct iperf_stream *sp)
perror("malloc: unable to allocate transmit buffer");
}
switch(sp->settings->state)
{
case STREAM_BEGIN:
buf[0]= STREAM_BEGIN;
break;
case STREAM_END:
buf[0]= STREAM_END;
break;
case RESULT_REQUEST:
buf[0]= RESULT_REQUEST;
break;
default:
buf[0]= 0;
break;
}
for(i=1; i < size; i++)
buf[i] = i % 37;
//applicable for 1st packet sent
if(sp->settings->state == STREAM_BEGIN)
{
sp->settings->state = TEST_RUNNING;
{
sp->settings->state = TEST_RUNNING;
}
buf = fill_data(buf, sp->settings->state);
result = send(sp->socket, buf, size , 0);
sp->result->bytes_sent+= size;
free(buf);
return result;
}
int iperf_udp_send(struct iperf_stream *sp)
{
int result;
int result, i;
struct timeval before, after;
int64_t delayus, adjustus, dtargus;
int size = sp->settings->blksize;
char *buf = (char *) malloc(size);
if(!buf)
{
perror("malloc: unable to allocate transmit buffer");
}
int64_t adjustus, dtargus;
dtargus = (int64_t)(sp->settings->blksize) * SEC_TO_US * 8;
dtargus /= sp->settings->rate;
// the || part ensures that last packet is sent to server - the STREAM_END MESSAGE
if(((struct timer *) sp->data)->expired((struct timer *) sp->data) || sp->settings->state == STREAM_END)
{
int size = sp->settings->blksize;
char *buf = (char *) malloc(size);
if(!buf)
{
perror("malloc: unable to allocate transmit buffer");
}
dtargus = (int64_t)(sp->settings->blksize) * SEC_TO_US * 8;
dtargus /= sp->settings->rate;
assert(dtargus != 0);
assert(dtargus != 0);
// applicable for 1st packet sent
if(sp->settings->state == STREAM_BEGIN)
{
buf = fill_data(buf, sp->settings->state);
sp->settings->state = TEST_RUNNING;
}
else
{
buf = fill_data(buf, sp->settings->state);
}
if(((struct timer *) sp->data)->expired((struct timer *) sp->data))
{
// applicable for 1s packet sent
switch(sp->settings->state)
{
case STREAM_BEGIN:
buf[0]= STREAM_BEGIN;
break;
case STREAM_END:
buf[0]= STREAM_END;
break;
case RESULT_REQUEST:
buf[0]= RESULT_REQUEST;
break;
default:
buf[0]= 0;
break;
}
for(i=1; i < size; i++)
buf[i] = i % 37;
// applicable for 1st packet sent
if(sp->settings->state == STREAM_BEGIN)
sp->settings->state = TEST_RUNNING;
{
sp->settings->state = TEST_RUNNING;
}
if(gettimeofday(&before, 0) < 0)
perror("gettimeofday");
@ -310,7 +372,7 @@ int iperf_udp_send(struct iperf_stream *sp)
// RESET THE TIMER
sp->data = new_timer(0, dtargus);
printf(" new timer is %lld usec\n", dtargus);
//printf(" new timer is %lld usec\n", dtargus);
} // timer_expired_micro
@ -354,6 +416,7 @@ void iperf_defaults(struct iperf_test *testp)
testp->default_settings->socket_bufsize = 1024*1024;
testp->default_settings->blksize = DEFAULT_TCP_BLKSIZE;
testp->default_settings->rate = RATE;
testp->default_settings->state = TEST_START;
}
void iperf_init_test(struct iperf_test *test)
@ -361,9 +424,7 @@ void iperf_init_test(struct iperf_test *test)
char ubuf[UNIT_LEN];
struct iperf_stream *sp;
int i, s=0;
test->default_settings->state = TEST_START;
if(test->role == 's')
{
test->listener_sock = netannounce(test->protocol, NULL, test->server_port);
@ -456,15 +517,16 @@ char *iperf_reporter_callback(struct iperf_test *test)
char ubuf[UNIT_LEN];
struct iperf_stream *sp = test->streams;
iperf_size_t bytes=0;
// char *messege = (char *) malloc((sizeof("[%d]\t %llu bytes received %s per sec \n") * test->num_streams + 1) + 20 );
char *messege = (char *) malloc(200);
char *messege_final = (char *) malloc(test->num_streams * 50 + 200);
// char *message = (char *) malloc((sizeof("[%d]\t %llu bytes received %s per sec \n") * test->num_streams + 1) + 20 );
char *message = (char *) malloc(200);
char *message_final = (char *) malloc(test->num_streams * 50 + 200);
if(test->default_settings->state == TEST_RUNNING)
{
printf("\n----------------INTERVAL [%d to %d]----------------\n", sp->result->interval_results->interval_duration - test->stats_interval,
{
sprintf(message,"\n----------------INTERVAL [%d to %d]----------------\n", sp->result->interval_results->interval_duration - test->stats_interval,
sp->result->interval_results->interval_duration);
strcat(message_final, message);
while(sp)
{
@ -472,34 +534,37 @@ char *iperf_reporter_callback(struct iperf_test *test)
{
bytes+= sp->result->interval_results->bytes_transferred;
unit_snprintf(ubuf, UNIT_LEN, (double) ( sp->result->interval_results->bytes_transferred / test->stats_interval), test->mFormat);
printf("[%d]\t %llu bytes sent \t %s per sec \n",sp->socket, sp->result->interval_results->bytes_transferred , ubuf);
sprintf(message,"[%d]\t %llu bytes sent \t %s per sec \n",sp->socket, sp->result->interval_results->bytes_transferred , ubuf);
strcat(message_final, message);
}
else if(test->role == 's')
{
bytes+= sp->result->interval_results->bytes_transferred;
unit_snprintf(ubuf, UNIT_LEN, (double) ( sp->result->interval_results->bytes_transferred / test->stats_interval), test->mFormat);
printf("[%d]\t %llu bytes received \t %s per sec \n",sp->socket, sp->result->interval_results->bytes_transferred , ubuf);
sprintf(message,"[%d]\t %llu bytes received \t %s per sec \n",sp->socket, sp->result->interval_results->bytes_transferred , ubuf);
strcat(message_final, message);
}
sp = sp->next;
}
printf("---------------------------------------------------\n");
sprintf(message,"---------------------------------------------------\n");
strcat(message_final, message);
unit_snprintf(ubuf, UNIT_LEN, (double) ( bytes / test->stats_interval), test->mFormat);
printf("SUM\t %llu bytes COUNT \t %s per sec \n", bytes , ubuf);
printf("---------------------------------------------------\n");
sprintf(message,"SUM\t %llu bytes COUNT \t %s per sec \n", bytes , ubuf);
strcat(message_final, message);
printf(message,"---------------------------------------------------\n");
strcat(message_final, message);
}
// PRINT TOTAL
if(test->default_settings->state == TEST_END)
// PRINT TOTAL
if(test->default_settings->state == RESULT_REQUEST)
{
bytes =0;
sp= test->streams;
printf("\n-----------------------TOTAL-----------------------\n");
sprintf(messege, "\n-----------------------TOTAL-----------------------\n");
strcat(messege_final, messege);
sp= test->streams;
sprintf(message, "\n-----------------------TOTAL-----------------------\n");
strcat(message_final, message);
while(sp)
{
@ -509,18 +574,17 @@ char *iperf_reporter_callback(struct iperf_test *test)
gettimeofday( &sp->result->end_time, NULL);
unit_snprintf(ubuf, UNIT_LEN, (double) (sp->result->bytes_sent /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec)), test->mFormat);
printf("[%d]\t %llu bytes sent %s per sec \n",sp->socket, sp->result->bytes_sent , ubuf);
sprintf(message,"[%d]\t %llu bytes sent %s per sec\n", sp->socket, sp->result->bytes_sent, ubuf);
strcat(message_final, message);
}
else if(test->role == 's')
{
bytes+= sp->result->bytes_received;
unit_snprintf(ubuf, UNIT_LEN, (double) sp->result->bytes_received /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec), test->mFormat);
printf("[%d]\t %llu bytes received %s per sec \n",sp->socket, sp->result->bytes_received, ubuf);
// IF SERVER IS DEAMON, ONLY THIS IS NEEDED
sprintf(messege,"[%d]\t %llu bytes received %s per sec\n", sp->socket, sp->result->bytes_received, ubuf);
strcat(messege_final, messege);
unit_snprintf(ubuf, UNIT_LEN, (double) sp->result->bytes_received /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec), test->mFormat);
sprintf(message,"[%d]\t %llu bytes received %s per sec\n", sp->socket, sp->result->bytes_received, ubuf);
strcat(message_final, message);
}
sp = sp->next;
@ -528,29 +592,26 @@ char *iperf_reporter_callback(struct iperf_test *test)
sp = test->streams;
printf("---------------------------------------------------\n");
unit_snprintf(ubuf, UNIT_LEN, (double) bytes /(sp->result->end_time.tv_sec - sp->result->start_time.tv_sec), test->mFormat);
printf("SUM\t %llu bytes TOTAL %s per sec \n", bytes , ubuf);
printf("---------------------------------------------------\n\n");
//IF SERVER IS DEAMON, ONLY THIS IS NEEDED
sprintf(messege, "---------------------------------------------------\n");
strcat(messege_final, messege);
sprintf(messege, "SUM\t %llu bytes TOTAL %s per sec \n", bytes , ubuf);
strcat(messege_final, messege);
sprintf(messege, "---------------------------------------------------\n\n");
strcat(messege_final, messege);
sprintf(message, "---------------------------------------------------\n");
strcat(message_final, message);
sprintf(message, "SUM\t %llu bytes TOTAL %s per sec \n", bytes , ubuf);
strcat(message_final, message);
sprintf(message, "---------------------------------------------------\n\n");
strcat(message_final, message);
// -m option
if((test->mPrintMSS != 0) && (test->role == 'c'))
{
printf(" the TCP maximum segment size MSS = %d \n", getsock_tcp_mss(sp->socket));
sprintf(message,"the TCP maximum segment size MSS = %d \n", getsock_tcp_mss(sp->socket));
strcat(message_final, message);
}
}
return messege_final;
return message_final;
}
void iperf_free_stream(struct iperf_test *test, struct iperf_stream *sp)
@ -583,7 +644,7 @@ void iperf_free_stream(struct iperf_test *test, struct iperf_stream *sp)
}
}
}
sp->settings = NULL;
free(sp->settings);
free(sp->result);
free(sp);
@ -823,14 +884,16 @@ void iperf_run_server(struct iperf_test *test)
{
struct timeval tv;
struct iperf_stream *n;
int j,result, messege,count= 0;
int j,result, message;
char *read = NULL;
FD_ZERO(&test->read_set);
FD_SET(test->listener_sock, &test->read_set);
test->max_fd = test->listener_sock;
test->num_streams = 0;
test->default_settings->state = TEST_RUNNING;
while(1)
{
memcpy(&test->temp_set, &test->read_set,sizeof(test->temp_set));
@ -853,7 +916,9 @@ void iperf_run_server(struct iperf_test *test)
if (FD_ISSET(test->listener_sock, &test->temp_set))
{
test->accept(test);
count++;
test->default_settings->state = TEST_RUNNING;
test->num_streams++;
printf(" ACCEPT :count = %d \n", test->num_streams);
FD_CLR(test->listener_sock, &test->temp_set);
}
@ -865,24 +930,45 @@ void iperf_run_server(struct iperf_test *test)
{
// find the correct stream
n = find_stream_by_socket(test,j);
messege = n->rcv(n);
message = n->rcv(n);
if(messege == STREAM_END)
{
printf("result requested");
if(message == STREAM_END)
{
n->settings->state = STREAM_END;
gettimeofday(&n->result->end_time, NULL);
count --;
test->num_streams--;
FD_CLR(j, &test->read_set);
}
if(message == RESULT_REQUEST)
{
n->settings->state = RESULT_RESPOND;
n->data = read;
send_result_to_client(n);
close(n->socket);
test->num_streams--;
printf(" count = %d \n", test->num_streams);
iperf_free_stream(test, n);
printf("TEST_END\n");
// reset params
iperf_defaults(test);
test->max_fd = test->listener_sock;
test->num_streams = 0;
}
} // end if (FD_ISSET(j, &temp_set))
}// end if (FD_ISSET(j, &temp_set))
}// end for (j=0;...)
// Detect if ALL streams have finished
if(count == 0)
{
test->default_settings->state = TEST_END;
if( test->num_streams == 0 && test->default_settings->state == TEST_RUNNING)
{
test->default_settings->state = RESULT_REQUEST;
read = test->reporter_callback(test);
puts(read);
@ -890,15 +976,29 @@ void iperf_run_server(struct iperf_test *test)
n = test->streams;
while(n)
{
close(n->socket);
printf("socket closed");
close(n->socket);
fflush(stdout);
iperf_free_stream(test, n);
n= n->next;
}
printf("STREAM_END\n");
printf(" TEST ENDED\n");
break;
}
//change UDP listening socket to TCP listening socket for
//accepting the result request
if(test->protocol == Pudp)
{
close(test->listener_sock);
test->protocol = Ptcp;
test->accept = iperf_tcp_accept;
iperf_defaults(test);
iperf_init_test(test);
test->max_fd = test->listener_sock;
test->num_streams = 0;
}
}
}// end else (result>0)
@ -911,7 +1011,7 @@ void iperf_run_client(struct iperf_test *test)
int i,result;
struct iperf_stream *sp, *np;
struct timer *timer, *stats_interval, *reporter_interval;
char *buf;
char *buf, *read= NULL;
int64_t delayus, adjustus, dtargus;
struct timeval tv;
int ret=0;
@ -976,12 +1076,13 @@ void iperf_run_client(struct iperf_test *test)
}
if((test->reporter_interval!= 0) && reporter_interval->expired(reporter_interval))
{
test->reporter_callback(test);
{
read = test->reporter_callback(test);
puts(read);
reporter_interval = new_timer(test->reporter_interval,0);
}
}// while outer timer
}// while outer timer
// sending STREAM_END packets
sp = test->streams;
@ -990,15 +1091,15 @@ void iperf_run_client(struct iperf_test *test)
{
sp = np;
sp->settings->state = STREAM_END;
sp->snd(sp);
printf("sent last packet\n");
sp->snd(sp);
np = sp->next;
} while (np);
test->default_settings->state = TEST_END;
test->reporter_callback(test);
test->default_settings->state = RESULT_REQUEST;
read = test->reporter_callback(test);
puts(read);
// Deleting all streams
// Deleting all streams - CAN CHANGE FREE_STREAM FN
sp = test->streams;
np = sp;
do
@ -1008,6 +1109,9 @@ void iperf_run_client(struct iperf_test *test)
iperf_free_stream(test, sp);
np = sp->next;
} while (np);
// Requesting for result from Server
receive_result_from_server(test);
}

View File

@ -189,7 +189,7 @@ void iperf_init_stream(struct iperf_stream *sp, struct iperf_test *testp);
* iperf_free_stream -- free resources associated with test
*
*/
void iperf_free_stream(struct iperf_test *test, struct iperf_stream *stream);
void iperf_free_stream(struct iperf_test *test, struct iperf_stream *sp);
/**