#include #include #include #include #include #include #include #include #include #include #include "xerror.h" #include "netfuncs.h" #define NUM_THREADS 1 #define MAX_CONNECTIONS 1 void *serv_request(void *data) { struct connection_cb { int dataSocket; char data[256]; int dataSent; int dataToSend; int isReading; struct connection_cb *next; }; struct connection_cb *connections = NULL; int listenSocket = *(int *)data; // дескриптор сокета передаётся через аргумент pthread if (fcntl(listenSocket, F_SETFL, O_NONBLOCK) < 0) // делает сокет неблокирующим die("fcntl()", errno); while (1) { fd_set readFdSet; // набор дескрипторов сокетов, с кт. в данный момент идёт чтение fd_set writeFdSet; // набор дескрипторов сокетов, в кт. в данный момент идёт запись struct connection_cb *currentConn, **currentConnPtr, *tempConn; // выбранное соединение, ???, ??? int maxFdNum; // ??? FD_ZERO(&readFdSet); // очистка набора дескрипторов чтения FD_ZERO(&writeFdSet); // очистка набора дескрипторов записи /* * Добавление дескриптора к множеству readFdSet */ FD_SET(listenSocket, &readFdSet); // слушающий порт добавляется в список читающих (сюда же входит connect()) maxFdNum = listenSocket; // ??? не разобрался /* * Разделение дескрипторов м/у множествами readFdSet и writeFdSet */ for (currentConn = connections; currentConn != NULL; currentConn = currentConn->next) { if (currentConn->isReading) FD_SET(currentConn->dataSocket, &readFdSet); else FD_SET(currentConn->dataSocket, &writeFdSet); maxFdNum = currentConn->dataSocket > maxFdNum ? currentConn->dataSocket : maxFdNum; } /* * Получение множества дескрипторов сокетов для обработки (ожидание изменения состояния на сокетах) */ if (select(maxFdNum + 1, &readFdSet, &writeFdSet, NULL, NULL) < 0) { if (errno == EINTR) continue; die("select()", errno); } currentConnPtr = &connections; while (*currentConnPtr != NULL) { /* * Проверка принадлежности дескриптора * (*currentConnPtr)->dataSocket к множеству readFdSet */ if ((*currentConnPtr)->isReading && FD_ISSET((*currentConnPtr)->dataSocket, &readFdSet)) { int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data, sizeof((*currentConnPtr)->data), 0); if (result < 0) { if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { bark("recv()", errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else if (result == 0) { close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } else { (*currentConnPtr)->dataToSend = result; (*currentConnPtr)->dataSent = 0; (*currentConnPtr)->isReading = 0; printf("Recieving as Slave Thread id = '%d' \n", (int)pthread_self()); } } else /* * Проверка принадлежности дескриптора * (*currentConnPtr)->dataSocket к множеству writedFdSet */ if (FD_ISSET((*currentConnPtr)->dataSocket, &writeFdSet)) { int result = send((*currentConnPtr)->dataSocket, (*currentConnPtr)->data + (*currentConnPtr)->dataSent, (*currentConnPtr)->dataToSend - (*currentConnPtr)->dataSent, 0); if (result < 0) { if (errno != EINTR && errno != EAGAIN) { bark("write()", errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else { (*currentConnPtr)->dataSent += result; if ((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend) (*currentConnPtr)->isReading = 1; } } currentConnPtr = &((*currentConnPtr)->next); printf("Sending as Slave Thread id = '%d' \n", (int)pthread_self()); } /* Проверка принадлежности дескриптора listenSocket к множеству readFdSet,т.е. необходимости обработать вызов connect() от нового клиента. */ if (FD_ISSET(listenSocket, &readFdSet)) { while (1) { int result = accept(listenSocket, (struct sockaddr *)NULL, NULL); if (result < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; die("accept()", errno); } else { *currentConnPtr = malloc(sizeof(struct connection_cb)); if (*currentConnPtr == NULL) die("malloc()", 0); if (fcntl(result, F_SETFL, O_NONBLOCK) < 0) die("fcntl()", errno); (*currentConnPtr)->dataSocket = result; (*currentConnPtr)->isReading = 1; (*currentConnPtr)->next = 0; currentConnPtr = &((*currentConnPtr)->next); printf("Accepting as Master Thread id = '%d' \n", (int)pthread_self()); } } } } } /* * Установка слушающего сокета на порту port */ int getServerSocket(unsigned short int port) { int listenSocket; struct sockaddr_in listenSockaddr; // создание сокета if ((listenSocket = socket(PF_INET, SOCK_STREAM, 0)) < 0) die("socket()", errno); // задание адреса сокета и его параметров memset(&listenSockaddr, 0, sizeof(listenSockaddr)); listenSockaddr.sin_family = PF_INET; listenSockaddr.sin_port = htons(port); listenSockaddr.sin_addr.s_addr = INADDR_ANY; // привязка сокета listenSocket к адресу listenSockaddr if (bind(listenSocket, (struct sockaddr *)&listenSockaddr, sizeof(listenSockaddr)) < 0) die("bind()", errno); // установка очереди входящих соединений в MAX_CONNECTIONS if (listen(listenSocket, MAX_CONNECTIONS) < 0) die("listen()", errno); return listenSocket; } /* * Основная процедура создания слушающего сокета * и нитей, обрабатывающих входящие соединения */ int main(int argc, char *argv[]) { int k; int descSock; char *service = "3425"; // порт в argv[1] switch (argc) { case 1: break; case 2: service = argv[1]; break; default: printf("Usage: ./echo-server [port]\n"); exit(1); } // NUM_THREADS нитей pthread_t p_thread[NUM_THREADS]; pthread_attr_t attr; // инициализация аттрибутов нитей pthread_attr_init(&attr); size_t stacksize = 512; pthread_attr_setstacksize(&attr, stacksize); pthread_attr_getstacksize(&attr, &stacksize); // создание слушающего сокета descSock = getServerSocket(atoi(service)); // создание NUM_THREADS нитей for (k = 0; k < NUM_THREADS; k++) { pthread_create(&p_thread[k], &attr, serv_request, (void *)&descSock); printf("Thread %d started\n", k); } // уничтожение объекта атрибутов pthread_attr_destroy(&attr); // ожидание завершения работы нитей for (k = 0; k < NUM_THREADS; k++) { pthread_join(p_thread[k], NULL); printf("Completed join with thread %d\n", k); } }