/* * ServerNBTHR.c */ #include #include #include #include #include #include #include #include #include #include #include #define NUM_THREADS 512 pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER; void die(const char *func, int err) { fprintf(stderr, "%s: %s\n", func, strerror(err)); abort(); } void bark(const char *func, int err) { fprintf(stderr, "%s: %s\n", func, strerror(err)); } /* Описание поцедуры ведущего потока , которая возвращает дескрипторов пассивного сокета, привязанного к адресу сервера. */ int getServerSocket(unsigned short int port) { int listenSocket; struct sockaddr_in listenSockaddr; if ((listenSocket = socket(PF_INET, SOCK_STREAM, 0)) < 0) die("socket()", errno); memset(&listenSockaddr, 0, sizeof(listenSockaddr)); listenSockaddr.sin_family = PF_INET; listenSockaddr.sin_port = htons(port); listenSockaddr.sin_addr.s_addr = INADDR_ANY; if (bind(listenSocket, (struct sockaddr *)&listenSockaddr, sizeof(listenSockaddr)) < 0) die("bind()", errno); if (listen(listenSocket, 5) < 0) die("listen()", errno); return listenSocket; } /* Описание процедуры выполняемой всеми ведомыми потоками */ void *serv_request(void *data) { struct connection_cb { int dataSocket; char data[256]; int dataSent; int dataToSend; int isReading; struct connection_cb *next; }; struct connection_cb *connections = NULL; int listenSocket = (int)data; if (fcntl(listenSocket, F_SETFL, O_NONBLOCK) < 0) die("fcntl()", errno); while (1) { fd_set readFdSet; fd_set writeFdSet; struct connection_cb *currentConn, **currentConnPtr, *tempConn; int maxFdNum; FD_ZERO(&readFdSet); FD_ZERO(&writeFdSet); /* Добавление дескриптора к множеству readFdSet */ FD_SET(listenSocket, &readFdSet); maxFdNum = listenSocket; for (currentConn = connections; currentConn != NULL; currentConn = currentConn->next) { if (currentConn->isReading) FD_SET(currentConn->dataSocket, &readFdSet); else FD_SET(currentConn->dataSocket, &writeFdSet); maxFdNum = currentConn->dataSocket > maxFdNum ? currentConn - >dataSocket : maxFdNum; } /* Получение множества дескрипторов сокетов для обработки */ if (select(maxFdNum + 1, &readFdSet, &writeFdSet, NULL, NULL) < 0) { if (errno == EINTR) continue; die("select()", errno); } currentConnPtr = &connections; while (*currentConnPtr != NULL) { /* Проверка принадлежности дескриптора (*currentConnPtr)->dataSocket к множеству readFdSet */ if ((*currentConnPtr)->isReading && FD_ISSET((*currentConnPtr)->dataSocket, &readFdSet)) { int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data, sizeof((*currentConnPtr)->data), 0); if (result < 0) { if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { bark("recv()", errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else if (result == 0) { close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } else { (*currentConnPtr)->dataToSend = result; (*currentConnPtr)->dataSent = 0; (*currentConnPtr)->isReading = 0; printf("Recieving as Slave Thread id = '%d' \n", pthread_self()); } } else /* Проверка принадлежности дескриптора (*currentConnPtr)->dataSocket к множеству writedFdSet */ if (FD_ISSET((*currentConnPtr)->dataSocket, &writeFdSet)) { int result = send((*currentConnPtr)->dataSocket, (*currentConnPtr)->data + (*currentConnPtr)->dataSent, (*currentConnPtr)->dataToSend - (*currentConnPtr)->dataSent, 0); if (result < 0) { if (errno != EINTR && errno != EAGAIN) { bark("write()", errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else { (*currentConnPtr)->dataSent += result; if ((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend) (*currentConnPtr)->isReading = 1; } } currentConnPtr = &((*currentConnPtr)->next); printf("Sending as Slave Thread id = '%d' \n", pthread_self()); } /* Проверка принадлежности дескриптора listenSocket к множеству readFdSet,т.е. необходимости обработать вызов connect( ) от нового клиента. */ if (FD_ISSET(listenSocket, &readFdSet)) { while (1) { /* Вызовы pthread_mutex_lock, pthread_mutex_unlock Не нужны в среде Linux */ pthread_mutex_lock(&request_mutex); int result = accept(listenSocket, (struct sockaddr *)NULL, NULL); pthread_mutex_unlock(&request_mutex); if (result < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; die("accept()", errno); } else { *currentConnPtr = malloc(sizeof(struct connection_cb)); if (*currentConnPtr == NULL) die("malloc()", 0); if (fcntl(result, F_SETFL, O_NONBLOCK) < 0) die("fcntl()", errno); (*currentConnPtr)->dataSocket = result; (*currentConnPtr)->isReading = 1; (*currentConnPtr)->next = 0; currentConnPtr = &((*currentConnPtr)->next); printf("Accepting as Master Thread id = '%d' \n", pthread_self()); } } } } } int main(int argc, char *argv[]) { int k; int descSock; char *service = "1500"; switch (argc) { case 1: break; case 2: service = argv[1]; break; default: printf("Usage: ./ServerBNTH [port]\n"); exit(1); } size_t stacksize; pthread_t p_thread[NUM_THREADS]; /* Установка размера стека для ведомых потоков */ pthread_attr_t attr; pthread_attr_init(&attr); stacksize = 500000; pthread_attr_setstacksize(&attr, stacksize); pthread_attr_getstacksize(&attr, &stacksize); /* Получение значения дескриптора пассивного сокета */ descSock = getServerSocket(atoi(service)); /* Запуск ведомых потоков */ for (k = 0; k < NUM_THREADS; k++) { pthread_create(&p_thread[k], &attr, serv_request, (void *)descSock); printf("Thread %d started \n", k); } pthread_attr_destroy(&attr); for (k = 0; k < NUM_THREADS; k++) { pthread_join(p_thread[k], NULL); printf("Completed join with thread %d\n", k); } }