#include #include #include #include #include #include #include #include #include #include "xerror.h" #include "netfuncs.h" #define NUM_THREADS 8 pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER; void *serv_request(void *data) { struct connection_cb { int dataSocket; char data[256]; int dataSent; int dataToSend; int isReading; struct connection_cb *next; }; struct connection_cb *connections = NULL; int listenSocket = *(int *)data; if (fcntl(listenSocket, F_SETFL, O_NONBLOCK) < 0) die("fcntl()", errno); while (1) { fd_set readFdSet; fd_set writeFdSet; struct connection_cb *currentConn, **currentConnPtr, *tempConn; int maxFdNum; FD_ZERO(&readFdSet); FD_ZERO(&writeFdSet); /* Добавление дескриптора к множеству readFdSet */ FD_SET(listenSocket, &readFdSet); maxFdNum = listenSocket; for (currentConn = connections; currentConn != NULL; currentConn = currentConn->next) { if (currentConn->isReading) FD_SET(currentConn->dataSocket, &readFdSet); else FD_SET(currentConn->dataSocket, &writeFdSet); maxFdNum = currentConn->dataSocket > maxFdNum ? currentConn->dataSocket : maxFdNum; } /* Получение множества дескрипторов сокетов для обработки */ if (select(maxFdNum + 1, &readFdSet, &writeFdSet, NULL, NULL) < 0) { if (errno == EINTR) continue; die("select()", errno); } currentConnPtr = &connections; while (*currentConnPtr != NULL) { /* Проверка принадлежности дескриптора (*currentConnPtr)->dataSocket к множеству readFdSet */ if ((*currentConnPtr)->isReading && FD_ISSET((*currentConnPtr)->dataSocket, &readFdSet)) { int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data, sizeof((*currentConnPtr)->data), 0); if (result < 0) { if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { bark("recv()", errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else if (result == 0) { close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } else { (*currentConnPtr)->dataToSend = result; (*currentConnPtr)->dataSent = 0; (*currentConnPtr)->isReading = 0; printf("Recieving as Slave Thread id = '%d' \n", (int)pthread_self()); } } else /* Проверка принадлежности дескриптора (*currentConnPtr)->dataSocket к множеству writedFdSet */ if (FD_ISSET((*currentConnPtr)->dataSocket, &writeFdSet)) { int result = send((*currentConnPtr)->dataSocket, (*currentConnPtr)->data + (*currentConnPtr)->dataSent, (*currentConnPtr)->dataToSend - (*currentConnPtr)->dataSent, 0); if (result < 0) { if (errno != EINTR && errno != EAGAIN) { bark("write()", errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else { (*currentConnPtr)->dataSent += result; if ((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend) (*currentConnPtr)->isReading = 1; } } currentConnPtr = &((*currentConnPtr)->next); printf("Sending as Slave Thread id = '%d' \n", (int)pthread_self()); } /* Проверка принадлежности дескриптора listenSocket к множеству readFdSet,т.е. необходимости обработать вызов connect( ) от нового клиента. */ if (FD_ISSET(listenSocket, &readFdSet)) { while (1) { /* Вызовы pthread_mutex_lock, pthread_mutex_unlock Не нужны в среде Linux */ pthread_mutex_lock(&request_mutex); int result = accept(listenSocket, (struct sockaddr *)NULL, NULL); pthread_mutex_unlock(&request_mutex); if (result < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; die("accept()", errno); } else { *currentConnPtr = malloc(sizeof(struct connection_cb)); if (*currentConnPtr == NULL) die("malloc()", 0); if (fcntl(result, F_SETFL, O_NONBLOCK) < 0) die("fcntl()", errno); (*currentConnPtr)->dataSocket = result; (*currentConnPtr)->isReading = 1; (*currentConnPtr)->next = 0; currentConnPtr = &((*currentConnPtr)->next); printf("Accepting as Master Thread id = '%d' \n", (int)pthread_self()); } } } } } int getServerSocket(unsigned short int port) { int listenSocket; struct sockaddr_in listenSockaddr; if ((listenSocket = socket(PF_INET, SOCK_STREAM, 0)) < 0) die("socket()", errno); memset(&listenSockaddr, 0, sizeof(listenSockaddr)); listenSockaddr.sin_family = PF_INET; listenSockaddr.sin_port = htons(port); listenSockaddr.sin_addr.s_addr = INADDR_ANY; if (bind(listenSocket, (struct sockaddr *)&listenSockaddr, sizeof(listenSockaddr)) < 0) die("bind()", errno); if (listen(listenSocket, 5) < 0) die("listen()", errno); return listenSocket; } int main(int argc, char *argv[]) { int k; int descSock; char *service = "1500"; switch (argc) { case 1: break; case 2: service = argv[1]; break; default: printf("Usage: ./echo-server [port]\n"); exit(1); } pthread_t p_thread[NUM_THREADS]; pthread_attr_t attr; pthread_attr_init(&attr); size_t stacksize = 512; pthread_attr_setstacksize(&attr, stacksize); pthread_attr_getstacksize(&attr, &stacksize); descSock = getServerSocket(atoi(service)); for (k = 0; k < NUM_THREADS; k++) { pthread_create(&p_thread[k], &attr, serv_request, (void *)&descSock); printf("Thread %d started\n", k); } pthread_attr_destroy(&attr); for (k = 0; k < NUM_THREADS; k++) { pthread_join(p_thread[k], NULL); printf("Completed join with thread %d\n", k); } } //~ int tmp() //~ { //~ int sock, listener; //~ struct sockaddr_in addr; //~ char buf[1024]; //~ int bytes_read; //~ //~ listener = socket(AF_INET, SOCK_STREAM, 0); //~ if(listener < 0) //~ { //~ perror("socket"); //~ exit(1); //~ } //~ //~ addr.sin_family = AF_INET; //~ addr.sin_port = htons(3425); //~ addr.sin_addr.s_addr = htonl(INADDR_ANY); //~ if(bind(listener, (struct sockaddr *)&addr, sizeof(addr)) < 0) //~ { //~ perror("bind"); //~ exit(2); //~ } //~ //~ listen(listener, 1); //~ //printf("%X\n",addr.sin_addr.s_addr); //~ while(1) //~ { //~ socklen_t n = sizeof(struct sockaddr); //~ sock = accept(listener, (struct sockaddr *)&addr, &n); //~ printf("incoming request from %s\n",int2ip(ntohl(addr.sin_addr.s_addr))); //~ //printf("%X\n",addr.sin_addr.s_addr); //~ if(sock < 0) //~ { //~ perror("accept"); //~ exit(3); //~ } //~ //~ while(1) //~ { //~ bytes_read = recv(sock, buf, 1024, 0); //~ if(bytes_read <= 0) break; //~ send(sock, buf, bytes_read, 0); //~ printf("%s", buf); //~ } //~ //~ close(sock); //~ } //~ //~ return 0; //~ }