dev/c/masync_server/echo-server.c

246 lines
7.7 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>
#include <fcntl.h>
#include <string.h>
#include "xerror.h"
#include "netfuncs.h"
#define NUM_THREADS 1
#define MAX_CONNECTIONS 1
void *serv_request(void *data)
{
struct connection_cb {
int dataSocket;
char data[256];
int dataSent;
int dataToSend;
int isReading;
struct connection_cb *next;
};
struct connection_cb *connections = NULL;
int listenSocket = *(int *)data; // дескриптор сокета передаётся через аргумент pthread
if (fcntl(listenSocket, F_SETFL, O_NONBLOCK) < 0) // делает сокет неблокирующим
die("fcntl()", errno);
while (1) {
fd_set readFdSet; // набор дескрипторов сокетов, с кт. в данный момент идёт чтение
fd_set writeFdSet; // набор дескрипторов сокетов, в кт. в данный момент идёт запись
struct connection_cb *currentConn, **currentConnPtr, *tempConn; // выбранное соединение, ???, ???
int maxFdNum; // ???
FD_ZERO(&readFdSet); // очистка набора дескрипторов чтения
FD_ZERO(&writeFdSet); // очистка набора дескрипторов записи
/*
* Добавление дескриптора к множеству readFdSet
*/
FD_SET(listenSocket, &readFdSet); // слушающий порт добавляется в список читающих (сюда же входит connect())
maxFdNum = listenSocket; // ??? не разобрался
/*
* Разделение дескрипторов м/у множествами readFdSet и writeFdSet
*/
for (currentConn = connections; currentConn != NULL; currentConn = currentConn->next) {
if (currentConn->isReading)
FD_SET(currentConn->dataSocket, &readFdSet);
else
FD_SET(currentConn->dataSocket, &writeFdSet);
maxFdNum = currentConn->dataSocket > maxFdNum ? currentConn->dataSocket : maxFdNum;
}
/*
* Получение множества дескрипторов сокетов для обработки (ожидание изменения состояния на сокетах)
*/
if (select(maxFdNum + 1, &readFdSet, &writeFdSet, NULL, NULL) < 0) {
if (errno == EINTR)
continue;
die("select()", errno);
}
currentConnPtr = &connections;
while (*currentConnPtr != NULL) {
/*
* Проверка принадлежности дескриптора
* (*currentConnPtr)->dataSocket к множеству readFdSet
*/
if ((*currentConnPtr)->isReading && FD_ISSET((*currentConnPtr)->dataSocket, &readFdSet)) {
int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data,
sizeof((*currentConnPtr)->data), 0);
if (result < 0) {
if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
bark("recv()", errno);
close((*currentConnPtr)->dataSocket);
tempConn = *currentConnPtr;
*currentConnPtr = (*currentConnPtr)->next;
free(tempConn);
continue;
}
} else if (result == 0) {
close((*currentConnPtr)->dataSocket);
tempConn = *currentConnPtr;
*currentConnPtr = (*currentConnPtr)->next;
free(tempConn);
continue;
} else {
(*currentConnPtr)->dataToSend = result;
(*currentConnPtr)->dataSent = 0;
(*currentConnPtr)->isReading = 0;
printf("Recieving as Slave Thread id = '%d' \n", (int)pthread_self());
}
} else
/*
* Проверка принадлежности дескриптора
* (*currentConnPtr)->dataSocket к множеству writedFdSet
*/
if (FD_ISSET((*currentConnPtr)->dataSocket, &writeFdSet)) {
int result = send((*currentConnPtr)->dataSocket,
(*currentConnPtr)->data + (*currentConnPtr)->dataSent,
(*currentConnPtr)->dataToSend - (*currentConnPtr)->dataSent, 0);
if (result < 0) {
if (errno != EINTR && errno != EAGAIN) {
bark("write()", errno);
close((*currentConnPtr)->dataSocket);
tempConn = *currentConnPtr;
*currentConnPtr = (*currentConnPtr)->next;
free(tempConn);
continue;
}
} else {
(*currentConnPtr)->dataSent += result;
if ((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend)
(*currentConnPtr)->isReading = 1;
}
}
currentConnPtr = &((*currentConnPtr)->next);
printf("Sending as Slave Thread id = '%d' \n", (int)pthread_self());
}
/*
Проверка принадлежности дескриптора listenSocket
к множеству readFdSet,т.е. необходимости обработать
вызов connect() от нового клиента.
*/
if (FD_ISSET(listenSocket, &readFdSet)) {
while (1) {
int result = accept(listenSocket, (struct sockaddr *)NULL, NULL);
if (result < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
die("accept()", errno);
} else {
*currentConnPtr = malloc(sizeof(struct connection_cb));
if (*currentConnPtr == NULL)
die("malloc()", 0);
if (fcntl(result, F_SETFL, O_NONBLOCK) < 0)
die("fcntl()", errno);
(*currentConnPtr)->dataSocket = result;
(*currentConnPtr)->isReading = 1;
(*currentConnPtr)->next = 0;
currentConnPtr = &((*currentConnPtr)->next);
printf("Accepting as Master Thread id = '%d' \n", (int)pthread_self());
}
}
}
}
}
/*
* Установка слушающего сокета на порту port
*/
int getServerSocket(unsigned short int port)
{
int listenSocket;
struct sockaddr_in listenSockaddr;
// создание сокета
if ((listenSocket = socket(PF_INET, SOCK_STREAM, 0)) < 0)
die("socket()", errno);
// задание адреса сокета и его параметров
memset(&listenSockaddr, 0, sizeof(listenSockaddr));
listenSockaddr.sin_family = PF_INET;
listenSockaddr.sin_port = htons(port);
listenSockaddr.sin_addr.s_addr = INADDR_ANY;
// привязка сокета listenSocket к адресу listenSockaddr
if (bind(listenSocket, (struct sockaddr *)&listenSockaddr, sizeof(listenSockaddr)) < 0)
die("bind()", errno);
// установка очереди входящих соединений в MAX_CONNECTIONS
if (listen(listenSocket, MAX_CONNECTIONS) < 0)
die("listen()", errno);
return listenSocket;
}
/*
* Основная процедура создания слушающего сокета
* и нитей, обрабатывающих входящие соединения
*/
int main(int argc, char *argv[])
{
int k;
int descSock;
char *service = "3425";
// порт в argv[1]
switch (argc) {
case 1:
break;
case 2:
service = argv[1];
break;
default:
printf("Usage: ./echo-server [port]\n");
exit(1);
}
// NUM_THREADS нитей
pthread_t p_thread[NUM_THREADS];
pthread_attr_t attr;
// инициализация аттрибутов нитей
pthread_attr_init(&attr);
size_t stacksize = 512;
pthread_attr_setstacksize(&attr, stacksize);
pthread_attr_getstacksize(&attr, &stacksize);
// создание слушающего сокета
descSock = getServerSocket(atoi(service));
// создание NUM_THREADS нитей
for (k = 0; k < NUM_THREADS; k++) {
pthread_create(&p_thread[k], &attr, serv_request, (void *)&descSock);
printf("Thread %d started\n", k);
}
// уничтожение объекта атрибутов
pthread_attr_destroy(&attr);
// ожидание завершения работы нитей
for (k = 0; k < NUM_THREADS; k++) {
pthread_join(p_thread[k], NULL);
printf("Completed join with thread %d\n", k);
}
}