changed code for Select API

This commit is contained in:
kaustubhprabhu 2009-06-02 01:13:26 +00:00
parent ff5411c68d
commit 4bd06c53f0

View File

@ -2,7 +2,7 @@
* iperfjd -- greatly simplified version of iperf with the same interface * iperfjd -- greatly simplified version of iperf with the same interface
* semantics * semantics
* *
* jdugan -- 24 Jan 2009 * kprabhu - 29th May 2009
* *
*/ */
@ -43,6 +43,7 @@ enum {
uS_TO_NS = 1000, uS_TO_NS = 1000,
MAX_BUFFER_SIZE =10,
DEFAULT_UDP_BUFSIZE = 1470, DEFAULT_UDP_BUFSIZE = 1470,
DEFAULT_TCP_BUFSIZE = 8192 DEFAULT_TCP_BUFSIZE = 8192
}; };
@ -71,6 +72,7 @@ struct iperf_stream
struct iperf_stream *next; struct iperf_stream *next;
}; };
// Run routines for TCP and UDP- will be called by pthread_create() indirectly
void *udp_client_thread(struct iperf_stream *sp); void *udp_client_thread(struct iperf_stream *sp);
void *udp_server_thread(struct iperf_stream *sp); void *udp_server_thread(struct iperf_stream *sp);
void *tcp_client_thread(struct iperf_stream *sp); void *tcp_client_thread(struct iperf_stream *sp);
@ -124,6 +126,30 @@ default_settings(struct iperf_settings *settings)
struct iperf_stream *streams; /* head of list of streams */ struct iperf_stream *streams; /* head of list of streams */
int done = 0; int done = 0;
/*--------------------------------------------------------
* Displays the current list of streams
-------------------------------------------------------*/
void Display()
{
struct iperf_stream * n;
n= streams;
int count=1;
while(n->next!=NULL)
{
printf("position-%d\tsp=%d\tsocket=%d\n",count++,(int)n,n->sock);
n=n->next;
}
}
/*--------------------------------------------------------
* sets the parameters for the new stream created
-------------------------------------------------------*/
struct iperf_stream * struct iperf_stream *
new_stream(int s, struct iperf_settings *settings) new_stream(int s, struct iperf_settings *settings)
{ {
@ -136,35 +162,41 @@ new_stream(int s, struct iperf_settings *settings)
return(NULL); return(NULL);
} }
//initialise sp with 0
memset(sp, 0, sizeof(struct iperf_stream)); memset(sp, 0, sizeof(struct iperf_stream));
// copy settings and passed socket into stream
sp->settings = settings; sp->settings = settings;
sp->sock = s; sp->sock = s;
len = sizeof sp->local; len = sizeof sp->local;
if(getsockname(sp->sock, (struct sockaddr *) &sp->local, &len) < 0) { if(getsockname(sp->sock, (struct sockaddr *) &sp->local, &len) < 0) {
perror("getsockname"); perror("getsockname");
free(sp); free(sp);
return(NULL); return(NULL);
} }
//converts the local ip into string address
if(inet_ntop(AF_INET, (void *) &sp->local.sin_addr, if(inet_ntop(AF_INET, (void *) &sp->local.sin_addr,
(void *) &sp->local_addr, 512) == NULL) { (void *) &sp->local_addr, 512) == NULL) {
perror("inet_pton"); perror("inet_pton");
} }
//stores the socket id.
if(getpeername(sp->sock, (struct sockaddr *) &sp->peer, &len) < 0) { if(getpeername(sp->sock, (struct sockaddr *) &sp->peer, &len) < 0) {
perror("getpeername"); perror("getpeername");
free(sp); free(sp);
return(NULL); return(NULL);
} }
// converts the remote ip into string address
if(inet_ntop(AF_INET, (void *) &sp->peer.sin_addr, if(inet_ntop(AF_INET, (void *) &sp->peer.sin_addr,
(void *) &sp->peer_addr, 512) == NULL) { (void *) &sp->peer_addr, 512) == NULL) {
perror("inet_pton"); perror("inet_pton");
} }
// sets appropriate function pointer
switch (settings->proto) { switch (settings->proto) {
case Ptcp: case Ptcp:
sp->client = (void *) tcp_client_thread; sp->client = (void *) tcp_client_thread;
@ -199,6 +231,11 @@ new_stream(int s, struct iperf_settings *settings)
return(sp); return(sp);
} }
/*--------------------------------------------------------
* add a stream into stream_list linked list
-------------------------------------------------------*/
void void
add_stream(struct iperf_stream *sp) add_stream(struct iperf_stream *sp)
{ {
@ -214,12 +251,75 @@ add_stream(struct iperf_stream *sp)
} }
} }
void
/*--------------------------------------------------------
* delete the stream
-------------------------------------------------------*/
int
free_stream(struct iperf_stream *sp) free_stream(struct iperf_stream *sp)
{ {
free(sp); printf("I am in free_stream \n");
struct iperf_stream *prev,*start;
prev = streams;
start = streams;
if(streams->sock==sp->sock)
{
printf("in 1 -Deleted\n\n\n");
streams=streams->next;
return 0;
}
else
{
do
{
start= streams->next;
if(start->sock==sp->sock)
{
printf("Deleting socket %d \t",sp->sock);
prev->next = sp->next;
printf("Deleted\n\n\n");
free(sp);
return 0;
}
prev=prev->next;
}while(start->next!=NULL);
return -1;
}
} }
/*--------------------------------------------------------
* update the stream
-------------------------------------------------------*/
struct iperf_stream *
find_update_stream(int j, int result)
{
struct iperf_stream *n;
n=streams;
//find the correct stream for update
do
{
if(n->sock==j)
{
n->bytes_in+= result; //update the byte count
break;
}
n = n->next;
}while(n->next!=NULL);
return n;
}
/*--------------------------------------------------------
* Display connected message
-------------------------------------------------------*/
void connect_msg(struct iperf_stream *sp) void connect_msg(struct iperf_stream *sp)
{ {
char *ipl, *ipr; char *ipl, *ipr;
@ -233,6 +333,10 @@ void connect_msg(struct iperf_stream *sp)
sp->peer_addr, htons(sp->peer.sin_port)); sp->peer_addr, htons(sp->peer.sin_port));
} }
/*--------------------------------------------------------
* UDP client functionality.
-------------------------------------------------------*/
void * void *
udp_client_thread(struct iperf_stream *sp) udp_client_thread(struct iperf_stream *sp)
{ {
@ -296,6 +400,10 @@ udp_client_thread(struct iperf_stream *sp)
pthread_exit(NULL); pthread_exit(NULL);
} }
/*--------------------------------------------------------
* UDP Server functionality.
-------------------------------------------------------*/
void * void *
udp_server_thread(struct iperf_stream *sp) udp_server_thread(struct iperf_stream *sp)
{ {
@ -318,16 +426,30 @@ udp_server_thread(struct iperf_stream *sp)
pthread_exit(NULL); pthread_exit(NULL);
} }
/*--------------------------------------------------------
* UDP Reporting routine
-------------------------------------------------------*/
void void
udp_report(int final) udp_report(int final)
{ {
} }
/*--------------------------------------------------------
* UDP Reporting routine
-------------------------------------------------------*/
void void
tcp_report(int final) tcp_report(int final)
{ {
} }
/*--------------------------------------------------------
* TCP client functionality
* -------------------------------------------------------*/
void * void *
tcp_client_thread(struct iperf_stream *sp) tcp_client_thread(struct iperf_stream *sp)
{ {
@ -361,6 +483,9 @@ tcp_client_thread(struct iperf_stream *sp)
pthread_exit(NULL); pthread_exit(NULL);
} }
/*--------------------------------------------------------
* TCP Server functionality
* -------------------------------------------------------*/
void * void *
tcp_server_thread(struct iperf_stream *sp) tcp_server_thread(struct iperf_stream *sp)
{ {
@ -385,6 +510,10 @@ tcp_server_thread(struct iperf_stream *sp)
pthread_exit(NULL); pthread_exit(NULL);
} }
/*--------------------------------------------------------
* This is code for Client
* -------------------------------------------------------*/
int int
client(struct iperf_settings *settings) client(struct iperf_settings *settings)
{ {
@ -407,14 +536,18 @@ client(struct iperf_settings *settings)
sp = new_stream(s, settings); sp = new_stream(s, settings);
add_stream(sp); add_stream(sp);
connect_msg(sp); connect_msg(sp);
// need to replace this with Select
pthread_create(&sp->thread, NULL, sp->client, (void *) sp); pthread_create(&sp->thread, NULL, sp->client, (void *) sp);
} }
timer = new_timer(settings->duration, 0); timer = new_timer(settings->duration, 0);
// wait till the timer expires
while(!timer->expired(timer)) while(!timer->expired(timer))
sleep(settings->duration); sleep(settings->duration);
// this is checked in UDP/TCP while loop
// to stop sending packets. Global member
done = 1; done = 1;
/* XXX: report */ /* XXX: report */
@ -427,14 +560,25 @@ client(struct iperf_settings *settings)
return 0; return 0;
} }
/*--------------------------------------------------------
* This is code for Server
* -------------------------------------------------------*/
int int
server(struct iperf_settings *settings) server(struct iperf_settings *settings)
{ {
int s, cs, sz; int s, cs, sz;
struct iperf_stream *sp; struct iperf_stream *sp,*temp;
struct sockaddr_in sa_peer; struct sockaddr_in sa_peer;
socklen_t len; socklen_t len;
char buf[settings->bufsize], ubuf[UNIT_LEN]; char buf[settings->bufsize], ubuf[UNIT_LEN];
fd_set readset, tempset;
int maxfd, flags;
int peersock, j, result, result1, sent;
struct timeval tv;
char buffer[DEFAULT_TCP_BUFSIZE];
struct sockaddr_in addr;
s = netannounce(settings->proto, NULL, settings->port); s = netannounce(settings->proto, NULL, settings->port);
if(s < 0) if(s < 0)
@ -444,48 +588,162 @@ server(struct iperf_settings *settings)
perror("unable to set window"); perror("unable to set window");
return -1; return -1;
} }
printf("-----------------------------------------------------------\n"); printf("-----------------------------------------------------------\n");
printf("Server listening on %d\n", settings->port); printf("Server listening on %d\n", settings->port);
int x; int x;
if((x = getsock_tcp_windowsize(s, SO_RCVBUF)) < 0) if((x = getsock_tcp_windowsize(s, SO_RCVBUF)) < 0)
perror("SO_RCVBUF"); perror("SO_RCVBUF");
/*
unit_snprintf(ubuf, UNIT_LEN, (double) x, 'A'); unit_snprintf(ubuf, UNIT_LEN, (double) x, 'A');
printf("%s: %s\n", printf("%s: %s\n",
settings->proto == Ptcp ? "TCP window size" : "UDP buffer size", ubuf); settings->proto == Ptcp ? "TCP window size" : "UDP buffer size", ubuf);
*/
printf("-----------------------------------------------------------\n"); printf("-----------------------------------------------------------\n");
len = sizeof sa_peer; len = sizeof sa_peer;
while(1) {
if (Ptcp == settings->proto) { FD_ZERO(&readset);
if( (cs = accept(s, (struct sockaddr *) &sa_peer, &len)) < 0) { FD_SET(s, &readset);
perror("accept"); maxfd = s;
return -1;
} do {
sp = new_stream(cs, settings);
} else if (Pudp == settings->proto) { memcpy(&tempset, &readset, sizeof(tempset));
sz = recvfrom(s, buf, settings->bufsize, 0, (struct sockaddr *) &sa_peer, &len); tv.tv_sec = 15; // timeout interval in seconds
if(!sz) tv.tv_usec = 0;
break;
// using select to check on multiple descriptors.
result = select(maxfd + 1, &tempset, NULL, NULL, &tv);
if (result == 0)
{
printf("select() timed out!\n");
}
else if (result < 0 && errno != EINTR)
{
printf("Error in select(): %s\n", strerror(errno));
}
else if (result > 0)
{
if (FD_ISSET(s, &tempset))
{
if(settings->proto== Ptcp) // New TCP Connection
{
len = sizeof(addr);
peersock = accept(s,(struct sockaddr *) &addr, &len);
if (peersock < 0)
{
printf("Error in accept(): %s\n", strerror(errno));
}
else
{
printf("=========================\n");
printf("%d is the socket number\n",peersock);
FD_SET(peersock, &readset);
maxfd = (maxfd < peersock)?peersock:maxfd;
// creating a new stream
sp = new_stream(peersock, settings);
printf(" new stream is %d \n", (int)sp);
// need to manage the linked list of streams
add_stream(sp);
connect_msg(sp);
Display();
}
}
else if ( settings->proto== Pudp) //New UDP Connection
{
// getting a new UDP packet
sz = recvfrom(s, buf, settings->bufsize, 0, (struct sockaddr *) &sa_peer, &len);
if(!sz)
break;
if(connect(s, (struct sockaddr *) &sa_peer, len) < 0)
{
perror("connect");
return -1;
}
// get a new socket to connect to client
printf("=========================\n");
sp = new_stream(s, settings);
sp->bytes_in += sz;
printf(" new stream is %d \n", (int)sp);
// need to manage the linked list of streams
add_stream(sp);
if(connect(s, (struct sockaddr *) &sa_peer, len) < 0) {
perror("connect"); s = netannounce(settings->proto, NULL, settings->port);
return -1; if(s < 0)
} return -1;
sp = new_stream(s, settings); // same as TCP -repetation
sp->bytes_in += sz; printf("%d is the socket number\n",s);
FD_SET(s, &readset);
s = netannounce(settings->proto, NULL, settings->port); maxfd = (maxfd < s)?s:maxfd;
if(s < 0) {
return -1; }
}
} FD_CLR(s, &tempset);
connect_msg(sp); }
pthread_create(&sp->thread, NULL, sp->server, (void *) sp);
}
// scanning all socket descriptors for read
for (j=0; j<maxfd+1; j++)
{
if (FD_ISSET(j, &tempset))
{
do
{
if(settings->proto==Ptcp)
result = recv(j, buffer,DEFAULT_TCP_BUFSIZE, 0);
else
result = recv(j, buffer,DEFAULT_UDP_BUFSIZE, 0);
} while (result == -1 && errno == EINTR);
if (result > 0)
{
sp=find_update_stream(j,result);
}
else if (result == 0)
{
Display();
//just find the stream with zero update
sp = find_update_stream(j,0);
if(settings->proto == Ptcp)
{
//printf("window: %d\n", getsock_tcp_windowsize(sp->sock, SO_RCVBUF));
}
unit_snprintf(ubuf, UNIT_LEN, (double) sp->bytes_in / sp->settings->duration, 'a');
printf("%llu bytes received %s/sec for stream %d\n\n", sp->bytes_in, ubuf,(int)sp);
close(j);
FD_CLR(j, &readset);
free_stream(sp); // this needs to be a linked list delete
//break;
}
else
{
printf("Error in recv(): %s\n", strerror(errno));
}
} // end if (FD_ISSET(j, &tempset))
} // end for (j=0;...)
} // end else if (result > 0)
} while (1);
return 0; return 0;
} }