ENH: Implemented handling of SIGCHLD to detect the termination of immediate children. This allows grandchildren to remain running after the children exit.

This commit is contained in:
Brad King 2006-05-21 10:26:28 -04:00
parent 3dd70af5a0
commit 0d594a4538
1 changed files with 312 additions and 82 deletions

View File

@ -24,17 +24,18 @@
Implementation for UNIX Implementation for UNIX
On UNIX, a child process is forked to exec the program. Three On UNIX, a child process is forked to exec the program. Three output
output pipes from the child are read by the parent process using a pipes are read by the parent process using a select call to block
select call to block until data are ready. Two of the pipes are until data are ready. Two of the pipes are stdout and stderr for the
stdout and stderr for the child. The third is a special error pipe child. The third is a special pipe populated by a signal handler to
that has two purposes. First, if the child cannot exec the program, indicate that a child has terminated. This is used in conjunction
the error is reported through the error pipe. Second, the error with the timeout on the select call to implement a timeout for program
pipe is left open until the child exits. This is used in even when it closes stdout and stderr and at the same time avoiding
conjunction with the timeout on the select call to implement a races.
timeout for program even when it closes stdout and stderr.
*/ */
/* /*
TODO: TODO:
@ -68,7 +69,7 @@ do.
#define KWSYSPE_PIPE_COUNT 3 #define KWSYSPE_PIPE_COUNT 3
#define KWSYSPE_PIPE_STDOUT 0 #define KWSYSPE_PIPE_STDOUT 0
#define KWSYSPE_PIPE_STDERR 1 #define KWSYSPE_PIPE_STDERR 1
#define KWSYSPE_PIPE_TERM 2 #define KWSYSPE_PIPE_SIGNAL 2
/* The maximum amount to read from a pipe at a time. */ /* The maximum amount to read from a pipe at a time. */
#define KWSYSPE_PIPE_BUFFER_SIZE 1024 #define KWSYSPE_PIPE_BUFFER_SIZE 1024
@ -89,7 +90,6 @@ typedef struct kwsysProcessCreateInformation_s
int StdIn; int StdIn;
int StdOut; int StdOut;
int StdErr; int StdErr;
int TermPipe;
int ErrorPipe[2]; int ErrorPipe[2];
} kwsysProcessCreateInformation; } kwsysProcessCreateInformation;
@ -99,6 +99,7 @@ static void kwsysProcessCleanup(kwsysProcess* cp, int error);
static void kwsysProcessCleanupDescriptor(int* pfd); static void kwsysProcessCleanupDescriptor(int* pfd);
static int kwsysProcessCreate(kwsysProcess* cp, int prIndex, static int kwsysProcessCreate(kwsysProcess* cp, int prIndex,
kwsysProcessCreateInformation* si, int* readEnd); kwsysProcessCreateInformation* si, int* readEnd);
static void kwsysProcessDestroy(kwsysProcess* cp);
static int kwsysProcessSetupOutputPipeFile(int* p, const char* name); static int kwsysProcessSetupOutputPipeFile(int* p, const char* name);
static int kwsysProcessGetTimeoutTime(kwsysProcess* cp, double* userTimeout, static int kwsysProcessGetTimeoutTime(kwsysProcess* cp, double* userTimeout,
kwsysProcessTime* timeoutTime); kwsysProcessTime* timeoutTime);
@ -117,6 +118,10 @@ static void kwsysProcessRestoreDefaultSignalHandlers(void);
static pid_t kwsysProcessFork(kwsysProcess* cp, static pid_t kwsysProcessFork(kwsysProcess* cp,
kwsysProcessCreateInformation* si); kwsysProcessCreateInformation* si);
static void kwsysProcessKill(pid_t process_id); static void kwsysProcessKill(pid_t process_id);
static int kwsysProcessesAdd(kwsysProcess* cp);
static void kwsysProcessesRemove(kwsysProcess* cp);
static void kwsysProcessesSignalHandler(int signum, siginfo_t* info,
void* ucontext);
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
/* Structure containing data used to implement the child's execution. */ /* Structure containing data used to implement the child's execution. */
@ -126,9 +131,13 @@ struct kwsysProcess_s
char*** Commands; char*** Commands;
int NumberOfCommands; int NumberOfCommands;
/* Descriptors for the read ends of the child's output pipes. */ /* Descriptors for the read ends of the child's output pipes and
the signal pipe. */
int PipeReadEnds[KWSYSPE_PIPE_COUNT]; int PipeReadEnds[KWSYSPE_PIPE_COUNT];
/* Write descriptor for child termination signal pipe. */
int SignalPipe;
/* Buffer for pipe data. */ /* Buffer for pipe data. */
char PipeBuffer[KWSYSPE_PIPE_BUFFER_SIZE]; char PipeBuffer[KWSYSPE_PIPE_BUFFER_SIZE];
@ -159,15 +168,15 @@ struct kwsysProcess_s
/* Flag for whether the timeout expired. */ /* Flag for whether the timeout expired. */
int TimeoutExpired; int TimeoutExpired;
/* The old SIGCHLD handler. */
struct sigaction OldSigChldAction;
/* The number of pipes left open during execution. */ /* The number of pipes left open during execution. */
int PipesLeft; int PipesLeft;
/* File descriptor set for call to select. */ /* File descriptor set for call to select. */
fd_set PipeSet; fd_set PipeSet;
/* The number of children still executing. */
int CommandsLeft;
/* The current status of the child process. */ /* The current status of the child process. */
int State; int State;
@ -558,8 +567,7 @@ const char* kwsysProcess_GetExceptionString(kwsysProcess* cp)
void kwsysProcess_Execute(kwsysProcess* cp) void kwsysProcess_Execute(kwsysProcess* cp)
{ {
int i; int i;
struct sigaction newSigChldAction; kwsysProcessCreateInformation si = {-1, -1, -1, {-1, -1}};
kwsysProcessCreateInformation si = {-1, -1, -1, -1, {-1, -1}};
/* Do not execute a second copy simultaneously. */ /* Do not execute a second copy simultaneously. */
if(!cp || cp->State == kwsysProcess_State_Executing) if(!cp || cp->State == kwsysProcess_State_Executing)
@ -597,46 +605,42 @@ void kwsysProcess_Execute(kwsysProcess* cp)
} }
} }
/* We want no special handling of SIGCHLD. Repeat call until it is /* If not running a detached child, add this object to the global
not interrupted. */ set of process objects that wish to be notified when a child
memset(&newSigChldAction, 0, sizeof(struct sigaction)); exits. */
newSigChldAction.sa_handler = SIG_DFL; if(!cp->OptionDetach)
while((sigaction(SIGCHLD, &newSigChldAction, &cp->OldSigChldAction) < 0) &&
(errno == EINTR));
/* Setup the stderr and termination pipes to be shared by all processes. */
for(i=KWSYSPE_PIPE_STDERR; i < KWSYSPE_PIPE_COUNT; ++i)
{ {
/* Create the pipe. */ if(!kwsysProcessesAdd(cp))
int p[2];
if(pipe(p) < 0)
{ {
kwsysProcessCleanup(cp, 1); kwsysProcessCleanup(cp, 1);
return; return;
} }
/* Store the pipe. */
cp->PipeReadEnds[i] = p[0];
if(i == KWSYSPE_PIPE_STDERR)
{
si.StdErr = p[1];
}
else
{
si.TermPipe = p[1];
}
/* Set close-on-exec flag on the pipe's ends. */
if((fcntl(p[0], F_SETFD, FD_CLOEXEC) < 0) ||
(fcntl(p[1], F_SETFD, FD_CLOEXEC) < 0))
{
kwsysProcessCleanup(cp, 1);
kwsysProcessCleanupDescriptor(&si.StdErr);
kwsysProcessCleanupDescriptor(&si.TermPipe);
return;
}
} }
/* Setup the stderr pipe to be shared by all processes. */
{
/* Create the pipe. */
int p[2];
if(pipe(p) < 0)
{
kwsysProcessCleanup(cp, 1);
return;
}
/* Store the pipe. */
cp->PipeReadEnds[KWSYSPE_PIPE_STDERR] = p[0];
si.StdErr = p[1];
/* Set close-on-exec flag on the pipe's ends. */
if((fcntl(p[0], F_SETFD, FD_CLOEXEC) < 0) ||
(fcntl(p[1], F_SETFD, FD_CLOEXEC) < 0))
{
kwsysProcessCleanup(cp, 1);
kwsysProcessCleanupDescriptor(&si.StdErr);
return;
}
}
/* Replace the stderr pipe with a file if requested. In this case /* Replace the stderr pipe with a file if requested. In this case
the select call will report that stderr is closed immediately. */ the select call will report that stderr is closed immediately. */
if(cp->PipeFileSTDERR) if(cp->PipeFileSTDERR)
@ -645,7 +649,6 @@ void kwsysProcess_Execute(kwsysProcess* cp)
{ {
kwsysProcessCleanup(cp, 1); kwsysProcessCleanup(cp, 1);
kwsysProcessCleanupDescriptor(&si.StdErr); kwsysProcessCleanupDescriptor(&si.StdErr);
kwsysProcessCleanupDescriptor(&si.TermPipe);
return; return;
} }
} }
@ -688,7 +691,6 @@ void kwsysProcess_Execute(kwsysProcess* cp)
{ {
kwsysProcessCleanupDescriptor(&si.StdErr); kwsysProcessCleanupDescriptor(&si.StdErr);
} }
kwsysProcessCleanupDescriptor(&si.TermPipe);
kwsysProcessCleanupDescriptor(&si.ErrorPipe[0]); kwsysProcessCleanupDescriptor(&si.ErrorPipe[0]);
kwsysProcessCleanupDescriptor(&si.ErrorPipe[1]); kwsysProcessCleanupDescriptor(&si.ErrorPipe[1]);
return; return;
@ -703,7 +705,6 @@ void kwsysProcess_Execute(kwsysProcess* cp)
{ {
kwsysProcessCleanupDescriptor(&si.StdErr); kwsysProcessCleanupDescriptor(&si.StdErr);
} }
kwsysProcessCleanupDescriptor(&si.TermPipe);
/* Restore the working directory. */ /* Restore the working directory. */
if(cp->RealWorkingDirectory) if(cp->RealWorkingDirectory)
@ -823,9 +824,10 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, char** data, int* length,
if(n > 0) if(n > 0)
{ {
/* We have data on this pipe. */ /* We have data on this pipe. */
if(i == KWSYSPE_PIPE_TERM) if(i == KWSYSPE_PIPE_SIGNAL)
{ {
/* This is data on the special termination pipe. Ignore it. */ /* A child process has terminated. */
kwsysProcessDestroy(cp);
} }
else if(data && length) else if(data && length)
{ {
@ -970,7 +972,6 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, char** data, int* length,
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
int kwsysProcess_WaitForExit(kwsysProcess* cp, double* userTimeout) int kwsysProcess_WaitForExit(kwsysProcess* cp, double* userTimeout)
{ {
int result = 0;
int status = 0; int status = 0;
int prPipe = 0; int prPipe = 0;
@ -989,26 +990,6 @@ int kwsysProcess_WaitForExit(kwsysProcess* cp, double* userTimeout)
} }
} }
/* Wait for each child to terminate. The process should have
already exited because KWSYSPE_PIPE_TERM has been closed by this
point. Repeat the call until it is not interrupted. */
if(!cp->Detached)
{
int i;
for(i=0; i < cp->NumberOfCommands; ++i)
{
while(((result = waitpid(cp->ForkPIDs[i],
&cp->CommandExitCodes[i], 0)) < 0) &&
(errno == EINTR));
if(result <= 0 && cp->State != kwsysProcess_State_Error)
{
/* Unexpected error. Report the first time this happens. */
strncpy(cp->ErrorMessage, strerror(errno), KWSYSPE_PIPE_BUFFER_SIZE);
cp->State = kwsysProcess_State_Error;
}
}
}
/* Check if there was an error in one of the waitpid calls. */ /* Check if there was an error in one of the waitpid calls. */
if(cp->State == kwsysProcess_State_Error) if(cp->State == kwsysProcess_State_Error)
{ {
@ -1084,11 +1065,18 @@ void kwsysProcess_Kill(kwsysProcess* cp)
cp->Killed = 1; cp->Killed = 1;
for(i=0; i < cp->NumberOfCommands; ++i) for(i=0; i < cp->NumberOfCommands; ++i)
{ {
int status;
if(cp->ForkPIDs[i]) if(cp->ForkPIDs[i])
{ {
/* Kill the child. */
kwsysProcessKill(cp->ForkPIDs[i]); kwsysProcessKill(cp->ForkPIDs[i]);
/* Reap the child. Keep trying until the call is not
interrupted. */
while((waitpid(cp->ForkPIDs[i], &status, 0) < 0) && (errno == EINTR));
} }
} }
cp->CommandsLeft = 0;
/* Close all the pipe read ends. */ /* Close all the pipe read ends. */
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
@ -1107,6 +1095,7 @@ static int kwsysProcessInitialize(kwsysProcess* cp)
{ {
cp->PipeReadEnds[i] = -1; cp->PipeReadEnds[i] = -1;
} }
cp->SignalPipe = -1;
cp->SelectError = 0; cp->SelectError = 0;
cp->StartTime.tv_sec = -1; cp->StartTime.tv_sec = -1;
cp->StartTime.tv_usec = -1; cp->StartTime.tv_usec = -1;
@ -1114,6 +1103,7 @@ static int kwsysProcessInitialize(kwsysProcess* cp)
cp->TimeoutTime.tv_usec = -1; cp->TimeoutTime.tv_usec = -1;
cp->TimeoutExpired = 0; cp->TimeoutExpired = 0;
cp->PipesLeft = 0; cp->PipesLeft = 0;
cp->CommandsLeft = 0;
FD_ZERO(&cp->PipeSet); FD_ZERO(&cp->PipeSet);
cp->State = kwsysProcess_State_Starting; cp->State = kwsysProcess_State_Starting;
cp->Killed = 0; cp->Killed = 0;
@ -1194,6 +1184,7 @@ static void kwsysProcessCleanup(kwsysProcess* cp, int error)
{ {
/* Kill the child. */ /* Kill the child. */
kwsysProcessKill(cp->ForkPIDs[i]); kwsysProcessKill(cp->ForkPIDs[i]);
/* Reap the child. Keep trying until the call is not /* Reap the child. Keep trying until the call is not
interrupted. */ interrupted. */
while((waitpid(cp->ForkPIDs[i], &status, 0) < 0) && while((waitpid(cp->ForkPIDs[i], &status, 0) < 0) &&
@ -1209,9 +1200,13 @@ static void kwsysProcessCleanup(kwsysProcess* cp, int error)
} }
} }
/* Restore the SIGCHLD handler. */ /* If not creating a detached child, remove this object from the
while((sigaction(SIGCHLD, &cp->OldSigChldAction, 0) < 0) && global set of process objects that wish to be notified when a
(errno == EINTR)); child exits. */
if(!cp->OptionDetach)
{
kwsysProcessesRemove(cp);
}
/* Free memory. */ /* Free memory. */
if(cp->ForkPIDs) if(cp->ForkPIDs)
@ -1360,12 +1355,10 @@ static int kwsysProcessCreate(kwsysProcess* cp, int prIndex,
} }
/* Clear the close-on-exec flag for stdin, stdout, and stderr. /* Clear the close-on-exec flag for stdin, stdout, and stderr.
Also clear it for the termination pipe. All other pipe handles All other pipe handles will be closed when exec succeeds. */
will be closed when exec succeeds. */
fcntl(0, F_SETFD, 0); fcntl(0, F_SETFD, 0);
fcntl(1, F_SETFD, 0); fcntl(1, F_SETFD, 0);
fcntl(2, F_SETFD, 0); fcntl(2, F_SETFD, 0);
fcntl(si->TermPipe, F_SETFD, 0);
/* Restore all default signal handlers. */ /* Restore all default signal handlers. */
kwsysProcessRestoreDefaultSignalHandlers(); kwsysProcessRestoreDefaultSignalHandlers();
@ -1377,6 +1370,9 @@ static int kwsysProcessCreate(kwsysProcess* cp, int prIndex,
kwsysProcessChildErrorExit(si->ErrorPipe[1]); kwsysProcessChildErrorExit(si->ErrorPipe[1]);
} }
/* A child has been created. */
++cp->CommandsLeft;
/* We are done with the error reporting pipe write end. */ /* We are done with the error reporting pipe write end. */
kwsysProcessCleanupDescriptor(&si->ErrorPipe[1]); kwsysProcessCleanupDescriptor(&si->ErrorPipe[1]);
@ -1424,6 +1420,47 @@ static int kwsysProcessCreate(kwsysProcess* cp, int prIndex,
return 1; return 1;
} }
/*--------------------------------------------------------------------------*/
static void kwsysProcessDestroy(kwsysProcess* cp)
{
/* A child process has terminated. Reap it if it is one handled by
this object. */
int i;
for(i=0; i < cp->NumberOfCommands; ++i)
{
if(cp->ForkPIDs[i])
{
int result;
while(((result = waitpid(cp->ForkPIDs[i],
&cp->CommandExitCodes[i], WNOHANG)) < 0) &&
(errno == EINTR));
if(result > 0)
{
/* This child has termianted. */
cp->ForkPIDs[i] = 0;
if(--cp->CommandsLeft == 0)
{
/* All children have terminated. Close the signal pipe
write end so that no more notifications are sent to this
object. */
kwsysProcessCleanupDescriptor(&cp->SignalPipe);
/* TODO: Once the children have terminated, switch
WaitForData to use a non-blocking read to get the
rest of the data from the pipe. This is needed when
grandchildren keep the output pipes open. */
}
}
else if(result < 0 && cp->State != kwsysProcess_State_Error)
{
/* Unexpected error. Report the first time this happens. */
strncpy(cp->ErrorMessage, strerror(errno), KWSYSPE_PIPE_BUFFER_SIZE);
cp->State = kwsysProcess_State_Error;
}
}
}
}
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
static int kwsysProcessSetupOutputPipeFile(int* p, const char* name) static int kwsysProcessSetupOutputPipeFile(int* p, const char* name)
{ {
@ -2014,3 +2051,196 @@ static void kwsysProcessKill(pid_t process_id)
/* Kill the process. */ /* Kill the process. */
kill(process_id, SIGKILL); kill(process_id, SIGKILL);
} }
/*--------------------------------------------------------------------------*/
/* Global set of executing processes for use by the signal handler.
This global instance will be zero-initialized by the compiler. */
typedef struct kwsysProcessInstances_s
{
int Count;
int Size;
kwsysProcess** Processes;
} kwsysProcessInstances;
static kwsysProcessInstances kwsysProcesses;
/* The old SIGCHLD handler. */
static struct sigaction kwsysProcessesOldSigChldAction;
/*--------------------------------------------------------------------------*/
static void kwsysProcessesUpdate(kwsysProcessInstances* newProcesses)
{
/* Block SIGCHLD while we update the set of pipes to check.
TODO: sigprocmask is undefined for threaded apps. See
pthread_sigmask. */
sigset_t newset;
sigset_t oldset;
sigemptyset(&newset);
sigaddset(&newset, SIGCHLD);
sigprocmask(SIG_BLOCK, &newset, &oldset);
/* Store the new set in that seen by the signal handler. */
kwsysProcesses = *newProcesses;
/* Restore the signal mask to the previous setting. */
sigprocmask(SIG_SETMASK, &oldset, 0);
}
/*--------------------------------------------------------------------------*/
static int kwsysProcessesAdd(kwsysProcess* cp)
{
/* Create a pipe through which the signal handler can notify the
given process object that a child has exited. */
{
/* Create the pipe. */
int oldfl[2];
int p[2];
if(pipe(p) < 0)
{
return 0;
}
/* Store the pipes now to be sure they are cleaned up later. */
cp->PipeReadEnds[KWSYSPE_PIPE_SIGNAL] = p[0];
cp->SignalPipe = p[1];
/* Switch the pipe to non-blocking mode so that reading a byte can
be an atomic test-and-set. */
if((oldfl[0] = fcntl(p[0], F_GETFL) < 0) ||
(oldfl[1] = fcntl(p[1], F_GETFL) < 0) ||
(fcntl(p[0], F_SETFL, oldfl[0] | O_NONBLOCK) < 0) ||
(fcntl(p[1], F_SETFL, oldfl[1] | O_NONBLOCK) < 0))
{
return 0;
}
/* The children do not need this pipe. Set close-on-exec flag on
the pipe's ends. */
if((fcntl(p[0], F_SETFD, FD_CLOEXEC) < 0) ||
(fcntl(p[1], F_SETFD, FD_CLOEXEC) < 0))
{
return 0;
}
}
/* Attempt to add the given signal pipe to the signal handler set. */
{
/* Make sure there is enough space for the new signal pipe. */
kwsysProcessInstances oldProcesses = kwsysProcesses;
kwsysProcessInstances newProcesses = oldProcesses;
if(oldProcesses.Count == oldProcesses.Size)
{
/* Start with enough space for a small number of process instances
and double the size each time more is needed. */
newProcesses.Size = oldProcesses.Size? oldProcesses.Size*2 : 4;
/* Try allocating the new block of memory. */
if((newProcesses.Processes = ((kwsysProcess**)
malloc(newProcesses.Size*
sizeof(kwsysProcess*)))))
{
/* Copy the old pipe set to the new memory. */
if(oldProcesses.Count > 0)
{
memcpy(newProcesses.Processes, oldProcesses.Processes,
(oldProcesses.Count * sizeof(kwsysProcess*)));
}
}
else
{
/* Failed to allocate memory for the new signal pipe set. */
return 0;
}
}
/* Append the new signal pipe to the set. */
newProcesses.Processes[newProcesses.Count++] = cp;
/* Store the new set in that seen by the signal handler. */
kwsysProcessesUpdate(&newProcesses);
/* Free the original pipes if new ones were allocated. */
if(newProcesses.Processes != oldProcesses.Processes)
{
free(oldProcesses.Processes);
}
/* If this is the first process, enable the signal handler. */
if(newProcesses.Count == 1)
{
/* Install our handler for SIGCHLD. Repeat call until it is not
interrupted. */
struct sigaction newSigChldAction;
memset(&newSigChldAction, 0, sizeof(struct sigaction));
newSigChldAction.sa_sigaction = kwsysProcessesSignalHandler;
newSigChldAction.sa_flags = SA_NOCLDSTOP | SA_RESTART | SA_SIGINFO;
while((sigaction(SIGCHLD, &newSigChldAction,
&kwsysProcessesOldSigChldAction) < 0) &&
(errno == EINTR));
}
}
return 1;
}
/*--------------------------------------------------------------------------*/
static void kwsysProcessesRemove(kwsysProcess* cp)
{
/* Attempt to remove the given signal pipe from the signal handler set. */
{
/* Find the given process in the set. */
kwsysProcessInstances newProcesses = kwsysProcesses;
int i;
for(i=0; i < newProcesses.Count; ++i)
{
if(newProcesses.Processes[i] == cp)
{
break;
}
}
if(i < newProcesses.Count)
{
/* Remove the process from the set. */
--newProcesses.Count;
for(; i < newProcesses.Count; ++i)
{
newProcesses.Processes[i] = newProcesses.Processes[i+1];
}
/* Store the new set in that seen by the signal handler. */
kwsysProcessesUpdate(&newProcesses);
/* If this was the last process, disable the signal handler. */
if(newProcesses.Count == 0)
{
/* Restore the SIGCHLD handler. Repeat call until it is not
interrupted. */
while((sigaction(SIGCHLD, &kwsysProcessesOldSigChldAction, 0) < 0) &&
(errno == EINTR));
}
}
}
/* Close the pipe through which the signal handler may have notified
the given process object that a child has exited. */
kwsysProcessCleanupDescriptor(&cp->SignalPipe);
}
/*--------------------------------------------------------------------------*/
static void kwsysProcessesSignalHandler(int signum, siginfo_t* info,
void* ucontext)
{
/* Signal all process objects that a child has terminated. */
int i;
(void)signum;
(void)info;
(void)ucontext;
for(i=0; i < kwsysProcesses.Count; ++i)
{
/* Set the pipe in a signalled state. */
char buf = 1;
kwsysProcess* cp = kwsysProcesses.Processes[i];
read(cp->PipeReadEnds[KWSYSPE_PIPE_SIGNAL], &buf, 1);
write(cp->SignalPipe, &buf, 1);
}
}