diff --git a/c/ServerNBTHR/ClientSideProc.c b/c/ServerNBTHR/ClientSideProc.c index 8ae9448..3009aa5 100644 --- a/c/ServerNBTHR/ClientSideProc.c +++ b/c/ServerNBTHR/ClientSideProc.c @@ -15,8 +15,8 @@ int main(int argc, char **argv) { CLIENT *cl; - struct square_in in; - struct square_out out; + square_in in; + square_out out; if (argc != 3) { printf("Usage : client \n"); exit(1); diff --git a/c/ServerNBTHR/ServerNBTHR.c b/c/ServerNBTHR/ServerNBTHR.c index de6d02f..e3275fb 100644 --- a/c/ServerNBTHR/ServerNBTHR.c +++ b/c/ServerNBTHR/ServerNBTHR.c @@ -16,72 +16,65 @@ pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER; -void -die(const char *func, int err) +void die(const char *func, int err) { - fprintf(stderr,"%s: %s\n",func, strerror(err)); - abort(); + fprintf(stderr, "%s: %s\n", func, strerror(err)); + abort(); } -void -bark(const char *func, int err) +void bark(const char *func, int err) { - fprintf(stderr,"%s: %s\n",func, strerror(err)); + fprintf(stderr, "%s: %s\n", func, strerror(err)); } + /* Описание поцедуры ведущего потока , которая возвращает дескрипторов пассивного сокета, привязанного к адресу сервера. */ -int -getServerSocket(unsigned short int port) +int getServerSocket(unsigned short int port) { - int listenSocket; - struct sockaddr_in listenSockaddr; + int listenSocket; + struct sockaddr_in listenSockaddr; - if((listenSocket=socket(PF_INET,SOCK_STREAM,0))<0) - die("socket()",errno); - memset(&listenSockaddr, 0, sizeof(listenSockaddr)); - listenSockaddr.sin_family = PF_INET; - listenSockaddr.sin_port = htons(port); - listenSockaddr.sin_addr.s_addr = INADDR_ANY; + if ((listenSocket = socket(PF_INET, SOCK_STREAM, 0)) < 0) + die("socket()", errno); + memset(&listenSockaddr, 0, sizeof(listenSockaddr)); + listenSockaddr.sin_family = PF_INET; + listenSockaddr.sin_port = htons(port); + listenSockaddr.sin_addr.s_addr = INADDR_ANY; - if(bind(listenSocket,(struct sockaddr*)&listenSockaddr, - sizeof(listenSockaddr)) < 0) - die("bind()",errno); + if (bind(listenSocket, (struct sockaddr *)&listenSockaddr, sizeof(listenSockaddr)) < 0) + die("bind()", errno); - if(listen(listenSocket,5)<0) - die("listen()",errno); + if (listen(listenSocket, 5) < 0) + die("listen()", errno); - return listenSocket; + return listenSocket; } /* Описание процедуры выполняемой всеми ведомыми потоками */ - -void * -serv_request(void *data) +void *serv_request(void *data) { -struct connection_cb -{ - int dataSocket; - char data[256]; - int dataSent; - int dataToSend; - int isReading; - struct connection_cb *next; -}; + struct connection_cb { + int dataSocket; + char data[256]; + int dataSent; + int dataToSend; + int isReading; + struct connection_cb *next; + }; -struct connection_cb *connections = NULL; + struct connection_cb *connections = NULL; -int listenSocket = (int)data; - if(fcntl(listenSocket,F_SETFL,O_NONBLOCK)<0) - die("fcntl()",errno); + int listenSocket = (int)data; + if (fcntl(listenSocket, F_SETFL, O_NONBLOCK) < 0) + die("fcntl()", errno); - while(1) - { + while (1) { fd_set readFdSet; fd_set writeFdSet; struct connection_cb *currentConn, **currentConnPtr, *tempConn; @@ -90,204 +83,179 @@ int listenSocket = (int)data; FD_ZERO(&readFdSet); FD_ZERO(&writeFdSet); - /* - Добавление дескриптора к множеству readFdSet - */ - FD_SET(listenSocket,&readFdSet); + /* + Добавление дескриптора к множеству readFdSet + */ + FD_SET(listenSocket, &readFdSet); maxFdNum = listenSocket; - for(currentConn = connections;currentConn!=NULL;currentConn = - currentConn->next) - { - if(currentConn->isReading) - FD_SET(currentConn->dataSocket,&readFdSet); - else - FD_SET(currentConn->dataSocket,&writeFdSet); - maxFdNum = currentConn->dataSocket > maxFdNum ?currentConn- - >dataSocket : maxFdNum; - } - /* - Получение множества дескрипторов сокетов для обработки - */ - if(select(maxFdNum+1,&readFdSet,&writeFdSet,NULL,NULL) < 0) - { - if(errno == EINTR) - continue; - die("select()",errno); - } - - currentConnPtr=&connections; - - while(*currentConnPtr!=NULL) - { - - /* - Проверка принадлежности дескриптора - (*currentConnPtr)->dataSocket к множеству readFdSet - */ - - if((*currentConnPtr)->isReading && - FD_ISSET((*currentConnPtr)->dataSocket,&readFdSet)) - { - int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data, - sizeof((*currentConnPtr)->data),0); - - if(result < 0) - { - if(errno!=EINTR && errno!=EAGAIN && errno!=EWOULDBLOCK) - { - bark("recv()",errno); - close((*currentConnPtr)->dataSocket); - tempConn = *currentConnPtr; - *currentConnPtr = (*currentConnPtr)->next; - free(tempConn); - continue; - } + for (currentConn = connections; currentConn != NULL; currentConn = currentConn->next) { + if (currentConn->isReading) + FD_SET(currentConn->dataSocket, &readFdSet); + else + FD_SET(currentConn->dataSocket, &writeFdSet); + maxFdNum = currentConn->dataSocket > maxFdNum ? currentConn - >dataSocket : maxFdNum; } - else - if(result==0) - { - close((*currentConnPtr)->dataSocket); - tempConn = *currentConnPtr; - *currentConnPtr = (*currentConnPtr)->next; - free(tempConn); - continue; + /* + Получение множества дескрипторов сокетов для обработки + */ + if (select(maxFdNum + 1, &readFdSet, &writeFdSet, NULL, NULL) < 0) { + if (errno == EINTR) + continue; + die("select()", errno); } - else - { - (*currentConnPtr)->dataToSend = result; - (*currentConnPtr)->dataSent = 0; - (*currentConnPtr)->isReading = 0; - printf("Recieving as Slave Thread id = '%d' \n",pthread_self()); - } - } - else - /* - Проверка принадлежности дескриптора - (*currentConnPtr)->dataSocket к множеству writedFdSet - */ - if(FD_ISSET((*currentConnPtr)->dataSocket,&writeFdSet)) - { - int result = send((*currentConnPtr)->dataSocket, - (*currentConnPtr)->data+(*currentConnPtr)->dataSent, - (*currentConnPtr) ->dataToSend-(*currentConnPtr)->dataSent, 0); - if(result < 0) - { - if(errno!=EINTR && errno!=EAGAIN) - { - bark("write()",errno); - close((*currentConnPtr)->dataSocket); - tempConn = *currentConnPtr; - *currentConnPtr = (*currentConnPtr)->next; - free(tempConn); - continue; - } - } - else - { - (*currentConnPtr)->dataSent +=result; + currentConnPtr = &connections; - if((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend) - (*currentConnPtr)->isReading = 1; + while (*currentConnPtr != NULL) { + + /* + Проверка принадлежности дескриптора + (*currentConnPtr)->dataSocket к множеству readFdSet + */ + + if ((*currentConnPtr)->isReading && FD_ISSET((*currentConnPtr)->dataSocket, &readFdSet)) { + int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data, + sizeof((*currentConnPtr)->data), 0); + + if (result < 0) { + if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { + bark("recv()", errno); + close((*currentConnPtr)->dataSocket); + tempConn = *currentConnPtr; + *currentConnPtr = (*currentConnPtr)->next; + free(tempConn); + continue; + } + } else if (result == 0) { + close((*currentConnPtr)->dataSocket); + tempConn = *currentConnPtr; + *currentConnPtr = (*currentConnPtr)->next; + free(tempConn); + continue; + } else { + (*currentConnPtr)->dataToSend = result; + (*currentConnPtr)->dataSent = 0; + (*currentConnPtr)->isReading = 0; + printf("Recieving as Slave Thread id = '%d' \n", pthread_self()); + } + } else + /* + Проверка принадлежности дескриптора + (*currentConnPtr)->dataSocket к множеству writedFdSet + */ + if (FD_ISSET((*currentConnPtr)->dataSocket, &writeFdSet)) { + int result = send((*currentConnPtr)->dataSocket, + (*currentConnPtr)->data + (*currentConnPtr)->dataSent, + (*currentConnPtr)->dataToSend - (*currentConnPtr)->dataSent, 0); + + if (result < 0) { + if (errno != EINTR && errno != EAGAIN) { + bark("write()", errno); + close((*currentConnPtr)->dataSocket); + tempConn = *currentConnPtr; + *currentConnPtr = (*currentConnPtr)->next; + free(tempConn); + continue; + } + } else { + (*currentConnPtr)->dataSent += result; + + if ((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend) + (*currentConnPtr)->isReading = 1; } } currentConnPtr = &((*currentConnPtr)->next); - printf("Sending as Slave Thread id = '%d' \n",pthread_self()); + printf("Sending as Slave Thread id = '%d' \n", pthread_self()); } - /* - Проверка принадлежности дескриптора listenSocket - к множеству readFdSet,т.е. необходимости обработать - вызов connect( ) от нового клиента. - */ - if(FD_ISSET(listenSocket,&readFdSet)) - { + /* + Проверка принадлежности дескриптора listenSocket + к множеству readFdSet,т.е. необходимости обработать + вызов connect( ) от нового клиента. + */ + if (FD_ISSET(listenSocket, &readFdSet)) { - while(1) - { + while (1) { - /* - Вызовы pthread_mutex_lock, pthread_mutex_unlock - Не нужны в среде Linux - */ - pthread_mutex_lock(&request_mutex); - int result = accept(listenSocket,(struct sockaddr*)NULL,NULL); - pthread_mutex_unlock(&request_mutex); - if(result < 0) - { - if(errno==EAGAIN || errno == EWOULDBLOCK) - break; - die("accept()",errno); - } - else - { - *currentConnPtr = malloc(sizeof(struct connection_cb)); - if(*currentConnPtr==NULL) - die("malloc()",0); + /* + Вызовы pthread_mutex_lock, pthread_mutex_unlock + Не нужны в среде Linux + */ + pthread_mutex_lock(&request_mutex); + int result = accept(listenSocket, (struct sockaddr *)NULL, NULL); + pthread_mutex_unlock(&request_mutex); + if (result < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + die("accept()", errno); + } else { + *currentConnPtr = malloc(sizeof(struct connection_cb)); + if (*currentConnPtr == NULL) + die("malloc()", 0); - if(fcntl(result,F_SETFL,O_NONBLOCK)<0) - die("fcntl()",errno); + if (fcntl(result, F_SETFL, O_NONBLOCK) < 0) + die("fcntl()", errno); - (*currentConnPtr)->dataSocket = result; - (*currentConnPtr)->isReading = 1; - (*currentConnPtr)->next = 0; - currentConnPtr = &((*currentConnPtr)->next); - printf("Accepting as Master Thread id = '%d' \n",pthread_self()); - } + (*currentConnPtr)->dataSocket = result; + (*currentConnPtr)->isReading = 1; + (*currentConnPtr)->next = 0; + currentConnPtr = &((*currentConnPtr)->next); + printf("Accepting as Master Thread id = '%d' \n", pthread_self()); + } } } } } -int -main(int argc,char *argv[]) -{ -int k; -int descSock; -char *service="1500"; -switch(argc) { -case 1: - break; -case 2: - service = argv[1]; - break; -default: - printf ("Usage: ./ServerBNTH [port]\n"); - exit(1); -} -size_t stacksize; -pthread_t p_thread[NUM_THREADS]; +int main(int argc, char *argv[]) +{ + int k; + int descSock; + char *service = "1500"; + switch (argc) { + case 1: + break; + case 2: + service = argv[1]; + break; + default: + printf("Usage: ./ServerBNTH [port]\n"); + exit(1); + } + + size_t stacksize; + pthread_t p_thread[NUM_THREADS]; /* Установка размера стека для ведомых потоков */ -pthread_attr_t attr; - pthread_attr_init(&attr); - stacksize = 500000; - pthread_attr_setstacksize (&attr, stacksize); - pthread_attr_getstacksize (&attr, &stacksize); + pthread_attr_t attr; + pthread_attr_init(&attr); + stacksize = 500000; + pthread_attr_setstacksize(&attr, stacksize); + pthread_attr_getstacksize(&attr, &stacksize); /* Получение значения дескриптора пассивного сокета */ - descSock = getServerSocket(atoi(service)); + descSock = getServerSocket(atoi(service)); /* Запуск ведомых потоков */ -for(k=0; k #include #include @@ -10,21 +10,22 @@ #include #include -int request=0; +int request = 0; -bool_t squareproc_2_svc(square_in *inp,square_out *outp,struct svc_req *rqstp) +bool_t squareproc_2_svc(square_in * inp, square_out * outp, struct svc_req *rqstp) { - printf("Thread id = '%ld' started, arg = %d\n",pthread_self(),inp->arg1); - /* - Имитация работы процедуры , выполняемой потоками сервера - */ - sleep(5); - outp->res1=inp->arg1*inp->arg1; - printf("Thread id = '%ld' is done %d \n",pthread_self(),outp->res1); - return(TRUE); + printf("Thread id = '%ld' started, arg = %d\n", pthread_self(), inp->arg1); + /* + Имитация работы процедуры , выполняемой потоками сервера + */ + sleep(5); + outp->res1 = inp->arg1 * inp->arg1; + printf("Thread id = '%ld' is done %d \n", pthread_self(), outp->res1); + return (TRUE); } -int square_prog_2_freeresult(SVCXPRT *transp,xdrproc_t xdr_result, caddr_t result) + +int square_prog_2_freeresult(SVCXPRT * transp, xdrproc_t xdr_result, caddr_t result) { - xdr_free(xdr_result,result); - return(1); + xdr_free(xdr_result, result); + return (1); } diff --git a/c/ServerNBTHR/square.h b/c/ServerNBTHR/square.h deleted file mode 100644 index 794e35d..0000000 --- a/c/ServerNBTHR/square.h +++ /dev/null @@ -1,11 +0,0 @@ -struct square_in { - long arg1; -}; -struct square_out { - long res1; -}; -program SQUARE_PROG { - version SQUARE_VERS { - square_out SQUAREPROC(square_in) = 1; - } = 2 ; -} = 0x31230000; diff --git a/c/ServerNBTHR/square_client.c b/c/ServerNBTHR/square_client.c deleted file mode 100644 index dcf8ef9..0000000 --- a/c/ServerNBTHR/square_client.c +++ /dev/null @@ -1,48 +0,0 @@ -/* - * This is sample code generated by rpcgen. - * These are only templates and you can use them - * as a guideline for developing your own functions. - */ - -#include "square.h" - - -void -square_prog_2(char *host) -{ - CLIENT *clnt; - enum clnt_stat retval_1; - square_out result_1; - square_in squareproc_2_arg; - -#ifndef DEBUG - clnt = clnt_create (host, SQUARE_PROG, SQUARE_VERS, "udp"); - if (clnt == NULL) { - clnt_pcreateerror (host); - exit (1); - } -#endif /* DEBUG */ - - retval_1 = squareproc_2(&squareproc_2_arg, &result_1, clnt); - if (retval_1 != RPC_SUCCESS) { - clnt_perror (clnt, "call failed"); - } -#ifndef DEBUG - clnt_destroy (clnt); -#endif /* DEBUG */ -} - - -int -main (int argc, char *argv[]) -{ - char *host; - - if (argc < 2) { - printf ("usage: %s server_host\n", argv[0]); - exit (1); - } - host = argv[1]; - square_prog_2 (host); -exit (0); -} diff --git a/c/ServerNBTHR/square_server.c b/c/ServerNBTHR/square_server.c deleted file mode 100644 index a618f20..0000000 --- a/c/ServerNBTHR/square_server.c +++ /dev/null @@ -1,31 +0,0 @@ -/* - * This is sample code generated by rpcgen. - * These are only templates and you can use them - * as a guideline for developing your own functions. - */ - -#include "square.h" - -bool_t -squareproc_2_svc(square_in *argp, square_out *result, struct svc_req *rqstp) -{ - bool_t retval; - - /* - * insert server code here - */ - - return retval; -} - -int -square_prog_2_freeresult (SVCXPRT *transp, xdrproc_t xdr_result, caddr_t result) -{ - xdr_free (xdr_result, result); - - /* - * Insert additional freeing code here, if needed - */ - - return 1; -} diff --git a/c/ServerNBTHR/square_svc.c b/c/ServerNBTHR/square_svc.c index a6e1b17..e10fe8b 100644 --- a/c/ServerNBTHR/square_svc.c +++ b/c/ServerNBTHR/square_svc.c @@ -25,26 +25,24 @@ pthread_attr_t attr; */ -void * -serv_request(void *data) +void *serv_request(void *data) { -struct thr_data -{ -struct svc_req *rqstp; -SVCXPRT *transp; -} *ptr_data; + struct thr_data { + struct svc_req *rqstp; + SVCXPRT *transp; + } *ptr_data; -{ + { - union { - square_in squareproc_2_arg; - } argument; - union { - square_out squareproc_2_res; - } result; - bool_t retval; - xdrproc_t _xdr_argument, _xdr_result; - bool_t (*local)(char *, void *, struct svc_req *); + union { + square_in squareproc_2_arg; + } argument; + union { + square_out squareproc_2_res; + } result; + bool_t retval; + xdrproc_t _xdr_argument, _xdr_result; + bool_t(*local) (char *, void *, struct svc_req *); /* @@ -52,53 +50,51 @@ SVCXPRT *transp; */ -ptr_data = (struct thr_data *)data; -struct svc_req *rqstp = ptr_data->rqstp; -register SVCXPRT *transp = ptr_data->transp; + ptr_data = (struct thr_data *)data; + struct svc_req *rqstp = ptr_data->rqstp; + register SVCXPRT *transp = ptr_data->transp; + switch (rqstp->rq_proc) { + case NULLPROC: + (void)svc_sendreply(transp, (xdrproc_t) xdr_void, (char *)NULL); + return; - switch (rqstp->rq_proc) { - case NULLPROC: - (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL); - return; + case SQUAREPROC: + _xdr_argument = (xdrproc_t) xdr_square_in; + _xdr_result = (xdrproc_t) xdr_square_out; + local = (bool_t(*)(char *, void *, struct svc_req *))squareproc_2_svc; + break; - case SQUAREPROC: - _xdr_argument = (xdrproc_t) xdr_square_in; - _xdr_result = (xdrproc_t) xdr_square_out; - local = (bool_t (*) (char *, void *, struct svc_req *))squareproc_2_svc; - break; + default: + svcerr_noproc(transp); + return; + } + memset((char *)&argument, 0, sizeof(argument)); + if (!svc_getargs(transp, (xdrproc_t) _xdr_argument, (caddr_t) & argument)) { + svcerr_decode(transp); + return; + } - default: - svcerr_noproc (transp); - return; - } - memset ((char *)&argument, 0, sizeof (argument)); - if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) { - svcerr_decode (transp); - return; - } + /* - /* + Стандартный вызов функции сервера. + Данные для вызова уже приведены к стандарту. - Стандартный вызов функции сервера. - Данные для вызова уже приведены к стандарту. + */ - */ + retval = (bool_t) (*local) ((char *)&argument, (void *)&result, rqstp); - retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp); - - if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result)) - { - svcerr_systemerr (transp); - } - if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) { - fprintf (stderr, "%s", "unable to free arguments"); - exit (1); - } - if (!square_prog_2_freeresult (transp, _xdr_result, (caddr_t) &result)) - fprintf (stderr, "%s", "unable to free results"); - return; -} + if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result)) { + svcerr_systemerr(transp); + } + if (!svc_freeargs(transp, (xdrproc_t) _xdr_argument, (caddr_t) & argument)) { + fprintf(stderr, "%s", "unable to free arguments"); + exit(1); + } + if (!square_prog_2_freeresult(transp, _xdr_result, (caddr_t) & result)) + fprintf(stderr, "%s", "unable to free results"); + return; + } } /* @@ -109,58 +105,39 @@ register SVCXPRT *transp = ptr_data->transp; */ -static void -square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp) -{ -struct data_str -{ -struct svc_req *rqstp; -SVCXPRT *transp; -} *data_ptr =(struct data_str*)malloc(sizeof(struct data_str); - +static void square_prog_2(struct svc_req *rqstp, register SVCXPRT * transp) { + struct data_str { + struct svc_req *rqstp; + SVCXPRT *transp; + } *data_ptr = (struct data_str *)malloc(sizeof(struct data_str); { /* Упаковка данных в структуру для передачи ссылки на нее, как параметра запускаемому потоку */ -data_ptr->rqstp = rqstp; -data_ptr->transp = transp; -pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); -pthread_create(&p_thread,&attr,serv_request,(void *)data_ptr); - } -} + data_ptr->rqstp = rqstp; + data_ptr->transp = transp; + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + pthread_create(&p_thread, &attr, serv_request, (void *)data_ptr);} + } -int -main (int argc, char **argv) -{ - register SVCXPRT *transp; + int main(int argc, char **argv) { + register SVCXPRT * transp; + pmap_unset(SQUARE_PROG, SQUARE_VERS); + transp = svcudp_create(RPC_ANYSOCK); if (transp == NULL) { + fprintf(stderr, "%s", "cannot create udp service."); exit(1);} + if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_UDP)) { + fprintf(stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, udp)."); + exit(1);} - pmap_unset (SQUARE_PROG, SQUARE_VERS); + transp = svctcp_create(RPC_ANYSOCK, 0, 0); if (transp == NULL) { + fprintf(stderr, "%s", "cannot create tcp service."); exit(1);} + if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_TCP)) { + fprintf(stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, tcp)."); + exit(1);} - transp = svcudp_create(RPC_ANYSOCK); - if (transp == NULL) { - fprintf (stderr, "%s", "cannot create udp service."); - exit(1); - } - if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_UDP)) { - fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, udp)."); - exit(1); - } - - transp = svctcp_create(RPC_ANYSOCK, 0, 0); - if (transp == NULL) { - fprintf (stderr, "%s", "cannot create tcp service."); - exit(1); - } - if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_TCP)) { - fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, tcp)."); - exit(1); - } - - svc_run (); - fprintf (stderr, "%s", "svc_run returned"); - exit (1); - /* NOTREACHED */ -} + svc_run(); fprintf(stderr, "%s", "svc_run returned"); exit(1); + /* NOTREACHED */ + } diff --git a/c/masync_server/echo-client.c b/c/masync_server/echo-client.c index 5093ce6..213a74a 100644 --- a/c/masync_server/echo-client.c +++ b/c/masync_server/echo-client.c @@ -7,25 +7,27 @@ #include #include -unsigned int ip2int(const char * s) { +unsigned int ip2int(const char *s) +{ int ip[4]; - sscanf(s,"%d.%d.%d.%d",ip,ip+1,ip+2,ip+3); - return ip[0]<<24|ip[1]<<16|ip[2]<<8|ip[3]; + sscanf(s, "%d.%d.%d.%d", ip, ip + 1, ip + 2, ip + 3); + return ip[0] << 24 | ip[1] << 16 | ip[2] << 8 | ip[3]; } int sendall(int s, char *buf, int len, int flags) { - int total = 0; - int n; + int total = 0; + int n; - while(total < len) - { - n = send(s, buf+total, len-total, flags); - if(n == -1) { break; } - total += n; - } + while (total < len) { + n = send(s, buf + total, len - total, flags); + if (n == -1) { + break; + } + total += n; + } - return (n==-1 ? -1 : total); + return (n == -1 ? -1 : total); } char message[] = "Hello there!\n"; @@ -33,30 +35,28 @@ char buf[sizeof(message)]; int main() { - int sock; - struct sockaddr_in addr; + int sock; + struct sockaddr_in addr; - sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock < 0) - { - perror("socket"); - exit(1); - } + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + perror("socket"); + exit(1); + } - addr.sin_family = AF_INET; - addr.sin_port = htons(3425); // или любой другой порт... - addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); // (ip2int("192.168.2.1")); - if(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) - { - perror("connect"); - exit(2); - } + addr.sin_family = AF_INET; + addr.sin_port = htons(3425); // или любой другой порт... + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); // (ip2int("192.168.2.1")); + if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + perror("connect"); + exit(2); + } - send(sock, message, sizeof(message), 0); - recv(sock, buf, sizeof(message), 0); + send(sock, message, sizeof(message), 0); + recv(sock, buf, sizeof(message), 0); - printf("%s\n",buf); - close(sock); + printf("%s\n", buf); + close(sock); - return 0; + return 0; } diff --git a/c/masync_server/echo-server.c b/c/masync_server/echo-server.c index d68930b..8c59082 100644 --- a/c/masync_server/echo-server.c +++ b/c/masync_server/echo-server.c @@ -7,59 +7,303 @@ #include #include -static const char * int2ip(unsigned int ip) { +#include +#include + +#include "xerror.h" + +#define NUM_THREADS 8 + +pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER; + +static const char *int2ip(unsigned int ip) +{ static char s[16]; - sprintf(s,"%d.%d.%d.%d",ip>>24,ip<<8>>24,ip<<16>>24,ip<<24>>24); + sprintf(s, "%d.%d.%d.%d", ip >> 24, ip << 8 >> 24, ip << 16 >> 24, ip << 24 >> 24); return s; } -int main() +void *serv_request(void *data) { - int sock, listener; - struct sockaddr_in addr; - char buf[1024]; - int bytes_read; + struct connection_cb { + int dataSocket; + char data[256]; + int dataSent; + int dataToSend; + int isReading; + struct connection_cb *next; + }; - listener = socket(AF_INET, SOCK_STREAM, 0); - if(listener < 0) - { - perror("socket"); - exit(1); - } + struct connection_cb *connections = NULL; - addr.sin_family = AF_INET; - addr.sin_port = htons(3425); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - if(bind(listener, (struct sockaddr *)&addr, sizeof(addr)) < 0) - { - perror("bind"); - exit(2); - } + int listenSocket = (int)data; + if (fcntl(listenSocket, F_SETFL, O_NONBLOCK) < 0) + die("fcntl()", errno); - listen(listener, 1); - //~ printf("%X\n",addr.sin_addr.s_addr); - while(1) - { - socklen_t n = sizeof(struct sockaddr); - sock = accept(listener, (struct sockaddr *)&addr, &n); - printf("incoming request from %s\n",int2ip(ntohl(addr.sin_addr.s_addr))); - //~ printf("%X\n",addr.sin_addr.s_addr); - if(sock < 0) - { - perror("accept"); - exit(3); - } + while (1) { + fd_set readFdSet; + fd_set writeFdSet; + struct connection_cb *currentConn, **currentConnPtr, *tempConn; + int maxFdNum; - while(1) - { - bytes_read = recv(sock, buf, 1024, 0); - if(bytes_read <= 0) break; - send(sock, buf, bytes_read, 0); - printf("%s", buf); - } + FD_ZERO(&readFdSet); + FD_ZERO(&writeFdSet); - close(sock); - } + /* + Добавление дескриптора к множеству readFdSet + */ + FD_SET(listenSocket, &readFdSet); + maxFdNum = listenSocket; - return 0; + for (currentConn = connections; currentConn != NULL; currentConn = currentConn->next) { + if (currentConn->isReading) + FD_SET(currentConn->dataSocket, &readFdSet); + else + FD_SET(currentConn->dataSocket, &writeFdSet); + maxFdNum = currentConn->dataSocket > maxFdNum ? currentConn->dataSocket : maxFdNum; + } + /* + Получение множества дескрипторов сокетов для обработки + */ + if (select(maxFdNum + 1, &readFdSet, &writeFdSet, NULL, NULL) < 0) { + if (errno == EINTR) + continue; + die("select()", errno); + } + + currentConnPtr = &connections; + + while (*currentConnPtr != NULL) { + + /* + Проверка принадлежности дескриптора + (*currentConnPtr)->dataSocket к множеству readFdSet + */ + + if ((*currentConnPtr)->isReading && FD_ISSET((*currentConnPtr)->dataSocket, &readFdSet)) { + int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data, + sizeof((*currentConnPtr)->data), 0); + + if (result < 0) { + if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { + bark("recv()", errno); + close((*currentConnPtr)->dataSocket); + tempConn = *currentConnPtr; + *currentConnPtr = (*currentConnPtr)->next; + free(tempConn); + continue; + } + } else if (result == 0) { + close((*currentConnPtr)->dataSocket); + tempConn = *currentConnPtr; + *currentConnPtr = (*currentConnPtr)->next; + free(tempConn); + continue; + } else { + (*currentConnPtr)->dataToSend = result; + (*currentConnPtr)->dataSent = 0; + (*currentConnPtr)->isReading = 0; + printf("Recieving as Slave Thread id = '%d' \n", pthread_self()); + } + } else + /* + Проверка принадлежности дескриптора + (*currentConnPtr)->dataSocket к множеству writedFdSet + */ + if (FD_ISSET((*currentConnPtr)->dataSocket, &writeFdSet)) { + int result = send((*currentConnPtr)->dataSocket, + (*currentConnPtr)->data + (*currentConnPtr)->dataSent, + (*currentConnPtr)->dataToSend - (*currentConnPtr)->dataSent, 0); + + if (result < 0) { + if (errno != EINTR && errno != EAGAIN) { + bark("write()", errno); + close((*currentConnPtr)->dataSocket); + tempConn = *currentConnPtr; + *currentConnPtr = (*currentConnPtr)->next; + free(tempConn); + continue; + } + } else { + (*currentConnPtr)->dataSent += result; + + if ((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend) + (*currentConnPtr)->isReading = 1; + } + } + + currentConnPtr = &((*currentConnPtr)->next); + printf("Sending as Slave Thread id = '%d' \n", pthread_self()); + } + /* + Проверка принадлежности дескриптора listenSocket + к множеству readFdSet,т.е. необходимости обработать + вызов connect( ) от нового клиента. + */ + if (FD_ISSET(listenSocket, &readFdSet)) { + + while (1) { + + /* + Вызовы pthread_mutex_lock, pthread_mutex_unlock + Не нужны в среде Linux + */ + pthread_mutex_lock(&request_mutex); + int result = accept(listenSocket, (struct sockaddr *)NULL, NULL); + pthread_mutex_unlock(&request_mutex); + if (result < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + die("accept()", errno); + } else { + *currentConnPtr = malloc(sizeof(struct connection_cb)); + if (*currentConnPtr == NULL) + die("malloc()", 0); + + if (fcntl(result, F_SETFL, O_NONBLOCK) < 0) + die("fcntl()", errno); + + (*currentConnPtr)->dataSocket = result; + (*currentConnPtr)->isReading = 1; + (*currentConnPtr)->next = 0; + currentConnPtr = &((*currentConnPtr)->next); + printf("Accepting as Master Thread id = '%d' \n", pthread_self()); + } + } + } + } } + +//~ void *serv_request(void *data) +//~ { + //~ struct connection_cb + //~ { + //~ int dataSocket; + //~ char data[256]; + //~ int dataSent; + //~ int dataToSend; + //~ int isRading; + //~ struct connection_cb *next; + //~ }; +//~ + //~ struct connection_cb *connections = NULL; +//~ + //~ int listenSocket = (int)data; +//~ + //~ if(fcntl(listenSocket, F_SETFL, O_NONBLOCK) < 0) + //~ die("fcntl()", errno); +//~ + //~ + //~ +//~ } + +int getServerSocket(unsigned short int port) +{ + int listenSocket; + struct sockaddr_in listenSockaddr; + + if ((listenSocket = socket(PF_INET, SOCK_STREAM, 0)) < 0) + die("socket()", errno); + memset(&listenSockaddr, 0, sizeof(listenSockaddr)); + listenSockaddr.sin_family = PF_INET; + listenSockaddr.sin_port = htons(port); + listenSockaddr.sin_addr.s_addr = INADDR_ANY; + + if (bind(listenSocket, (struct sockaddr *)&listenSockaddr, sizeof(listenSockaddr)) < 0) + die("bind()", errno); + + if (listen(listenSocket, 5) < 0) + die("listen()", errno); + + return listenSocket; +} + +int main(int argc, char *argv[]) +{ + int k; + int descSock; + char *service = "1500"; + + switch (argc) { + case 1: + break; + case 2: + service = argv[1]; + break; + default: + printf("Usage: ./echo-server [port]\n"); + exit(1); + } + + pthread_t p_thread[NUM_THREADS]; + pthread_attr_t attr; + pthread_attr_init(&attr); + size_t stacksize = 512; + pthread_attr_setstacksize(&attr, stacksize); + pthread_attr_getstacksize(&attr, &stacksize); + + descSock = getServerSocket(atoi(service)); + + for (k = 0; k < NUM_THREADS; k++) { + pthread_create(&p_thread[k], &attr, serv_request, (void *)descSock); + printf("Thread %d started\n", k); + } + + pthread_attr_destroy(&attr); + + for (k = 0; k < NUM_THREADS; k++) { + pthread_join(p_thread[k], NULL); + printf("Completed join with thread %d\n", k); + } +} + +//~ int tmp() +//~ { + //~ int sock, listener; + //~ struct sockaddr_in addr; + //~ char buf[1024]; + //~ int bytes_read; +//~ + //~ listener = socket(AF_INET, SOCK_STREAM, 0); + //~ if(listener < 0) + //~ { + //~ perror("socket"); + //~ exit(1); + //~ } +//~ + //~ addr.sin_family = AF_INET; + //~ addr.sin_port = htons(3425); + //~ addr.sin_addr.s_addr = htonl(INADDR_ANY); + //~ if(bind(listener, (struct sockaddr *)&addr, sizeof(addr)) < 0) + //~ { + //~ perror("bind"); + //~ exit(2); + //~ } +//~ + //~ listen(listener, 1); + //~ //printf("%X\n",addr.sin_addr.s_addr); + //~ while(1) + //~ { + //~ socklen_t n = sizeof(struct sockaddr); + //~ sock = accept(listener, (struct sockaddr *)&addr, &n); + //~ printf("incoming request from %s\n",int2ip(ntohl(addr.sin_addr.s_addr))); + //~ //printf("%X\n",addr.sin_addr.s_addr); + //~ if(sock < 0) + //~ { + //~ perror("accept"); + //~ exit(3); + //~ } +//~ + //~ while(1) + //~ { + //~ bytes_read = recv(sock, buf, 1024, 0); + //~ if(bytes_read <= 0) break; + //~ send(sock, buf, bytes_read, 0); + //~ printf("%s", buf); + //~ } +//~ + //~ close(sock); + //~ } +//~ + //~ return 0; +//~ } diff --git a/c/masync_server/int2ip.c b/c/masync_server/int2ip.c index eccdb22..81c4c14 100644 --- a/c/masync_server/int2ip.c +++ b/c/masync_server/int2ip.c @@ -12,22 +12,24 @@ #include -static const char * int2ip(unsigned int ip) { +static const char *int2ip(unsigned int ip) +{ static char s[16]; - sprintf(s,"%d.%d.%d.%d",ip>>24,ip<<8>>24,ip<<16>>24,ip<<24>>24); + sprintf(s, "%d.%d.%d.%d", ip >> 24, ip << 8 >> 24, ip << 16 >> 24, ip << 24 >> 24); return s; } -int main(int argc, char * argv[]) { - if(argc < 2) { +int main(int argc, char *argv[]) +{ + if (argc < 2) { perror("need hex ip address"); exit(1); } unsigned int ip = 0; - sscanf(argv[1],"%X",&ip); + sscanf(argv[1], "%X", &ip); - printf("%s\n",int2ip(ip)); + printf("%s\n", int2ip(ip)); return 0; } diff --git a/c/masync_server/ip2int.c b/c/masync_server/ip2int.c index 7b44472..2f79d99 100644 --- a/c/masync_server/ip2int.c +++ b/c/masync_server/ip2int.c @@ -12,19 +12,21 @@ #include -unsigned int ip2int(const char * s) { +unsigned int ip2int(const char *s) +{ int ip[4]; - sscanf(s,"%d.%d.%d.%d",ip,ip+1,ip+2,ip+3); - return ip[0]<<24|ip[1]<<16|ip[2]<<8|ip[3]; + sscanf(s, "%d.%d.%d.%d", ip, ip + 1, ip + 2, ip + 3); + return ip[0] << 24 | ip[1] << 16 | ip[2] << 8 | ip[3]; } -int main(int argc, char * argv[]) { - if(argc < 2) { +int main(int argc, char *argv[]) +{ + if (argc < 2) { perror("need ip address string"); exit(1); } - printf("%X\n",ip2int(argv[1])); + printf("%X\n", ip2int(argv[1])); return 0; } diff --git a/c/masync_server/sock1.c b/c/masync_server/sock1.c index 27c255c..e5f22ae 100644 --- a/c/masync_server/sock1.c +++ b/c/masync_server/sock1.c @@ -38,9 +38,9 @@ //~ //~ while(total < len) //~ { - //~ n = send(s, buf+total, len-total, flags); - //~ if(n == -1) { break; } - //~ total += n; + //~ n = send(s, buf+total, len-total, flags); + //~ if(n == -1) { break; } + //~ total += n; //~ } //~ //~ return (n==-1 ? -1 : total); @@ -54,11 +54,9 @@ // запрет передачи 0 - чтения, 1 - записи, 2 - и того, и др. //~ int shutdown(int sockfd, int how); - -int main(int argc, char * argv[]) +int main(int argc, char *argv[]) { - struct a - { + struct a { char aaa[256]; }; diff --git a/c/masync_server/tmp1.c b/c/masync_server/tmp1.c index e41de65..c2069e9 100644 --- a/c/masync_server/tmp1.c +++ b/c/masync_server/tmp1.c @@ -3,21 +3,21 @@ void f() { int c = 0; - printf("c=%lu\n",(unsigned long)&c); + printf("c=%lu\n", (unsigned long)&c); } - -int main(int argc, char * argv[]) { +int main(int argc, char *argv[]) +{ //~ printf("%lu\n", sizeof(long)); int a = 0; int pid = fork(); - printf("pid=%d\n",pid); - printf("a=%lu\n",(unsigned long)&a); + printf("pid=%d\n", pid); + printf("a=%lu\n", (unsigned long)&a); int b = 0; - printf("b=%lu\n",(unsigned long)&b); + printf("b=%lu\n", (unsigned long)&b); f();