lots more cleanup. Seems to work for single stream TCP now. UDP broke. Parallel TCP not tested

This commit is contained in:
Brian Tierney 2009-10-23 01:30:55 +00:00
parent a725333452
commit 6f5723846d
4 changed files with 201 additions and 196 deletions

View File

@ -43,7 +43,7 @@ all_data_sent(struct iperf_test * test)
return 0;
else
{
int total_bytes = 0;
uint64_t total_bytes = 0;
struct iperf_stream *sp;
sp = test->streams;
@ -68,21 +68,11 @@ void
exchange_parameters(struct iperf_test * test)
{
int result;
struct iperf_test *temp;
struct iperf_stream *sp;
struct param_exchange *param;
temp = iperf_new_test();
iperf_defaults(temp);
temp->role = 'c';
temp->server_hostname = test->server_hostname;
temp->server_port = test->server_port;
printf("in exchange_parameters: calling iperf_init_test \n");
iperf_init_test(temp); /* XXX: This is way overkill for just
* exchaning parameters. Does way more than
* needed */
sp = temp->streams;
//printf("in exchange_parameters \n");
sp = test->streams;
sp->settings->state = PARAM_EXCHANGE;
param = (struct param_exchange *) sp->buffer;
@ -96,31 +86,32 @@ exchange_parameters(struct iperf_test * test)
param->send_window = test->default_settings->socket_bufsize;
param->format = test->default_settings->unit_format;
printf(" sending exchange params: size = %d \n", (int) sizeof(struct param_exchange));
//printf(" sending exchange params: size = %d \n", (int) sizeof(struct param_exchange));
/* XXX: cant we use iperf_tcp_send for this? that would be cleaner */
result = send(sp->socket, sp->buffer, sizeof(struct param_exchange), 0);
if (result < 0)
perror("Error sending exchange params to server");
/* XXX: no error checking! -blt */
printf("result = %d state = %d, error = %d \n", result, sp->buffer[0], errno);
//printf("result = %d state = %d, error = %d \n", result, sp->buffer[0], errno);
/* get answer back from server */
do
{
printf("reading result from server .. \n");
//printf("exchange_parameters: reading result from server .. \n");
result = recv(sp->socket, sp->buffer, sizeof(struct param_exchange), 0);
} while (result == -1 && errno == EINTR);
if (result < 0)
perror("Error getting exchange params ack from server");
if (result > 0 && sp->buffer[0] == ACCESS_DENIED)
{
fprintf(stderr, "Busy server Detected. Exiting.\n");
close(sp->socket);
iperf_free_stream(temp, sp);
iperf_free_test(temp);
exit(0);
exit(-1);
}
close(sp->socket);
iperf_free_stream(temp, sp);
iperf_free_test(temp);
return;
}
/*********************************************************************/
@ -128,35 +119,32 @@ int
param_received(struct iperf_stream * sp, struct param_exchange * param)
{
int result;
/* XXX: only sends back a message on failure ?? -blt */
char *buf = (char *) malloc(sizeof(struct param_exchange));
printf("in param_received \n");
if (sp->settings->cookie[0] == '\0')
{
strncpy(sp->settings->cookie, param->cookie, 37);
sp->settings->blksize = param->blksize;
sp->settings->socket_bufsize = param->recv_window;
sp->settings->unit_format = param->format;
return param->state;
printf("Got params from client: block size = %d, recv_window = %d \n",
sp->settings->blksize, sp->settings->socket_bufsize);
param->state = TEST_START;
buf[0] = TEST_START;
} else if (strncmp(param->cookie, sp->settings->cookie, 37) != 0)
{
fprintf(stderr, "New connection denied\n");
/* send NO to client */
char *buf = (char *) malloc(sizeof(struct param_exchange));
buf[0] = -1;
/*
* XXX: only first byte matters, I think, so why not just alloc and
* send 1 byte?
*/
result = send(sp->socket, buf, sizeof(struct param_exchange), 0);
/* XXX: no error checking! -blt */
free(buf);
return ACCESS_DENIED;
param->state = ACCESS_DENIED;
buf[0] = ACCESS_DENIED;
}
free(buf);
result = send(sp->socket, buf, sizeof(struct param_exchange), 0);
if (result < 0)
perror("Error sending param ack to client");
return param->state;
}
@ -186,6 +174,7 @@ add_interval_list(struct iperf_stream_result * rp, struct iperf_interval_results
#if defined(linux) || defined(__FreeBSD__)
ip->tcpInfo = temp.tcpInfo;
#endif
printf("add_interval_list: Mbytes = %d, duration = %f \n", (int)ip->bytes_transferred/1000000, ip->interval_duration);
if (!rp->interval_results)
{
@ -251,39 +240,31 @@ send_result_to_client(struct iperf_stream * sp)
free(buf);
}
/************************************************************/
void
receive_result_from_server(struct iperf_test * test)
{
int result;
struct iperf_test *temp;
struct iperf_stream *sp;
int size = 0;
char *buf = NULL;
temp = iperf_new_test();
iperf_defaults(temp);
temp->role = 'c';
temp->default_settings->blksize = test->default_settings->blksize;
temp->server_hostname = test->server_hostname;
temp->server_port = test->server_port;
strncpy(temp->default_settings->cookie, test->default_settings->cookie, 37);
printf("in receive_result_from_server: calling iperf_init_test \n");
iperf_init_test(temp); /* XXX: should this be here??? I think
* sockets are already set up? -blt */
sp = temp->streams;
sp->settings->state = ALL_STREAMS_END;
sp->snd(sp); /* send message to client */
sp->settings->state = RESULT_REQUEST;
sp->snd(sp); /* send message to client */
/* receive from server */
printf("in receive_result_from_server \n");
size = sp->settings->blksize; /* XXX: Is blksize really what we
* want to use for the results? */
buf = (char *) malloc(size);
sp = test->streams;
printf("receive_result_from_server: send ALL_STREAMS_END to server \n");
sp->settings->state = ALL_STREAMS_END;
sp->snd(sp); /* send message to server */
printf("receive_result_from_server: send RESULT_REQUEST to server \n");
sp->settings->state = RESULT_REQUEST;
sp->snd(sp); /* send message to server */
/* receive from server */
do
{
@ -294,8 +275,6 @@ receive_result_from_server(struct iperf_test * test)
printf(server_reporting, sp->socket);
puts(buf); /* prints results */
iperf_free_stream(temp, sp);
iperf_free_test(temp);
}
/*************************************************************************/
@ -431,10 +410,13 @@ iperf_tcp_recv(struct iperf_stream * sp)
/* get the 1st byte: then based on that, decide how much to read */
if ((result = recv(sp->socket, &ch, sizeof(int), MSG_PEEK)) != sizeof(int))
{
perror("recv error");
perror("iperf_tcp_recv: recv error: MSG_PEEK");
return -1;
}
message = (int) ch;
if( message != 7) /* tell me about non STREAM_RUNNING messages for debugging */
printf("iperf_tcp_recv: got message type %d \n", message);
switch (message)
{
case PARAM_EXCHANGE:
@ -443,15 +425,15 @@ iperf_tcp_recv(struct iperf_stream * sp)
size = sizeof(struct param_exchange);
do
{ /* XXX: is the do/while really needed? */
printf("iperf_tcp_recv: Calling recv: expecting %d bytes \n", size);
//printf("iperf_tcp_recv: Calling recv: expecting %d bytes \n", size);
result = recv(sp->socket, sp->buffer, size, MSG_WAITALL);
} while (result == -1 && errno == EINTR);
if (result == -1)
if (result == -1)
{
perror("recv error");
return -1;
perror("iperf_tcp_recv: recv error");
return -1;
}
printf("iperf_tcp_recv: recv returned %d bytes \n", result);
//printf("iperf_tcp_recv: recv returned %d bytes \n", result);
//printf("result = %d state = %d, %d = error\n", result, sp->buffer[0], errno);
message = param_received(sp, param);
@ -470,12 +452,13 @@ iperf_tcp_recv(struct iperf_stream * sp)
} while (result == -1 && errno == EINTR);
#else
result = Nread(sp->socket, sp->buffer, size, 0); /* XXX: TCP only for the momment! */
result = Nread(sp->socket, sp->buffer, size, 0); /* XXX: TCP only for the
* momment! */
#endif
if (result == -1)
if (result == -1)
{
perror("Read error");
return -1;
perror("Read error");
return -1;
}
//printf("iperf_tcp_recv: recv returned %d bytes \n", result);
sp->result->bytes_received += result;
@ -572,61 +555,66 @@ iperf_tcp_send(struct iperf_stream * sp)
perror("transmit buffer not allocated");
return -1;
}
strncpy(param->cookie, sp->settings->cookie, 37);
switch (sp->settings->state)
{
case PARAM_EXCHANGE:
param->state = PARAM_EXCHANGE;
strncpy(param->cookie, sp->settings->cookie, 37);
size = sizeof(struct param_exchange);
size = sizeof(struct param_exchange);
break;
case STREAM_BEGIN:
param->state = STREAM_BEGIN;
strncpy(param->cookie, sp->settings->cookie, 37);
size = sp->settings->blksize;
size = sp->settings->blksize;
break;
case STREAM_END:
param->state = STREAM_END;
strncpy(param->cookie, sp->settings->cookie, 37);
size = sp->settings->blksize; /* XXX: this might not be right, will the last block always be full size? */
size = sp->settings->blksize; /* XXX: this might not be right, will
* the last block always be full
* size? */
break;
case RESULT_REQUEST:
param->state = RESULT_REQUEST;
strncpy(param->cookie, sp->settings->cookie, 37);
size = sizeof(struct param_exchange);
size = sizeof(struct param_exchange);
break;
case ALL_STREAMS_END:
param->state = ALL_STREAMS_END;
strncpy(param->cookie, sp->settings->cookie, 37);
size = sizeof(struct param_exchange);
size = sizeof(struct param_exchange);
break;
case STREAM_RUNNING:
sp->buffer[0] = STREAM_RUNNING;
size = sp->settings->blksize;
param->state = STREAM_RUNNING;
size = sp->settings->blksize;
break;
default:
printf("State of the stream can't be determined\n");
break;
}
//printf("in iperf_tcp_send, writting %d byptes \n", size);
//printf(" in iperf_tcp_send, message type = %d (total = %d bytes) \n", param->state, size);
#ifdef USE_SEND
result = send(sp->socket, sp->buffer, size, 0);
#else
result = Nwrite(sp->socket, sp->buffer, size, 0, dest); /* XXX: TCP only at the moment! */
result = Nwrite(sp->socket, sp->buffer, size, 0, dest); /* XXX: TCP only at the
* moment! */
#endif
if (result < 0)
perror("Write error");
//printf(" iperf_tcp_send: %d bytes sent \n", result);
/* change state after 1st send */
if (sp->settings->state == STREAM_BEGIN)
sp->settings->state = STREAM_RUNNING;
if (sp->buffer[0] != STREAM_END)
/* XXX: check/fix this. Maybe only want STREAM_BEGIN and STREAM_RUNNING */
sp->result->bytes_sent += size;
//printf("number bytes sent so far = %d \n", (int)sp->result->bytes_sent);
return result;
}
@ -802,8 +790,8 @@ iperf_init_test(struct iperf_test * test)
perror("unable to set TCP window");
}
/* XXX: why do we do this?? -blt */
setnonblocking(test->listener_sock_tcp);
setnonblocking(test->listener_sock_udp);
//setnonblocking(test->listener_sock_tcp);
//setnonblocking(test->listener_sock_udp);
printf("-----------------------------------------------------------\n");
printf("Server listening on %d\n", test->server_port);
@ -846,7 +834,6 @@ iperf_init_test(struct iperf_test * test)
sp = test->new_stream(test);
sp->socket = s;
printf("in iperf_init_test: tcp_windowsize: %d \n", test->default_settings->socket_bufsize);
iperf_init_stream(sp, test);
iperf_add_stream(test, sp);
@ -888,6 +875,7 @@ iperf_stats_callback(struct iperf_test * test)
struct iperf_stream_result *rp = test->streams->result;
struct iperf_interval_results *ip, temp;
//printf("in stats_callback: num_streams = %d \n", test->num_streams);
for (i = 0; i < test->num_streams; i++)
{
rp = sp->result;
@ -968,7 +956,7 @@ iperf_reporter_callback(struct iperf_test * test)
}
char *message_final = (char *) malloc((count + 1) * (strlen(report_bw_jitter_loss_header)
+ strlen(report_bw_jitter_loss_format) + strlen(report_sum_bw_jitter_loss_format)));
+ strlen(report_bw_jitter_loss_format) + strlen(report_sum_bw_jitter_loss_format)));
memset(message_final, 0, strlen(message_final));
@ -993,7 +981,8 @@ iperf_reporter_callback(struct iperf_test * test)
if (test->streams->result->interval_results->next != NULL)
{
unit_snprintf(nbuf, UNIT_LEN, (double) (ip->bytes_transferred / (ip->interval_duration - ip_prev->interval_duration)),
unit_snprintf(nbuf, UNIT_LEN,
(double) (ip->bytes_transferred / (ip->interval_duration - ip_prev->interval_duration)),
test->default_settings->unit_format);
sprintf(message, report_bw_format, sp->socket, ip_prev->interval_duration, ip->interval_duration, ubuf, nbuf);
@ -1178,12 +1167,16 @@ iperf_new_stream(struct iperf_test * testp)
}
memset(sp, 0, sizeof(struct iperf_stream));
printf("Creating send / recieve buffer of size %d \n", testp->default_settings->blksize);
sp->buffer = (char *) malloc(testp->default_settings->blksize);
sp->settings = (struct iperf_settings *) malloc(sizeof(struct iperf_settings));
memcpy(sp->settings, testp->default_settings, sizeof(struct iperf_settings));
sp->result = (struct iperf_stream_result *) malloc(sizeof(struct iperf_stream_result));
/* fill in buffer with random stuff */
/* XXX: probably better to use truely random stuff here */
memset_pattern16( sp->buffer, "abcdefghijklmnop", (size_t) testp->default_settings->blksize);
sp->socket = -1;
sp->packet_count = 0;
@ -1364,7 +1357,7 @@ iperf_init_stream(struct iperf_stream * sp, struct iperf_test * testp)
{
perror("getpeername");
}
printf("in init_stream: calling set_tcp_windowsize: %d \n", testp->default_settings->socket_bufsize);
//printf("in init_stream: calling set_tcp_windowsize: %d \n", testp->default_settings->socket_bufsize);
if (set_tcp_windowsize(sp->socket, testp->default_settings->socket_bufsize,
testp->role == 's' ? SO_RCVBUF : SO_SNDBUF) < 0)
fprintf(stderr, "unable to set window size\n");
@ -1551,8 +1544,8 @@ iperf_run_server(struct iperf_test * test)
}
void
/**************************************************************************/
void
catcher(int sig)
{
longjmp(env, sig);
@ -1609,15 +1602,25 @@ iperf_run_client(struct iperf_test * test)
if (test->reporter_interval != 0)
reporter_interval = new_timer(test->reporter_interval, 0);
if (test->default_settings->bytes == 0)
printf("Starting Test: %d streams, %d byte blocks, %d second test \n",
test->num_streams, test->default_settings->blksize, test->duration);
else
printf("Starting Test: %d streams, %d byte blocks, %d bytes to send\n",
test->num_streams, test->default_settings->blksize, (int) test->default_settings->bytes);
/* send data till the timer expires or bytes sent */
while (!all_data_sent(test) && !timer->expired(timer))
{
memcpy(&test->temp_set, &test->write_set, sizeof(test->write_set)); /* XXX: Does this need
* to be inside the
* while loop? -blt */
#ifdef NEED_THIS /* not sure what this was for, so removed -blt */
memcpy(&test->temp_set, &test->write_set, sizeof(test->write_set));
printf("Calling select... \n");
ret = select(test->max_fd + 1, NULL, &test->write_set, NULL, &tv);
if (ret < 0)
continue;
#endif
sp = test->streams;
for (i = 0; i < test->num_streams; i++)
@ -1646,10 +1649,12 @@ iperf_run_client(struct iperf_test * test)
break;
} /* while outer timer */
printf("Test Complete. \n");
/* for last interval */
test->stats_callback(test);
read = test->reporter_callback(test);
printf("Client measured results: \n");
puts(read);
/* sending STREAM_END packets */
@ -1662,13 +1667,16 @@ iperf_run_client(struct iperf_test * test)
sp->snd(sp);
np = sp->next;
} while (np);
test->default_settings->state = RESULT_REQUEST;
read = test->reporter_callback(test);
puts(read);
//printf("Done Sending STREAM_END. \n");
/* Requesting for result from Server */
receive_result_from_server(test);
test->default_settings->state = RESULT_REQUEST;
// receive_result_from_server(test); /* XXX: current broken? bus error */
read = test->reporter_callback(test);
printf("Server measured results: \n");
puts(read);
printf("Done getting/printing results. \n");
/* Deleting all streams - CAN CHANGE FREE_STREAM FN */
sp = test->streams;
@ -1708,6 +1716,7 @@ iperf_run(struct iperf_test * test)
return -1;
break;
}
printf("Done iperf_run. \n");
}
@ -1734,7 +1743,6 @@ static struct option longopts[] =
{NULL, 0, NULL, 0}
};
/**************************************************************************/
/**************************************************************************/
int
@ -1777,98 +1785,94 @@ main(int argc, char **argv)
err("couldn't change CPU affinity");
#endif
while (1) /* XXX: should while 1 be server only?? -blt */
test = iperf_new_test();
iperf_defaults(test); /* sets defaults */
while ((ch = getopt_long(argc, argv, "c:p:st:uP:b:l:w:i:n:mNThM:f:", longopts, NULL)) != -1)
{
test = iperf_new_test();
iperf_defaults(test);
while ((ch = getopt_long(argc, argv, "c:p:st:uP:b:l:w:i:n:mNThM:f:", longopts, NULL)) != -1)
switch (ch)
{
switch (ch)
{
case 'c':
test->role = 'c';
role = test->role;
test->server_hostname = (char *) malloc(strlen(optarg));
strncpy(test->server_hostname, optarg, strlen(optarg));
break;
case 'p':
test->server_port = atoi(optarg);
port = test->server_port;
break;
case 's':
test->role = 's';
role = test->role;
break;
case 't':
test->duration = atoi(optarg);
break;
case 'u':
test->protocol = Pudp;
test->default_settings->blksize = DEFAULT_UDP_BLKSIZE;
test->new_stream = iperf_new_udp_stream;
break;
case 'P':
test->num_streams = atoi(optarg);
break;
case 'b':
test->default_settings->rate = unit_atof(optarg);
break;
case 'l':
test->default_settings->blksize = unit_atoi(optarg);
printf("%d is the blksize\n", test->default_settings->blksize);
break;
case 'w':
test->default_settings->socket_bufsize = unit_atof(optarg);
break;
case 'i':
test->stats_interval = atoi(optarg);
test->reporter_interval = atoi(optarg);
break;
case 'n':
test->default_settings->bytes = unit_atoi(optarg);
printf("total bytes to be transferred = %llu\n", test->default_settings->bytes);
break;
case 'm':
test->print_mss = 1;
break;
case 'N':
test->no_delay = 1;
break;
case 'M':
test->default_settings->mss = atoi(optarg);
break;
case 'f':
test->default_settings->unit_format = *optarg;
break;
case 'T':
test->tcp_info = 1;
break;
case 'h':
default:
fprintf(stderr, usage_long1);
fprintf(stderr, usage_long2);
exit(1);
}
}
/* param exchange */
if (test->role == 'c')
exchange_parameters(test);
/* used for subsequent runs of server */
if (test->role == 's');
test->server_port = port;
printf("in main: calling iperf_init_test \n");
iperf_init_test(test);
printf("in main: calling iperf_run \n");
iperf_run(test);
iperf_free_test(test);
if (role == 'c')
case 'c':
test->role = 'c';
role = test->role;
test->server_hostname = (char *) malloc(strlen(optarg));
strncpy(test->server_hostname, optarg, strlen(optarg));
break;
case 'p':
test->server_port = atoi(optarg);
port = test->server_port;
break;
case 's':
test->role = 's';
role = test->role;
break;
case 't':
test->duration = atoi(optarg);
break;
case 'u':
test->protocol = Pudp;
test->default_settings->blksize = DEFAULT_UDP_BLKSIZE;
test->new_stream = iperf_new_udp_stream;
break;
case 'P':
test->num_streams = atoi(optarg);
break;
case 'b':
test->default_settings->rate = unit_atof(optarg);
break;
case 'l':
test->default_settings->blksize = unit_atoi(optarg);
printf("%d is the blksize\n", test->default_settings->blksize);
break;
case 'w':
test->default_settings->socket_bufsize = unit_atof(optarg);
break;
case 'i':
test->stats_interval = atoi(optarg);
test->reporter_interval = atoi(optarg);
break;
case 'n':
test->default_settings->bytes = unit_atoi(optarg);
printf("total bytes to be transferred = %llu\n", test->default_settings->bytes);
break;
case 'm':
test->print_mss = 1;
break;
case 'N':
test->no_delay = 1;
break;
case 'M':
test->default_settings->mss = atoi(optarg);
break;
case 'f':
test->default_settings->unit_format = *optarg;
break;
case 'T':
test->tcp_info = 1;
break;
case 'h':
default:
fprintf(stderr, usage_long1);
fprintf(stderr, usage_long2);
exit(1);
}
}
return 0;
//printf("in main: calling iperf_init_test \n");
printf("Connection to port %d on host %s \n", test->server_port, test->server_hostname);
iperf_init_test(test);
if (test->role == 'c') /* if client, send params to server */
{
exchange_parameters(test);
test->streams->settings->state = STREAM_BEGIN;
}
printf("in main: calling iperf_run \n");
iperf_run(test);
iperf_free_test(test);
printf("\niperf Done.\n");
exit(0);
}

View File

@ -149,7 +149,7 @@ enum
uS_TO_NS = 1000,
SEC_TO_US = 1000000,
RATE = 1024 * 1024, /* 1 Mbps */
DURATION = 10, /* seconds */
DURATION = 5, /* seconds */
DEFAULT_UDP_BLKSIZE = 1450, /* 1 packet per ethernet frame, IPV6 too */
DEFAULT_TCP_BLKSIZE = 128 * 1024, /* default read/write block size */

View File

@ -125,8 +125,10 @@ again:
}
} else
{
//printf("Nwrite: writing %d bytes to socket %d \n", count, fd);
cnt = write(fd, buf, count);
}
//printf("Nwrite: wrote %d bytes \n", cnt);
return (cnt);
}

View File

@ -49,7 +49,6 @@ timer_expired(struct timer * tp)
current += now.tv_usec;
diff = end - current;
return diff <= 0;
/*