parallel streams now mostly working. Still some issues with repeated connections

This commit is contained in:
Brian Tierney 2009-11-06 07:25:10 +00:00
parent 150a09e9e2
commit a3281a3dc6
9 changed files with 115 additions and 89 deletions

View File

@ -58,7 +58,8 @@ struct iperf_stream
/* configurable members */
int local_port;
int remote_port;
/* XXX: I think settings should be one per test, not one per stream -blt */
/* XXX: is this just a pointer to the same struct in iperf_test? if not,
should it be? */
struct iperf_settings *settings; /* pointer to structure settings */
/* non configurable members */

View File

@ -449,7 +449,7 @@ iperf_stats_callback(struct iperf_test *test)
struct iperf_stream_result *rp = test->streams->result;
struct iperf_interval_results *ip, temp;
printf("in stats_callback: num_streams = %d \n", test->num_streams);
//printf("in stats_callback: num_streams = %d \n", test->num_streams);
for (i = 0; i < test->num_streams; i++) {
rp = sp->result;
@ -527,7 +527,7 @@ iperf_reporter_callback(struct iperf_test *test)
sp = test->streams;
curr_state = sp->settings->state;
printf("in iperf_reporter_callback: state = %d \n", curr_state);
//printf("in iperf_reporter_callback: state = %d \n", curr_state);
if (curr_state == TEST_RUNNING) {
/* print interval results */
@ -700,7 +700,7 @@ iperf_new_stream(struct iperf_test *testp)
int i = 0;
struct iperf_stream *sp;
printf("in iperf_new_stream \n");
//printf("in iperf_new_stream \n");
sp = (struct iperf_stream *)malloc(sizeof(struct iperf_stream));
if (!sp) {
perror("malloc");
@ -708,9 +708,10 @@ iperf_new_stream(struct iperf_test *testp)
}
memset(sp, 0, sizeof(struct iperf_stream));
printf("Allocating new stream buffer: size = %d \n", testp->default_settings->blksize);
printf("iperf_new_stream: Allocating new stream buffer: size = %d \n", testp->default_settings->blksize);
sp->buffer = (char *)malloc(testp->default_settings->blksize);
sp->settings = (struct iperf_settings *)malloc(sizeof(struct iperf_settings));
/* make a per stream copy of default_settings in each stream structure */
memcpy(sp->settings, testp->default_settings, sizeof(struct iperf_settings));
sp->result = (struct iperf_stream_result *)malloc(sizeof(struct iperf_stream_result));

View File

@ -16,14 +16,6 @@
*/
void exchange_parameters(struct iperf_test * test);
/**
* param_received - handles the param_Exchange part for server
* returns state on success, -1 on failure
*
*/
int param_received(struct iperf_stream * sp, struct param_exchange * param);
/**
* add_interval_list -- adds new interval to the interval_list
*

View File

@ -43,6 +43,8 @@
#include "uuid.h"
#include "locale.h"
void handle_message(struct iperf_test * test, int m, struct iperf_stream * sp);
/*********************************************************************/
/**
* param_received - handles the param_Exchange part for server
@ -119,9 +121,9 @@ void
iperf_run_server(struct iperf_test * test)
{
struct timeval tv;
struct iperf_stream *np, *sp;
int j, result=0, message, cnt=0;
char *results_string = NULL;
struct iperf_stream *np;
int j = 0, result = 0, message = 0;
int nfd = 0;
printf("in iperf_run_server \n");
@ -161,6 +163,8 @@ iperf_run_server(struct iperf_test * test)
{
test->protocol = Ptcp;
test->accept = iperf_tcp_accept;
if (test->accept < 0)
return;
test->new_stream = iperf_new_tcp_stream;
test->accept(test);
test->default_settings->state = TEST_RUNNING;
@ -172,91 +176,118 @@ iperf_run_server(struct iperf_test * test)
{
test->protocol = Pudp;
test->accept = iperf_udp_accept;
if (test->accept < 0)
return;
test->new_stream = iperf_new_udp_stream;
test->accept(test);
test->default_settings->state = TEST_RUNNING;
FD_CLR(test->listener_sock_udp, &test->temp_set);
}
/* Process the sockets for read operation */
/* XXX: Need to try to read equal amounts from each socket, so keep
track of last socket read from, and always start with the next
socket
*/
for (j = 0; j < test->max_fd + 1; j++)
/*
* XXX: Need to try to read equal amounts from each socket, so
* keep track of last socket read from, and always start with the
* next socket
*/
nfd = test->max_fd + 1;
for (j = 0; j <= test->max_fd; j++)
{
//printf("Checking socket %d \n", j);
if (FD_ISSET(j, &test->temp_set))
{
//printf("iperf_run_server: data ready on socket %d \n", j);
/* find the correct stream - possibly time consuming */
np = find_stream_by_socket(test, j);
message = np->rcv(np); /* get data from client using
* receiver callback */
//printf("iperf_run_server: iperf_tcp_recv returned %d \n", message);
if (message < 0)
{
printf("Error reading data from client \n");
close(np->socket);
return;
}
np->settings->state = message;
if (message == PARAM_EXCHANGE)
{
/* copy the received settings into test */
memcpy(test->default_settings, test->streams->settings, sizeof(struct iperf_settings));
}
if (message == ACCESS_DENIED) /* this might get set by
* PARAM_EXCHANGE */
{
/* XXX: test this! */
close(np->socket);
FD_CLR(np->socket, &test->read_set);
iperf_free_stream(np);
}
if (message == STREAM_END)
{
gettimeofday(&np->result->end_time, NULL);
}
if (message == RESULT_REQUEST)
{
np->settings->state = RESULT_RESPOND;
results_string = test->reporter_callback(test);
np->data = results_string;
send_result_to_client(np);
}
if (message == ALL_STREAMS_END)
{
printf("client send ALL_STREAMS_END message, printing results \n");
/* print server results */
results_string = test->reporter_callback(test);
puts(results_string); /* send to stdio */
}
if (message < 0)
goto done;
handle_message(test, message, np);
if (message == TEST_END)
{
/* FREE ALL STREAMS */
np = test->streams;
do
{
sp = np;
//printf(" closing socket: %d \n", sp->socket);
close(sp->socket);
FD_CLR(sp->socket, &test->read_set);
np = sp->next; /* get next pointer before
* freeing */
iperf_free_stream(sp);
} while (np != NULL);
test->default_settings->state = TEST_END;
/* reset cookie when client is finished */
printf("got TEST_END message, reseting cookie \n");
memset(test->default_settings->cookie, '\0', COOKIE_SIZE);
break; /* always break out of for loop on TEST_END message */
}
break; /* test done, so break out of loop */
} /* end if (FD_ISSET(j, &temp_set)) */
} /* end for (j=0;...) */
} /* end else (result>0) */
} /* end while */
printf("Test Complete. \n\n");
return;
done:
printf("Test Complete. \n\n");
/* reset cookie when client is finished */
memset(test->streams->settings->cookie, '\0', COOKIE_SIZE);
return;
}
/********************************************************/
void
handle_message(struct iperf_test * test, int message, struct iperf_stream * sp)
{
char *results_string = NULL;
struct iperf_stream *tp1 = NULL;
struct iperf_stream *tp2 = NULL;
//printf("handle_message: %d \n", message);
if (message < 0)
{
printf("Error reading data from client \n");
close(sp->socket);
return;
}
sp->settings->state = message;
if (message == PARAM_EXCHANGE)
{
/* copy the received settings into test */
memcpy(test->default_settings, test->streams->settings, sizeof(struct iperf_settings));
}
if (message == ACCESS_DENIED) /* this might get set by
* PARAM_EXCHANGE */
{
/* XXX: test this! */
close(sp->socket);
FD_CLR(sp->socket, &test->read_set);
iperf_free_stream(sp);
}
if (message == STREAM_END)
{
/* get final timestamp for all streams */
tp1 = test->streams;
do
{
gettimeofday(&tp1->result->end_time, NULL);
tp1 = tp1->next;
} while (tp1 != NULL);
}
if (message == RESULT_REQUEST)
{
sp->settings->state = RESULT_RESPOND;
results_string = test->reporter_callback(test);
sp->data = results_string;
send_result_to_client(sp);
}
if (message == ALL_STREAMS_END)
{
printf("client sent ALL_STREAMS_END message, printing results \n");
/* print server results */
results_string = test->reporter_callback(test);
puts(results_string); /* send to stdio */
}
if (message == TEST_END)
{
printf("client sent TEST_END message, shuting down sockets.. \n");
/* FREE ALL STREAMS */
tp1 = test->streams;
do
{
tp2 = tp1;
printf(" closing socket: %d \n", tp2->socket);
close(tp2->socket);
FD_CLR(tp2->socket, &test->read_set);
tp1 = tp2->next; /* get next pointer before freeing */
iperf_free_stream(tp2);
} while (tp1 != NULL);
test->default_settings->state = TEST_END;
memset(test->default_settings->cookie, '\0', COOKIE_SIZE);
}
}

