Note: The redesign is almost working. Should be finished shortly.

This commit is contained in:
sethdelliott 2010-06-21 15:08:47 +00:00
parent 982c704a8a
commit ba2672a209
4 changed files with 154 additions and 132 deletions

View File

@ -170,6 +170,7 @@ struct param_exchange
int state;
int protocol;
int stream_id;
int num_streams;
int blksize;
int recv_window;
int send_window;

View File

@ -94,15 +94,16 @@ iperf_exchange_parameters(struct iperf_test * test)
// XXX: Probably should get the cookie at the start of iperf rather than
// waiting till here
get_uuid(test->default_settings->cookie);
strncpy(param->cookie, test->default_settings->cookie, COOKIE_SIZE);
strncpy(param.cookie, test->default_settings->cookie, COOKIE_SIZE);
/* setting up exchange parameters */
param->state = PARAM_EXCHANGE;
param->protocol = test->protocol;
param->blksize = test->default_settings->blksize;
param->recv_window = test->default_settings->socket_bufsize;
param->send_window = test->default_settings->socket_bufsize;
param->format = test->default_settings->unit_format;
param.state = PARAM_EXCHANGE;
param.protocol = test->protocol;
param.num_streams = test->num_streams;
param.blksize = test->default_settings->blksize;
param.recv_window = test->default_settings->socket_bufsize;
param.send_window = test->default_settings->socket_bufsize;
param.format = test->default_settings->unit_format;
if (write(test->ctrl_sck, &param, sizeof(struct param_exchange)) < 0) {
perror("write param_exchange");
@ -119,22 +120,25 @@ iperf_exchange_parameters(struct iperf_test * test)
} else {
if (read(ctrl_sck, &param, sizeof(struct param_exchange)) < 0) {
if (read(test->ctrl_sck, &param, sizeof(struct param_exchange)) < 0) {
perror("read param_exchange");
return -1;
}
// set test parameters
test->default_settings->cookie = param->cookie;
test->protocol = param->protocol;
test->default_settings->blksize = param->blksize;
test->default_settings->socket_bufsize = param->recv_window;
strncpy(test->default_settings->cookie, param.cookie, COOKIE_SIZE);
test->protocol = param.protocol;
test->num_streams = param.num_streams;
test->default_settings->blksize = param.blksize;
test->default_settings->socket_bufsize = param.recv_window;
// need to add support for send_window
test->default_settings->unit_format = param->format;
test->default_settings->unit_format = param.format;
test->prot_listener = netannounce(test->protocol, NULL, 5202);
// Send the control message to create streams and start the test
test->state = CREATE_STREAMS;
if (write(ctrl_sck, &test->state, sizeof(int)) < 0) {
if (write(test->ctrl_sck, &test->state, sizeof(int)) < 0) {
perror("write CREATE_STREAMS");
return -1;
}
@ -346,7 +350,7 @@ iperf_create_streams(struct iperf_test *test)
int i, s;
for (i = 0; i < test->num_streams; ++i) {
s = netdial(test->protocol, test->server_hostname, test->server_port);
s = netdial(test->protocol, test->server_hostname, test->server_port+1);
if (s < 0) {
perror("netdial stream");
return -1;
@ -381,6 +385,7 @@ iperf_handle_message_client(struct iperf_test *test)
iperf_create_streams(test);
break;
default:
// XXX: This needs to be replaced
printf("How did you get here? test->state = %d\n", test->state);
break;
}
@ -851,7 +856,7 @@ iperf_client_start(struct iperf_test *test)
for (sp = test->streams; sp != NULL; sp = sp->next) {
if (sp->snd(sp) < 0) {
perror("iperf_client_start: snd");
// do other stuff on error
// XXX: needs to indicate an iperf error on stream send
}
}
@ -876,6 +881,13 @@ iperf_client_start(struct iperf_test *test)
free(stats_interval);
free(reporter_interval);
// Send TEST_DONE (ALL_STREAMS_END) message
test->state = ALL_STREAMS_END;
if (write(test->ctrl_sck, &test->state, sizeof(int)) < 0) {
perror("write ALL_STREAMS_END");
return -1;
}
return 0;
}
@ -937,24 +949,53 @@ iperf_client_end(struct iperf_test *test)
int
iperf_run_client(struct iperf_test * test)
{
struct timeval tv;
struct iperf_stream *sp;
fd_set temp_set;
int result;
/* Start the client and connect to the server */
if (iperf_connect(test) < 0) {
// set error and return
return -1;
}
while (test->state != TEST_END) {
FD_COPY(&test->read_set, &temp_set);
tv.tv_sec = 15;
tv.tv_usec = 0;
result = select(test->max_fd + 1, &temp_set, NULL, NULL, &tv);
if (result < 0 && errno != EINTR) {
perror("select");
exit(1);
} else if (result > 0) {
if (FD_ISSET(test->ctrl_sck, &temp_set)) {
iperf_handle_message_client(test);
FD_CLR(test->ctrl_sck, &temp_set);
}
}
}
/* Exchange parameters with the server */
/* Moved to iperf_connect
if (iperf_exchange_parameters(test) < 0) {
// This needs to set error
return -1;
}
*/
/* Start the iperf test */
/* Moved to while above
if (iperf_client_start(test) < 0) {
return -1;
}
*/
/* End the iperf test and clean up client specific memory */
if (iperf_client_end(test) < 0) {
return -1;
}

View File

@ -160,13 +160,16 @@ iperf_server_listen(struct iperf_test *test)
// This needs to be changed to reflect if client has different window size
// make sure we got what we asked for
/* XXX: This needs to be moved to the stream listener
if ((x = get_tcp_windowsize(test->listener_sock_tcp, SO_RCVBUF)) < 0) {
// Needs to set some sort of error number/message
perror("SO_RCVBUF");
return -1;
}
*/
// This code needs to be moved to after parameter exhange
// XXX: This code needs to be moved to after parameter exhange
// XXX: Last thing I was working on
if (test->protocol == Ptcp) {
if (test->default_settings->socket_bufsize > 0) {
unit_snprintf(ubuf, UNIT_LEN, (double) x, 'A');
@ -199,6 +202,7 @@ iperf_accept(struct iperf_test *test)
return -1;
}
/*
inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) & addr.local_addr)->sin_addr),
(void *) ipl, sizeof(ipl));
inet_ntop(AF_INET, (void *) (&((struct sockaddr_in *) & addr.remote_addr)->sin_addr),
@ -206,6 +210,8 @@ iperf_accept(struct iperf_test *test)
printf(report_peer, s,
ipl, ntohs(((struct sockaddr_in *) & addr.local_addr)->sin_port),
ipr, ntohs(((struct sockaddr_in *) & addr.remote_addr)->sin_port));
*/
printf("just accepted a control connection from somebody.. need to change this message\n");
return s;
} else {
@ -220,7 +226,7 @@ int
iperf_handle_message(struct iperf_test *test)
{
if (read(test->ctrl_sck, &test->state, sizeof(int)) < 0) {
// indicate error on read
// XXX: Needs to indicate read error
return -1;
}
@ -228,6 +234,16 @@ iperf_handle_message(struct iperf_test *test)
case PARAM_EXCHANGE:
iperf_exchange_parameters(test);
break;
case TEST_START:
break;
case ALL_STREAMS_END:
// close the streams
printf("made it to ALL_STREAMS_END!\n");
exit(1);
default:
// XXX: This needs to be replaced by actual error handling
fprintf("How did you get here? test->state = %d\n", test->state);
return -1;
}
return 0;
@ -256,7 +272,7 @@ iperf_run_server(struct iperf_test *test)
printf("iperf_run_server: Waiting for client connect.... \n");
while (test->default_settings != TEST_END) {
while (test->state != TEST_END) {
// Copy select set and renew timers
FD_COPY(&test->read_set, &test->temp_set);
@ -282,19 +298,19 @@ iperf_run_server(struct iperf_test *test)
}
if (FD_ISSET(test->ctrl_sck, &test->temp_set)) {
// Handle control messages
iperf_handle_message(test);
FD_CLR(test->ctrl_sck, &test->temp_set);
}
if (FD_ISSET(test->prot_listener, &test->temp_set)) {
// Spawn new streams
// XXX: Fix this!
iperf_tcp_accept(test);
FD_CLR(test->ctrl_sck, &test->temp_set);
}
// Iterate through the streams to see if their socket FD_ISSET
for (sp = test->streams; sp != NULL; sp = sp->next) {
if (FD_ISSET(sp->socket, &test->temp_set)) {
iperf_tcp_recv(sp);
}
}
}

View File

@ -52,111 +52,82 @@ jmp_buf env; /* to handle longjmp on signal */
int
iperf_tcp_recv(struct iperf_stream * sp)
{
int result = 0, message = 0;
int size = sp->settings->blksize;
char *final_message = NULL;
int result = 0, message = 0;
int size = sp->settings->blksize;
char *final_message = NULL;
errno = 0;
struct param_exchange *param = (struct param_exchange *) sp->buffer;
if (!sp->buffer)
{
fprintf(stderr, "receive buffer not allocated \n");
return -1;
if (!sp->buffer) {
fprintf(stderr, "receive buffer not allocated \n");
return -1;
}
/* get the 1st byte: then based on that, decide how much to read */
if ((result = recv(sp->socket, &message, sizeof(int), MSG_PEEK)) != sizeof(int))
{
if (result == 0)
printf("Client Disconnected. \n");
else
perror("iperf_tcp_recv: recv error: MSG_PEEK");
return -1;
if ((result = recv(sp->socket, &message, sizeof(int), MSG_PEEK)) != sizeof(int)) {
if (result == 0)
printf("Client Disconnected. \n");
else
perror("iperf_tcp_recv: recv error: MSG_PEEK");
return -1;
}
sp->settings->state = message;
#ifdef DEBUG
if (message != STREAM_RUNNING) /* tell me about non STREAM_RUNNING messages
* for debugging */
if (message != STREAM_RUNNING) // tell me about non STREAM_RUNNING messages
// for debugging
printf("iperf_tcp_recv: got message type %d \n", message);
#endif
switch (message)
{
case PARAM_EXCHANGE:
size = sizeof(struct param_exchange);
#ifdef USE_RECV
do
{
result = recv(sp->socket, sp->buffer, size, MSG_WAITALL);
} while (result == -1 && errno == EINTR);
#else
result = Nread(sp->socket, sp->buffer, size, Ptcp);
#endif
if (result == -1)
{
perror("iperf_tcp_recv: recv error");
return -1;
}
//printf("iperf_tcp_recv: recv returned %d bytes \n", result);
//printf("result = %d state = %d, %d = error\n", result, sp->buffer[0], errno);
result = param_received(sp, param); /* handle PARAM_EXCHANGE and
* send result to client */
break;
case TEST_START:
case STREAM_BEGIN:
case STREAM_RUNNING:
size = sp->settings->blksize;
switch (message) {
case TEST_START:
case STREAM_BEGIN:
case STREAM_RUNNING:
size = sp->settings->blksize;
#ifdef USE_RECV
/*
* NOTE: Nwrite/Nread seems to be 10-15% faster than send/recv for
* localhost on OSX. More testing needed on other OSes to be sure.
*/
do
{
//printf("iperf_tcp_recv: Calling recv: expecting %d bytes \n", size);
result = recv(sp->socket, sp->buffer, size, MSG_WAITALL);
} while (result == -1 && errno == EINTR);
do {
result = recv(sp->socket, sp->buffer, size, MSG_WAITALL);
} while (result == -1 && errno == EINTR);
#else
result = Nread(sp->socket, sp->buffer, size, Ptcp);
result = Nread(sp->socket, sp->buffer, size, Ptcp);
#endif
if (result == -1)
{
perror("Read error");
return -1;
}
//printf("iperf_tcp_recv: recv on socket %d returned %d bytes \n", sp->socket, result);
sp->result->bytes_received += result;
sp->result->bytes_received_this_interval += result;
break;
case STREAM_END:
size = sizeof(struct param_exchange);
result = Nread(sp->socket, sp->buffer, size, Ptcp);
break;
case ALL_STREAMS_END:
size = sizeof(struct param_exchange);
result = Nread(sp->socket, sp->buffer, size, Ptcp);
break;
case TEST_END:
size = sizeof(struct param_exchange);
result = Nread(sp->socket, sp->buffer, size, Ptcp);
break;
case RESULT_REQUEST:
/* XXX: not working yet */
//final_message = iperf_reporter_callback(test);
//memcpy(sp->buffer, final_message, strlen(final_message));
//result = send(sp->socket, sp->buffer, MAX_RESULT_STRING, 0);
if (result < 0)
perror("Error sending results back to client");
if (result == -1) {
perror("Read error");
return -1;
}
sp->result->bytes_received += result;
sp->result->bytes_received_this_interval += result;
break;
case STREAM_END: // What's the purpose of reading this?
size = sizeof(struct param_exchange);
result = Nread(sp->socket, sp->buffer, size, Ptcp);
break;
case ALL_STREAMS_END:
size = sizeof(struct param_exchange);
result = Nread(sp->socket, sp->buffer, size, Ptcp);
break;
case TEST_END:
size = sizeof(struct param_exchange);
result = Nread(sp->socket, sp->buffer, size, Ptcp);
break;
case RESULT_REQUEST:
/* XXX: not working yet */
//final_message = iperf_reporter_callback(test);
//memcpy(sp->buffer, final_message, strlen(final_message));
//result = send(sp->socket, sp->buffer, MAX_RESULT_STRING, 0);
//if (result < 0)
// perror("Error sending results back to client");
break;
default:
printf("unexpected state encountered: %d \n", message);
return -1;
break;
default:
printf("unexpected state encountered: %d \n", message);
return -1;
}
return message;
@ -186,9 +157,6 @@ iperf_tcp_send(struct iperf_stream * sp)
/* set read size based on message type */
switch (sp->settings->state) {
case PARAM_EXCHANGE:
size = sizeof(struct param_exchange);
break;
case STREAM_BEGIN:
size = sp->settings->blksize;
break;
@ -278,30 +246,26 @@ iperf_tcp_accept(struct iperf_test * test)
struct iperf_stream *sp;
len = sizeof(addr);
peersock = accept(test->listener_sock_tcp, (struct sockaddr *) & addr, &len);
if (peersock < 0)
{
printf("Error in accept(): %s\n", strerror(errno));
return -1;
} else
{
sp = test->new_stream(test);
setnonblocking(peersock);
FD_SET(peersock, &test->read_set); /* add new socket to master set */
test->max_fd = (test->max_fd < peersock) ? peersock : test->max_fd;
//printf("iperf_tcp_accept: max_fd now set to: %d \n", test->max_fd );
sp->socket = peersock;
//printf("in iperf_tcp_accept: socket = %d, tcp_windowsize: %d \n", peersock, test->default_settings->socket_bufsize);
iperf_init_stream(sp, test);
iperf_add_stream(test, sp);
if (test->default_settings->state != RESULT_REQUEST)
connect_msg(sp); /* print connect message */
return 0;
peersock = accept(test->prot_listener, (struct sockaddr *) & addr, &len);
if (peersock < 0) {
printf("Error in accept(): %s\n", strerror(errno));
return -1;
}
sp = test->new_stream(test);
setnonblocking(peersock);
FD_SET(peersock, &test->read_set); /* add new socket to master set */
test->max_fd = (test->max_fd < peersock) ? peersock : test->max_fd;
sp->socket = peersock;
iperf_init_stream(sp, test);
iperf_add_stream(test, sp);
if (test->default_settings->state != RESULT_REQUEST)
connect_msg(sp); /* print connect message */
return 0;
}