ENH: Added KWSYSPE_DEBUG macro to print debugging trace information. Added TODO comment explaining why process execution can still hang when a grandchild keeps the output pipes open.
This commit is contained in:
parent
0a0e89ee8a
commit
688ebad20e
|
@ -77,6 +77,22 @@ Q190351 and Q150956.
|
||||||
/* The maximum amount to read from a pipe at a time. */
|
/* The maximum amount to read from a pipe at a time. */
|
||||||
#define KWSYSPE_PIPE_BUFFER_SIZE 1024
|
#define KWSYSPE_PIPE_BUFFER_SIZE 1024
|
||||||
|
|
||||||
|
/* Debug output macro. */
|
||||||
|
#if 0
|
||||||
|
# define KWSYSPE_DEBUG(x) \
|
||||||
|
( \
|
||||||
|
(void*)cp == (void*)0x00226DE0? \
|
||||||
|
( \
|
||||||
|
fprintf(stderr, "%d/%p/%d ", (int)GetCurrentProcessId(), cp, __LINE__), \
|
||||||
|
fprintf x, \
|
||||||
|
fflush(stderr), \
|
||||||
|
1 \
|
||||||
|
) : (1) \
|
||||||
|
)
|
||||||
|
#else
|
||||||
|
# define KWSYSPE_DEBUG(x) (void)1
|
||||||
|
#endif
|
||||||
|
|
||||||
#define kwsysEncodedWriteArrayProcessFwd9x kwsys_ns(EncodedWriteArrayProcessFwd9x)
|
#define kwsysEncodedWriteArrayProcessFwd9x kwsys_ns(EncodedWriteArrayProcessFwd9x)
|
||||||
|
|
||||||
typedef LARGE_INTEGER kwsysProcessTime;
|
typedef LARGE_INTEGER kwsysProcessTime;
|
||||||
|
@ -1238,6 +1254,7 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, char** data, int* length,
|
||||||
done with the data. */
|
done with the data. */
|
||||||
if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT)
|
if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT)
|
||||||
{
|
{
|
||||||
|
KWSYSPE_DEBUG((stderr, "releasing reader %d\n", cp->CurrentIndex));
|
||||||
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0);
|
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0);
|
||||||
cp->CurrentIndex = KWSYSPE_PIPE_COUNT;
|
cp->CurrentIndex = KWSYSPE_PIPE_COUNT;
|
||||||
}
|
}
|
||||||
|
@ -1282,6 +1299,7 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, char** data, int* length,
|
||||||
inform the wakeup thread it is done with this process. */
|
inform the wakeup thread it is done with this process. */
|
||||||
kwsysProcessCleanupHandle(&cp->Pipe[cp->CurrentIndex].Read);
|
kwsysProcessCleanupHandle(&cp->Pipe[cp->CurrentIndex].Read);
|
||||||
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Waker.Go, 1, 0);
|
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Waker.Go, 1, 0);
|
||||||
|
KWSYSPE_DEBUG((stderr, "wakeup %d\n", cp->CurrentIndex));
|
||||||
--cp->PipesLeft;
|
--cp->PipesLeft;
|
||||||
}
|
}
|
||||||
else if(data && length)
|
else if(data && length)
|
||||||
|
@ -1337,6 +1355,7 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, char** data, int* length,
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* The process timeout has expired. Kill the child now. */
|
/* The process timeout has expired. Kill the child now. */
|
||||||
|
KWSYSPE_DEBUG((stderr, "killing child because timeout expired\n"));
|
||||||
kwsysProcess_Kill(cp);
|
kwsysProcess_Kill(cp);
|
||||||
cp->TimeoutExpired = 1;
|
cp->TimeoutExpired = 1;
|
||||||
cp->Killed = 0;
|
cp->Killed = 0;
|
||||||
|
@ -1372,10 +1391,13 @@ int kwsysProcess_WaitForExit(kwsysProcess* cp, double* userTimeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
KWSYSPE_DEBUG((stderr, "no more data\n"));
|
||||||
|
|
||||||
/* When the last pipe closes in WaitForData, the loop terminates
|
/* When the last pipe closes in WaitForData, the loop terminates
|
||||||
without releasing the pipe's thread. Release it now. */
|
without releasing the pipe's thread. Release it now. */
|
||||||
if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT)
|
if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT)
|
||||||
{
|
{
|
||||||
|
KWSYSPE_DEBUG((stderr, "releasing reader %d\n", cp->CurrentIndex));
|
||||||
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0);
|
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0);
|
||||||
cp->CurrentIndex = KWSYSPE_PIPE_COUNT;
|
cp->CurrentIndex = KWSYSPE_PIPE_COUNT;
|
||||||
}
|
}
|
||||||
|
@ -1383,7 +1405,9 @@ int kwsysProcess_WaitForExit(kwsysProcess* cp, double* userTimeout)
|
||||||
/* Wait for all pipe threads to reset. */
|
/* Wait for all pipe threads to reset. */
|
||||||
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
|
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
|
||||||
{
|
{
|
||||||
|
KWSYSPE_DEBUG((stderr, "waiting reader reset %d\n", i));
|
||||||
WaitForSingleObject(cp->Pipe[i].Reader.Reset, INFINITE);
|
WaitForSingleObject(cp->Pipe[i].Reader.Reset, INFINITE);
|
||||||
|
KWSYSPE_DEBUG((stderr, "waiting waker reset %d\n", i));
|
||||||
WaitForSingleObject(cp->Pipe[i].Waker.Reset, INFINITE);
|
WaitForSingleObject(cp->Pipe[i].Waker.Reset, INFINITE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1434,15 +1458,18 @@ void kwsysProcess_Kill(kwsysProcess* cp)
|
||||||
if(!cp || cp->State != kwsysProcess_State_Executing || cp->TimeoutExpired ||
|
if(!cp || cp->State != kwsysProcess_State_Executing || cp->TimeoutExpired ||
|
||||||
cp->Killed)
|
cp->Killed)
|
||||||
{
|
{
|
||||||
|
KWSYSPE_DEBUG((stderr, "kill: child not executing\n"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Disable the reading threads. */
|
/* Disable the reading threads. */
|
||||||
|
KWSYSPE_DEBUG((stderr, "kill: disabling pipe threads\n"));
|
||||||
kwsysProcessDisablePipeThreads(cp);
|
kwsysProcessDisablePipeThreads(cp);
|
||||||
|
|
||||||
/* Skip actually killing the child if it has already terminated. */
|
/* Skip actually killing the child if it has already terminated. */
|
||||||
if(cp->Terminated)
|
if(cp->Terminated)
|
||||||
{
|
{
|
||||||
|
KWSYSPE_DEBUG((stderr, "kill: child already terminated\n"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1498,8 +1525,11 @@ DWORD WINAPI kwsysProcessPipeThreadRead(LPVOID ptd)
|
||||||
void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td)
|
void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td)
|
||||||
{
|
{
|
||||||
/* Wait for space in the thread's buffer. */
|
/* Wait for space in the thread's buffer. */
|
||||||
while((WaitForSingleObject(td->Reader.Go, INFINITE), !td->Closed))
|
while((KWSYSPE_DEBUG((stderr, "wait for read %d\n", td->Index)),
|
||||||
|
WaitForSingleObject(td->Reader.Go, INFINITE), !td->Closed))
|
||||||
{
|
{
|
||||||
|
KWSYSPE_DEBUG((stderr, "reading %d\n", td->Index));
|
||||||
|
|
||||||
/* Read data from the pipe. This may block until data are available. */
|
/* Read data from the pipe. This may block until data are available. */
|
||||||
if(!ReadFile(td->Read, td->DataBuffer, KWSYSPE_PIPE_BUFFER_SIZE,
|
if(!ReadFile(td->Read, td->DataBuffer, KWSYSPE_PIPE_BUFFER_SIZE,
|
||||||
&td->DataLength, 0))
|
&td->DataLength, 0))
|
||||||
|
@ -1511,11 +1541,16 @@ void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td)
|
||||||
|
|
||||||
/* The pipe closed. There are no more data to read. */
|
/* The pipe closed. There are no more data to read. */
|
||||||
td->Closed = 1;
|
td->Closed = 1;
|
||||||
|
KWSYSPE_DEBUG((stderr, "read closed %d\n", td->Index));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
KWSYSPE_DEBUG((stderr, "read %d\n", td->Index));
|
||||||
|
|
||||||
/* Wait for our turn to be handled by the main thread. */
|
/* Wait for our turn to be handled by the main thread. */
|
||||||
WaitForSingleObject(cp->SharedIndexMutex, INFINITE);
|
WaitForSingleObject(cp->SharedIndexMutex, INFINITE);
|
||||||
|
|
||||||
|
KWSYSPE_DEBUG((stderr, "reporting read %d\n", td->Index));
|
||||||
|
|
||||||
/* Tell the main thread we have something to report. */
|
/* Tell the main thread we have something to report. */
|
||||||
cp->SharedIndex = td->Index;
|
cp->SharedIndex = td->Index;
|
||||||
ReleaseSemaphore(cp->Full, 1, 0);
|
ReleaseSemaphore(cp->Full, 1, 0);
|
||||||
|
@ -1523,6 +1558,7 @@ void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td)
|
||||||
|
|
||||||
/* We were signalled to exit with our buffer empty. Reset the
|
/* We were signalled to exit with our buffer empty. Reset the
|
||||||
mutex for a new process. */
|
mutex for a new process. */
|
||||||
|
KWSYSPE_DEBUG((stderr, "self releasing reader %d\n", td->Index));
|
||||||
ReleaseSemaphore(td->Reader.Go, 1, 0);
|
ReleaseSemaphore(td->Reader.Go, 1, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1560,13 +1596,17 @@ void kwsysProcessPipeThreadWakePipe(kwsysProcess* cp, kwsysProcessPipeData* td)
|
||||||
(void)cp;
|
(void)cp;
|
||||||
|
|
||||||
/* Wait for a possible wake command. */
|
/* Wait for a possible wake command. */
|
||||||
|
KWSYSPE_DEBUG((stderr, "wait for wake %d\n", td->Index));
|
||||||
WaitForSingleObject(td->Waker.Go, INFINITE);
|
WaitForSingleObject(td->Waker.Go, INFINITE);
|
||||||
|
KWSYSPE_DEBUG((stderr, "waking %d\n", td->Index));
|
||||||
|
|
||||||
/* If the pipe is not closed, we need to wake up the reading thread. */
|
/* If the pipe is not closed, we need to wake up the reading thread. */
|
||||||
if(!td->Closed)
|
if(!td->Closed)
|
||||||
{
|
{
|
||||||
DWORD dummy;
|
DWORD dummy;
|
||||||
|
KWSYSPE_DEBUG((stderr, "waker %d writing byte\n", td->Index));
|
||||||
WriteFile(td->Write, "", 1, &dummy, 0);
|
WriteFile(td->Write, "", 1, &dummy, 0);
|
||||||
|
KWSYSPE_DEBUG((stderr, "waker %d wrote byte\n", td->Index));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1910,6 +1950,15 @@ void kwsysProcessDestroy(kwsysProcess* cp, int event)
|
||||||
can detect end-of-data. */
|
can detect end-of-data. */
|
||||||
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
|
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
|
||||||
{
|
{
|
||||||
|
/* TODO: If the child created its own child (our grandchild)
|
||||||
|
which inherited a copy of the pipe write-end then the pipe
|
||||||
|
may not close and we will still need the waker write pipe.
|
||||||
|
However we still want to be able to detect end-of-data in the
|
||||||
|
normal case. The reader thread will have to switch to using
|
||||||
|
PeekNamedPipe to read the last bit of data from the pipe
|
||||||
|
without blocking. This is equivalent to using a non-blocking
|
||||||
|
read on posix. */
|
||||||
|
KWSYSPE_DEBUG((stderr, "closing wakeup write %d\n", i));
|
||||||
kwsysProcessCleanupHandle(&cp->Pipe[i].Write);
|
kwsysProcessCleanupHandle(&cp->Pipe[i].Write);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2842,6 +2891,7 @@ static void kwsysProcessDisablePipeThreads(kwsysProcess* cp)
|
||||||
/* If data were just reported data, release the pipe's thread. */
|
/* If data were just reported data, release the pipe's thread. */
|
||||||
if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT)
|
if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT)
|
||||||
{
|
{
|
||||||
|
KWSYSPE_DEBUG((stderr, "releasing reader %d\n", cp->CurrentIndex));
|
||||||
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0);
|
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0);
|
||||||
cp->CurrentIndex = KWSYSPE_PIPE_COUNT;
|
cp->CurrentIndex = KWSYSPE_PIPE_COUNT;
|
||||||
}
|
}
|
||||||
|
@ -2862,6 +2912,7 @@ static void kwsysProcessDisablePipeThreads(kwsysProcess* cp)
|
||||||
below. */
|
below. */
|
||||||
if(cp->Pipe[i].Read)
|
if(cp->Pipe[i].Read)
|
||||||
{
|
{
|
||||||
|
KWSYSPE_DEBUG((stderr, "releasing waker %d\n", i));
|
||||||
ReleaseSemaphore(cp->Pipe[i].Waker.Go, 1, 0);
|
ReleaseSemaphore(cp->Pipe[i].Waker.Go, 1, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2871,9 +2922,11 @@ static void kwsysProcessDisablePipeThreads(kwsysProcess* cp)
|
||||||
{
|
{
|
||||||
/* The waking threads will cause all reading threads to report.
|
/* The waking threads will cause all reading threads to report.
|
||||||
Wait for the next one and save its index. */
|
Wait for the next one and save its index. */
|
||||||
|
KWSYSPE_DEBUG((stderr, "waiting for reader\n"));
|
||||||
WaitForSingleObject(cp->Full, INFINITE);
|
WaitForSingleObject(cp->Full, INFINITE);
|
||||||
cp->CurrentIndex = cp->SharedIndex;
|
cp->CurrentIndex = cp->SharedIndex;
|
||||||
ReleaseSemaphore(cp->SharedIndexMutex, 1, 0);
|
ReleaseSemaphore(cp->SharedIndexMutex, 1, 0);
|
||||||
|
KWSYSPE_DEBUG((stderr, "got reader %d\n", cp->CurrentIndex));
|
||||||
|
|
||||||
/* We are done reading this pipe. Close its read handle. */
|
/* We are done reading this pipe. Close its read handle. */
|
||||||
cp->Pipe[cp->CurrentIndex].Closed = 1;
|
cp->Pipe[cp->CurrentIndex].Closed = 1;
|
||||||
|
|
Loading…
Reference in New Issue