View File

@ -1,5 +1,5 @@
int param_received(struct iperf_stream * sp, struct param_exchange * param);
int param_received(struct iperf_stream *sp, struct param_exchange * param);
void send_result_to_client(struct iperf_stream * sp);
void iperf_run_server(struct iperf_test * test);

View File

@ -30,6 +30,7 @@
#include "iperf.h"
#include "iperf_api.h"
#include "iperf_server_api.h"
#include "iperf_tcp.h"
#include "timer.h"
#include "net.h"

View File

@ -275,7 +275,8 @@ iperf_udp_accept(struct iperf_test * test)
if (connect(test->listener_sock_udp, (struct sockaddr *) & sa_peer, len) < 0)
{
perror("connect");
perror("iperf_udp_accept: connect error");
exit(-1); /* XXX: for debugging */
return -1;
}
sp = test->new_stream(test);

View File

@ -84,8 +84,7 @@ main(int argc, char **argv)
char ch, role;
struct iperf_test *test;
int port = PORT, cnt = 0;
struct iperf_stream *np;
int port = PORT;
#ifdef TEST_PROC_AFFINITY
/* didnt seem to work.... */

View File

@ -43,7 +43,7 @@ netdial(int proto, char *client, int port)
if (connect(s, (struct sockaddr *) & sa, sizeof sa) < 0 && errno != EINPROGRESS)
{
perror("connect error");
perror("netdial: connect error");
return (-1);
}
sn = sizeof sa;