dev/c/ServerNBTHR/ServerNBTHR.c

262 lines
6.9 KiB
C
Raw Normal View History

/*
* ServerNBTHR.c
*/
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <pthread.h>
#define NUM_THREADS 512
pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER;
2011-05-02 12:11:12 +04:00
void die(const char *func, int err)
{
2011-05-02 12:11:12 +04:00
fprintf(stderr, "%s: %s\n", func, strerror(err));
abort();
}
2011-05-02 12:11:12 +04:00
void bark(const char *func, int err)
{
2011-05-02 12:11:12 +04:00
fprintf(stderr, "%s: %s\n", func, strerror(err));
}
2011-05-02 12:11:12 +04:00
/*
Описание поцедуры ведущего потока , которая возвращает
дескрипторов пассивного сокета, привязанного к адресу
сервера.
*/
2011-05-02 12:11:12 +04:00
int getServerSocket(unsigned short int port)
{
2011-05-02 12:11:12 +04:00
int listenSocket;
struct sockaddr_in listenSockaddr;
2011-05-02 12:11:12 +04:00
if ((listenSocket = socket(PF_INET, SOCK_STREAM, 0)) < 0)
die("socket()", errno);
memset(&listenSockaddr, 0, sizeof(listenSockaddr));
listenSockaddr.sin_family = PF_INET;
listenSockaddr.sin_port = htons(port);
listenSockaddr.sin_addr.s_addr = INADDR_ANY;
2011-05-02 12:11:12 +04:00
if (bind(listenSocket, (struct sockaddr *)&listenSockaddr, sizeof(listenSockaddr)) < 0)
die("bind()", errno);
2011-05-02 12:11:12 +04:00
if (listen(listenSocket, 5) < 0)
die("listen()", errno);
2011-05-02 12:11:12 +04:00
return listenSocket;
}
/*
Описание процедуры выполняемой всеми ведомыми потоками
*/
2011-05-02 12:11:12 +04:00
void *serv_request(void *data)
{
2011-05-02 12:11:12 +04:00
struct connection_cb {
int dataSocket;
char data[256];
int dataSent;
int dataToSend;
int isReading;
struct connection_cb *next;
};
struct connection_cb *connections = NULL;
int listenSocket = (int)data;
if (fcntl(listenSocket, F_SETFL, O_NONBLOCK) < 0)
die("fcntl()", errno);
while (1) {
fd_set readFdSet;
fd_set writeFdSet;
struct connection_cb *currentConn, **currentConnPtr, *tempConn;
int maxFdNum;
FD_ZERO(&readFdSet);
FD_ZERO(&writeFdSet);
2011-05-02 12:11:12 +04:00
/*
Добавление дескриптора к множеству readFdSet
*/
FD_SET(listenSocket, &readFdSet);
maxFdNum = listenSocket;
2011-05-02 12:11:12 +04:00
for (currentConn = connections; currentConn != NULL; currentConn = currentConn->next) {
if (currentConn->isReading)
FD_SET(currentConn->dataSocket, &readFdSet);
else
FD_SET(currentConn->dataSocket, &writeFdSet);
maxFdNum = currentConn->dataSocket > maxFdNum ? currentConn - >dataSocket : maxFdNum;
}
2011-05-02 12:11:12 +04:00
/*
Получение множества дескрипторов сокетов для обработки
*/
if (select(maxFdNum + 1, &readFdSet, &writeFdSet, NULL, NULL) < 0) {
if (errno == EINTR)
continue;
die("select()", errno);
}
2011-05-02 12:11:12 +04:00
currentConnPtr = &connections;
while (*currentConnPtr != NULL) {
/*
Проверка принадлежности дескриптора
(*currentConnPtr)->dataSocket к множеству readFdSet
*/
if ((*currentConnPtr)->isReading && FD_ISSET((*currentConnPtr)->dataSocket, &readFdSet)) {
int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data,
sizeof((*currentConnPtr)->data), 0);
if (result < 0) {
if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
bark("recv()", errno);
close((*currentConnPtr)->dataSocket);
tempConn = *currentConnPtr;
*currentConnPtr = (*currentConnPtr)->next;
free(tempConn);
continue;
}
} else if (result == 0) {
close((*currentConnPtr)->dataSocket);
tempConn = *currentConnPtr;
*currentConnPtr = (*currentConnPtr)->next;
free(tempConn);
continue;
} else {
(*currentConnPtr)->dataToSend = result;
(*currentConnPtr)->dataSent = 0;
(*currentConnPtr)->isReading = 0;
printf("Recieving as Slave Thread id = '%d' \n", pthread_self());
}
} else
/*
Проверка принадлежности дескриптора
(*currentConnPtr)->dataSocket к множеству writedFdSet
*/
if (FD_ISSET((*currentConnPtr)->dataSocket, &writeFdSet)) {
int result = send((*currentConnPtr)->dataSocket,
(*currentConnPtr)->data + (*currentConnPtr)->dataSent,
(*currentConnPtr)->dataToSend - (*currentConnPtr)->dataSent, 0);
if (result < 0) {
if (errno != EINTR && errno != EAGAIN) {
bark("write()", errno);
close((*currentConnPtr)->dataSocket);
tempConn = *currentConnPtr;
*currentConnPtr = (*currentConnPtr)->next;
free(tempConn);
continue;
}
} else {
(*currentConnPtr)->dataSent += result;
if ((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend)
(*currentConnPtr)->isReading = 1;
}
}
currentConnPtr = &((*currentConnPtr)->next);
2011-05-02 12:11:12 +04:00
printf("Sending as Slave Thread id = '%d' \n", pthread_self());
}
2011-05-02 12:11:12 +04:00
/*
Проверка принадлежности дескриптора listenSocket
к множеству readFdSet,т.е. необходимости обработать
вызов connect( ) от нового клиента.
*/
if (FD_ISSET(listenSocket, &readFdSet)) {
while (1) {
/*
Вызовы pthread_mutex_lock, pthread_mutex_unlock
Не нужны в среде Linux
*/
pthread_mutex_lock(&request_mutex);
int result = accept(listenSocket, (struct sockaddr *)NULL, NULL);
pthread_mutex_unlock(&request_mutex);
if (result < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
die("accept()", errno);
} else {
*currentConnPtr = malloc(sizeof(struct connection_cb));
if (*currentConnPtr == NULL)
die("malloc()", 0);
if (fcntl(result, F_SETFL, O_NONBLOCK) < 0)
die("fcntl()", errno);
(*currentConnPtr)->dataSocket = result;
(*currentConnPtr)->isReading = 1;
(*currentConnPtr)->next = 0;
currentConnPtr = &((*currentConnPtr)->next);
printf("Accepting as Master Thread id = '%d' \n", pthread_self());
}
}
}
}
}
2011-05-02 12:11:12 +04:00
int main(int argc, char *argv[])
{
2011-05-02 12:11:12 +04:00
int k;
int descSock;
char *service = "1500";
switch (argc) {
case 1:
break;
case 2:
service = argv[1];
break;
default:
printf("Usage: ./ServerBNTH [port]\n");
exit(1);
}
2011-05-02 12:11:12 +04:00
size_t stacksize;
pthread_t p_thread[NUM_THREADS];
/*
Установка размера стека для ведомых потоков
*/
2011-05-02 12:11:12 +04:00
pthread_attr_t attr;
pthread_attr_init(&attr);
stacksize = 500000;
pthread_attr_setstacksize(&attr, stacksize);
pthread_attr_getstacksize(&attr, &stacksize);
/*
Получение значения дескриптора пассивного сокета
*/
2011-05-02 12:11:12 +04:00
descSock = getServerSocket(atoi(service));
/*
Запуск ведомых потоков
*/
2011-05-02 12:11:12 +04:00
for (k = 0; k < NUM_THREADS; k++) {
pthread_create(&p_thread[k], &attr, serv_request, (void *)descSock);
printf("Thread %d started \n", k);
}
2011-05-02 12:11:12 +04:00
pthread_attr_destroy(&attr);
2011-05-02 12:11:12 +04:00
for (k = 0; k < NUM_THREADS; k++) {
pthread_join(p_thread[k], NULL);
printf("Completed join with thread %d\n", k);
}
}