added PARAM_Exchange messages and cookies to prevent test collisions

This commit is contained in:
kaustubhprabhu 2009-07-21 23:28:18 +00:00
parent 7b63081921
commit 89fe565493
2 changed files with 186 additions and 67 deletions

View File

@ -16,6 +16,7 @@
#include <stdint.h>
#include <netinet/tcp.h>
#include <sys/time.h>
#include <uuid/uuid.h>
#include "iperf_api.h"
#include "timer.h"
@ -43,6 +44,59 @@ static struct option longopts[] =
};
void exchange_parameters(struct iperf_test *test)
{
int result, size = DEFAULT_TCP_BLKSIZE;
char *buf = (char *) malloc(size);
struct iperf_test *temp;
struct iperf_stream *sp;
struct param_exchange *param = (struct param_exchange *) buf;
//setting up exchange parameters
uuid_generate(test->default_settings->cookie);
uuid_copy(param->cookie, test->default_settings->cookie);
param->state = PARAM_EXCHANGE;
param->blksize = test->default_settings->blksize;
param->recv_window = test->default_settings->socket_rcv_bufsize;
param->send_window = test->default_settings->socket_snd_bufsize;
param->format = test->default_settings->unit_format;
temp = iperf_new_test();
iperf_defaults(temp);
temp->role = 'c';
temp->new_stream = iperf_new_tcp_stream;
temp->server_hostname = test->server_hostname;
temp->server_port = test->server_port;
iperf_init_test(temp);
sp = temp->streams;
sp->settings->state = PARAM_EXCHANGE;
printf("Sending EXCHNG Request \n");
result = send(sp->socket, buf, size , 0);
do{
result = recv(sp->socket, buf, size, 0);
} while (result == -1 && errno == EINTR);
if (result > 0 && buf[0] == -1)
{
printf("Busy server Detected\n");
exit(0);
}
else
{
printf("New connection started \n");
}
iperf_free_stream(temp, sp);
iperf_free_test(temp);
free(buf);
}
void setnonblocking(int sock)
{
@ -131,8 +185,7 @@ void receive_result_from_server(struct iperf_test *test)
//Overriding actual Test parameters for result exchange
test->protocol = Ptcp;
test->new_stream = iperf_new_tcp_stream;
test->default_settings->blksize = DEFAULT_TCP_BLKSIZE;
test->new_stream = iperf_new_tcp_stream;
test->num_streams= 1;
iperf_init_test(test);
@ -171,7 +224,7 @@ int getsock_tcp_mss( int inSock )
rc = getsockopt( inSock, IPPROTO_TCP, TCP_MAXSEG, (char*) &mss, &len );
return mss;
}
}
int set_socket_options(struct iperf_stream *sp, struct iperf_test *tp)
{
@ -207,7 +260,7 @@ int set_socket_options(struct iperf_stream *sp, struct iperf_test *tp)
len = sizeof( new_mss );
rc = setsockopt( sp->socket, IPPROTO_TCP, TCP_MAXSEG, (char*) &new_mss, len);
if ( rc == -1) {
perror("setsockopt");
perror("setsockoptBING");
return -1;
}
@ -271,6 +324,7 @@ int iperf_tcp_recv(struct iperf_stream *sp)
char ch;
int size = sp->settings->blksize;
char *buf = (char *) malloc(size);
struct param_exchange *param = (struct param_exchange *) buf;
if(!buf)
{
perror("malloc: unable to allocate receive buffer");
@ -282,8 +336,9 @@ int iperf_tcp_recv(struct iperf_stream *sp)
} while (result == -1 && errno == EINTR);
//interprete the type of message in packet
//-TODO = change this for Cookie implementation
if(result > 0)
{
{
ch = buf[0];
message = (int) ch;
@ -291,6 +346,33 @@ int iperf_tcp_recv(struct iperf_stream *sp)
// printf("result = %d state = %d\n",result, buf[0]);
}
if(param->state == PARAM_EXCHANGE)
{
printf("PARAM_EXHANGE caught\n");
message = param->state;
// setting the parameters
if(uuid_is_null(sp->settings->cookie))
{
uuid_copy(sp->settings->cookie, param->cookie);
sp->settings->blksize = param->blksize;
sp->settings->socket_rcv_bufsize = param->recv_window;
sp->settings->unit_format = param->format;
}
else
{
printf("New connection denied\n");
// send NO to client
int size = DEFAULT_TCP_BLKSIZE;
buf = (char *) malloc(size);
buf[0] = -1;
result = send(sp->socket, buf, size, 0);
return -1;
}
}
if(message == 6)
printf("the blksize = %d\n", sp->settings->blksize);
if(message == 3 || message == 8 || message == 9 )
{
// printf("count = %ld result = %d\n", strlen(buf), result);
@ -380,15 +462,18 @@ int iperf_tcp_send(struct iperf_stream *sp)
{
int result,i;
int size = sp->settings->blksize;
char *buf = (char *) malloc(size);
if(!buf)
{
perror("malloc: unable to allocate transmit buffer");
}
switch(sp->settings->state)
{
{
case STREAM_BEGIN:
buf[0]= (STREAM_BEGIN);
for(i=1; i < size; i++)
@ -423,8 +508,6 @@ int iperf_tcp_send(struct iperf_stream *sp)
break;
}
//applicable for 1st packet sent
if(sp->settings->state == STREAM_BEGIN)
{
@ -453,12 +536,13 @@ int iperf_udp_send(struct iperf_stream *sp)
char *buf = (char *) malloc(size);
// this is for udp packet/jitter/lost packet measurements
struct udp_datagram *udp = (struct udp_datagram *) buf;
struct param_exchange *param =NULL;
if(!buf)
{
perror("malloc: unable to allocate transmit buffer");
}
}
dtargus = (int64_t)(sp->settings->blksize) * SEC_TO_US * 8;
dtargus /= sp->settings->rate;
@ -470,7 +554,7 @@ int iperf_udp_send(struct iperf_stream *sp)
switch(sp->settings->state)
{
case STREAM_BEGIN:
case STREAM_BEGIN:
udp->state = STREAM_BEGIN;
udp->stream_id = (int)sp;
//udp->packet_count = ++sp->packet_count;
@ -497,7 +581,7 @@ int iperf_udp_send(struct iperf_stream *sp)
break;
}
// applicable for 1st packet sent
// applicable for 1st packet sent for each stream
if(sp->settings->state == STREAM_BEGIN)
{
sp->settings->state = STREAM_RUNNING;
@ -513,7 +597,7 @@ int iperf_udp_send(struct iperf_stream *sp)
// CHECK: Packet length and ID
//if(sp->settings->state == STREAM_RUNNING)
// printf("State = %d Outgoing packet = %d AND SP = %d\n",sp->settings->state, sp->packet_count, sp->socket);
// printf("State = %d Outgoing packet = %d AND SP = %d\n",sp->settings->state, sp->packet_count, sp->socket);
if(sp->settings->state == STREAM_RUNNING)
sp->result->bytes_sent+= result;
@ -533,6 +617,7 @@ int iperf_udp_send(struct iperf_stream *sp)
// RESET THE TIMER
update_timer(sp->send_timer, 0, dtargus);
free(buf);
param = NULL;
} // timer_expired_micro
@ -565,17 +650,18 @@ void iperf_defaults(struct iperf_test *testp)
testp->protocol = Ptcp;
testp->role = 's';
testp->duration = 10;
testp->server_port = 5001;
testp->server_port = 5001;
testp->unit_format = 'a';
testp->default_settings->unit_format = 'a';
testp->stats_interval = 0;
testp->reporter_interval = 0;
testp->num_streams = 1;
testp->num_streams = 1;
testp->default_settings->socket_bufsize = 1024*1024;
testp->default_settings->blksize = DEFAULT_TCP_BLKSIZE;
testp->default_settings->rate = RATE;
testp->default_settings->state = TEST_START;
testp->default_settings->mss = 0;
}
void iperf_init_test(struct iperf_test *test)
@ -588,7 +674,7 @@ void iperf_init_test(struct iperf_test *test)
{
test->listener_sock_udp = netannounce(Pudp, NULL, test->server_port);
if( test->listener_sock_udp < 0)
exit(0);
exit(0);
if(set_tcp_windowsize( test->listener_sock_udp, test->default_settings->socket_bufsize, SO_RCVBUF) < 0)
{
@ -619,14 +705,13 @@ void iperf_init_test(struct iperf_test *test)
}
else if( test->role == 'c')
{
{
FD_ZERO(&test->write_set);
FD_SET(s, &test->write_set);
for(i = 0; i < test->num_streams; i++)
{
s = netdial(test->protocol, test->server_hostname, test->server_port);
if(s < 0)
{
fprintf(stderr, "netdial failed\n");
@ -648,6 +733,8 @@ void iperf_init_test(struct iperf_test *test)
iperf_init_stream(sp, test);
iperf_add_stream(test, sp);
if(test->default_settings->state != RESULT_REQUEST)
connect_msg(sp);
@ -746,7 +833,8 @@ char *iperf_reporter_callback(struct iperf_test *test)
sp = sp->next;
}
char *message_final = (char *) malloc((count+1) * (strlen(report_bw_jitter_loss_header) + strlen(report_bw_jitter_loss_format) + strlen(report_sum_bw_jitter_loss_format)));
char *message_final = (char *) malloc((count+1) * (strlen(report_bw_jitter_loss_header)
+ strlen(report_bw_jitter_loss_format) + strlen(report_sum_bw_jitter_loss_format)));
memset(message_final,0, strlen(message_final));
struct iperf_interval_results *ip = test->streams->result->interval_results;
@ -778,12 +866,12 @@ char *iperf_reporter_callback(struct iperf_test *test)
if(test->streams->result->interval_results->next != NULL)
{
unit_snprintf(nbuf, UNIT_LEN, (double) (ip->bytes_transferred / (ip->interval_duration -ip_prev->interval_duration)),
test->unit_format);
test->default_settings->unit_format);
sprintf(message, report_bw_format, sp->socket,ip_prev->interval_duration, ip->interval_duration, ubuf, nbuf);
}
else
{
unit_snprintf(nbuf, UNIT_LEN, (double) (ip->bytes_transferred /ip->interval_duration), test->unit_format);
unit_snprintf(nbuf, UNIT_LEN, (double) (ip->bytes_transferred /ip->interval_duration), test->default_settings->unit_format);
sprintf(message, report_bw_format, sp->socket, 0.0, ip->interval_duration, ubuf, nbuf);
}
strcat(message_final, message);
@ -795,12 +883,12 @@ char *iperf_reporter_callback(struct iperf_test *test)
if(test->streams->result->interval_results->next != NULL)
{
unit_snprintf(nbuf, UNIT_LEN, (double) (ip->bytes_transferred / (ip->interval_duration -ip_prev->interval_duration)),
test->unit_format);
test->default_settings->unit_format);
sprintf(message, report_sum_bw_format, ip_prev->interval_duration, ip->interval_duration, ubuf, nbuf);
}
else
{
unit_snprintf(nbuf, UNIT_LEN, (double) (ip->bytes_transferred /ip->interval_duration), test->unit_format);
unit_snprintf(nbuf, UNIT_LEN, (double) (ip->bytes_transferred /ip->interval_duration), test->default_settings->unit_format);
sprintf(message, report_sum_bw_format, 0.0, ip->interval_duration, ubuf, nbuf);
}
@ -845,13 +933,13 @@ char *iperf_reporter_callback(struct iperf_test *test)
if(test->role == 'c')
{
unit_snprintf(ubuf, UNIT_LEN, (double) (sp->result->bytes_sent), 'A');
unit_snprintf(nbuf, UNIT_LEN, (double) (sp->result->bytes_sent / end_time), test->unit_format);
unit_snprintf(nbuf, UNIT_LEN, (double) (sp->result->bytes_sent / end_time),test->default_settings->unit_format);
}
else
{
unit_snprintf(ubuf, UNIT_LEN, (double) (sp->result->bytes_received), 'A');
unit_snprintf(nbuf, UNIT_LEN, (double) (sp->result->bytes_received / end_time), test->unit_format);
unit_snprintf(nbuf, UNIT_LEN, (double) (sp->result->bytes_received / end_time), test->default_settings->unit_format);
}
if( test->protocol == Ptcp)
@ -884,7 +972,7 @@ char *iperf_reporter_callback(struct iperf_test *test)
end_time = timeval_diff(&sp->result->start_time, &sp->result->end_time);
unit_snprintf(ubuf, UNIT_LEN, (double) bytes, 'A');
unit_snprintf(nbuf, UNIT_LEN, (double) bytes / end_time, test->unit_format);
unit_snprintf(nbuf, UNIT_LEN, (double) bytes / end_time, test->default_settings->unit_format);
if(test->protocol == Ptcp)
{
@ -955,7 +1043,8 @@ struct iperf_stream *iperf_new_stream(struct iperf_test *testp)
return(NULL);
}
memset(sp, 0, sizeof(struct iperf_stream));
memset(sp, 0, sizeof(struct iperf_stream));
sp->settings = (struct iperf_settings *) malloc(sizeof(struct iperf_settings));
memcpy(sp->settings, testp->default_settings, sizeof(struct iperf_settings));
@ -1043,7 +1132,7 @@ int iperf_udp_accept(struct iperf_test *test)
sp->socket = test->listener_sock_udp;
//setting noblock doesn't report back to client
//setnonblocking( sp->socket);
setnonblocking( sp->socket);
iperf_init_stream(sp, test);
iperf_add_stream(test, sp);
@ -1060,7 +1149,7 @@ int iperf_udp_accept(struct iperf_test *test)
connect_msg(sp);
printf("1st UDP data packet for socket %d has arrived \n", sp->socket);
sp->stream_id = udp->stream_id;
sp->stream_id = udp->stream_id;
sp->result->bytes_received+= sz;
//OUT OF ORDER PACKETS
@ -1101,7 +1190,7 @@ int iperf_tcp_accept(struct iperf_test *test)
sp = test->new_stream(test);
//setting noblock doesn't report back to client
//setnonblocking(peersock);
setnonblocking(peersock);
FD_SET(peersock, &test->read_set);
test->max_fd = (test->max_fd < peersock) ? peersock : test->max_fd;
@ -1153,8 +1242,8 @@ void iperf_init_stream(struct iperf_stream *sp, struct iperf_test *testp)
x = getsock_tcp_windowsize(sp->socket, SO_SNDBUF);
if(x < 0)
perror("SO_SNDBUF");
// printf("SND: %d\n", x);
// printf("SND: %d\n", x);
set_socket_options(sp, testp);
}
@ -1217,17 +1306,18 @@ void iperf_run_server(struct iperf_test *test)
test->default_settings->state = TEST_RUNNING;
while(test->default_settings->state != TEST_END)
{
{
FD_COPY(&test->read_set, &test->temp_set);
tv.tv_sec = 50;
tv.tv_sec = 15;
tv.tv_usec = 0;
// using select to check on multiple descriptors.
result = select(test->max_fd + 1, &test->temp_set, NULL, NULL, &tv);
if (result == 0)
if (result == 0)
printf("SERVER IDLE : %d sec\n", (int)tv.tv_sec);
else if (result < 0 && errno != EINTR)
{
printf("Error in select(): %s\n", strerror(errno));
@ -1239,10 +1329,9 @@ void iperf_run_server(struct iperf_test *test)
// Accept a new TCP connection
if (FD_ISSET(test->listener_sock_tcp, &test->temp_set))
{
test->protocol = Ptcp;
test->default_settings->blksize = DEFAULT_TCP_BLKSIZE;
test->protocol = Ptcp;
test->accept = iperf_tcp_accept;
test->new_stream = iperf_new_tcp_stream;
test->new_stream = iperf_new_tcp_stream;
test->accept(test);
test->default_settings->state = TEST_RUNNING;
FD_CLR(test->listener_sock_tcp, &test->temp_set);
@ -1251,11 +1340,10 @@ void iperf_run_server(struct iperf_test *test)
// Accept a new UDP connection
else if (FD_ISSET(test->listener_sock_udp, &test->temp_set) )
{
test->protocol = Pudp;
test->default_settings->blksize = DEFAULT_UDP_BLKSIZE;
test->protocol = Pudp;
test->accept = iperf_udp_accept;
test->new_stream = iperf_new_udp_stream;
test->accept(test);
test->accept(test);
test->default_settings->state = TEST_RUNNING;
FD_CLR(test->listener_sock_udp, &test->temp_set);
}
@ -1269,6 +1357,17 @@ void iperf_run_server(struct iperf_test *test)
// find the correct stream
np = find_stream_by_socket(test,j);
message = np->rcv(np);
if(message == PARAM_EXCHANGE || message == -1)
{
//copy the received settings into test
if(message != -1)
memcpy(test->default_settings, test->streams->settings, sizeof(struct iperf_settings));
// FREE ALL STREAMS
close(np->socket);
FD_CLR(np->socket, &test->read_set);
iperf_free_stream(test, np);
}
if(message == STREAM_END)
{
@ -1341,7 +1440,7 @@ void iperf_run_client(struct iperf_test *test)
int64_t delayus, adjustus, dtargus, remaining, min;
struct timeval tv;
int ret=0;
tv.tv_sec = 15; // timeout interval in seconds
tv.tv_usec = 0;
@ -1364,7 +1463,7 @@ void iperf_run_client(struct iperf_test *test)
sp->send_timer = new_timer(0, dtargus);
sp= sp->next;
}
}
}
timer = new_timer(test->duration, 0);
@ -1378,7 +1477,7 @@ void iperf_run_client(struct iperf_test *test)
while(!timer->expired(timer))
{
FD_COPY(&test->write_set, &test->temp_set);
ret = select(test->max_fd+1, NULL, &test->temp_set, NULL, &tv);
ret = select(test->max_fd+1, NULL, &test->write_set, NULL, &tv);
if(ret < 0)
continue;
@ -1388,7 +1487,7 @@ void iperf_run_client(struct iperf_test *test)
if(FD_ISSET(sp->socket, &test->temp_set))
{
result+= sp->snd(sp);
if(sp->next==NULL)
break;
sp=sp->next;
@ -1440,6 +1539,7 @@ void iperf_run_client(struct iperf_test *test)
{
sp = np;
sp->settings->state = STREAM_END;
printf("sp blksize = %d \n", sp->settings->blksize);
sp->snd(sp);
np = sp->next;
} while (np);
@ -1463,8 +1563,10 @@ void iperf_run_client(struct iperf_test *test)
// Requesting for result from Server
receive_result_from_server(test);
free_timer(stats_interval);
free_timer(reporter_interval);
if(test->stats_interval!= 0)
free_timer(stats_interval);
if(test->reporter_interval!= 0)
free_timer(reporter_interval);
free_timer(timer);
}
@ -1493,6 +1595,7 @@ main(int argc, char **argv)
{
char ch, role;
struct iperf_test *test;
int port= -1;
test = iperf_new_test();
iperf_defaults(test);
@ -1506,14 +1609,16 @@ main(int argc, char **argv)
strncpy(test->server_hostname, optarg, strlen(optarg));
break;
case 'p':
test->server_port = atoi(optarg);
test->server_port = atoi(optarg);
port = test->server_port;
break;
case 's':
test->role = 's';
role = test->role;
break;
case 't':
test->duration = atoi(optarg);
test->duration = atoi(optarg);
break;
case 'u':
test->protocol = Pudp;
@ -1527,7 +1632,7 @@ main(int argc, char **argv)
break;
case 'l':
test->default_settings->blksize = unit_atoi(optarg);
printf("%lld is the blksize\n", unit_atoi(optarg));
printf("%d is the blksize\n", test->default_settings->blksize);
break;
case 'w':
test->default_settings->socket_bufsize = unit_atof(optarg);
@ -1545,8 +1650,7 @@ main(int argc, char **argv)
test->default_settings->mss = atoi(optarg);
break;
case 'f':
test->unit_format = *optarg;
printf("%c is format \n", test->unit_format);
test->default_settings->unit_format = *optarg;
break;
}
@ -1564,8 +1668,13 @@ main(int argc, char **argv)
break;
}
// param exchange
if(test->role == 'c')
exchange_parameters(test);
iperf_init_test(test);
// actual test begins
iperf_init_test(test);
fflush(stdout);
iperf_run(test);
iperf_free_test(test);
@ -1589,7 +1698,10 @@ main(int argc, char **argv)
case Pudp:
test->new_stream = iperf_new_udp_stream;
break;
}
}
if(port != -1)
test->server_port = port;
iperf_init_test(test);
iperf_run(test);

View File

@ -31,7 +31,9 @@ struct iperf_settings
int mss; //for TCP MSS
int ttl;
int tos;
int state; // This is state of a stream/test - can use Union for this
char unit_format; // -f
int state; // This is state of a stream/test
uuid_t cookie; // cookie for a stream/test
};
struct iperf_stream
@ -45,7 +47,7 @@ struct iperf_stream
/* non configurable members */
struct iperf_stream_result *result; //structure pointer to result
int socket; // socket
struct timer *send_timer;
struct timer *send_timer;
/* for udp measurements - This can be a structure outside stream,
and stream can have a pointer to this */
@ -77,17 +79,16 @@ struct iperf_test
int server_port; // arg of -p
int duration; // total duration of test -t
int listener_sock_tcp;
int listener_sock_udp;
int listener_sock_udp;
/*boolen variables for Options */
int daemon; // -D
int no_delay; // -N
int print_mss; // -m
int domain; // -V
char unit_format; // -f
int domain; // -V
/* Select related parameters */
int max_fd;
int max_fd;
fd_set read_set;
fd_set temp_set;
fd_set write_set;
@ -107,9 +108,8 @@ struct iperf_test
struct iperf_settings *default_settings;
};
struct udp_datagram
{
{
int state;
int stream_id;
int packet_count;
@ -117,13 +117,20 @@ struct udp_datagram
};
struct tcp_datagram
struct param_exchange
{
uuid_t cookie;
int state;
int stream_id;
int blksize;
int recv_window;
int send_window;
int mss;
char format;
};
void exchange_parameters(struct iperf_test *test);
void add_interval_list(struct iperf_stream_result *rp, struct iperf_interval_results temp);
void display_interval_list(struct iperf_stream_result *rp);
void send_result_to_client(struct iperf_stream *sp);
@ -147,8 +154,7 @@ int iperf_run(struct iperf_test *test);
enum {
Ptcp = SOCK_STREAM,
Pudp = SOCK_DGRAM,
Pudp = SOCK_DGRAM,
uS_TO_NS = 1000,
RATE = 1000000,
MAX_BUFFER_SIZE =10,
@ -163,6 +169,7 @@ enum {
STREAM_RUNNING = 7,
STREAM_END = 8,
ALL_STREAMS_END = 9,
PARAM_EXCHANGE = 10,
SEC_TO_US = 1000000
};