BUG: Fix for read pipe wakeup when child is writing alot of data and may fill the pipe buffer before WriteFile is called.

This commit is contained in:
Brad King 2004-07-13 16:23:49 -04:00
parent be990132da
commit a6c9cb9b0d
1 changed files with 114 additions and 3 deletions

View File

@ -80,6 +80,8 @@ typedef struct kwsysProcessCreateInformation_s
typedef struct kwsysProcessPipeData_s kwsysProcessPipeData; typedef struct kwsysProcessPipeData_s kwsysProcessPipeData;
static DWORD WINAPI kwsysProcessPipeThread(LPVOID ptd); static DWORD WINAPI kwsysProcessPipeThread(LPVOID ptd);
static void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td); static void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td);
static DWORD WINAPI kwsysProcessPipeThreadWake(LPVOID ptd);
static void kwsysProcessPipeThreadWakePipe(kwsysProcess* cp, kwsysProcessPipeData* td);
static int kwsysProcessInitialize(kwsysProcess* cp); static int kwsysProcessInitialize(kwsysProcess* cp);
static int kwsysProcessCreate(kwsysProcess* cp, int index, static int kwsysProcessCreate(kwsysProcess* cp, int index,
kwsysProcessCreateInformation* si, kwsysProcessCreateInformation* si,
@ -113,6 +115,7 @@ struct kwsysProcessPipeData_s
/* Handle for the thread for this pipe. */ /* Handle for the thread for this pipe. */
HANDLE Thread; HANDLE Thread;
HANDLE ThreadWake;
/* Semaphore indicating a process and pipe are available. */ /* Semaphore indicating a process and pipe are available. */
HANDLE Ready; HANDLE Ready;
@ -123,6 +126,13 @@ struct kwsysProcessPipeData_s
/* Semaphore indicating a pipe thread has reset for another process. */ /* Semaphore indicating a pipe thread has reset for another process. */
HANDLE Reset; HANDLE Reset;
/* Semaphore indicating the wake thread should unblock. */
HANDLE Wake;
/* Semaphore indicating the wake thread has reset for another process. */
HANDLE ReadyWake;
HANDLE ResetWake;
/* Index of this pipe. */ /* Index of this pipe. */
int Index; int Index;
@ -448,6 +458,38 @@ kwsysProcess* kwsysProcess_New()
kwsysProcess_Delete(cp); kwsysProcess_Delete(cp);
return 0; return 0;
} }
/* The wake thread should block. Initialize semaphore to 0. */
if(!(cp->Pipe[i].ReadyWake = CreateSemaphore(0, 0, 1, 0)))
{
kwsysProcess_Delete(cp);
return 0;
}
/* The wake thread need not reset yet. Initialize semaphore to 0. */
if(!(cp->Pipe[i].ResetWake = CreateSemaphore(0, 0, 1, 0)))
{
kwsysProcess_Delete(cp);
return 0;
}
/* The pipe will not need to wake yet. Initialize semaphore to 0. */
if(!(cp->Pipe[i].Wake = CreateSemaphore(0, 0, 1, 0)))
{
kwsysProcess_Delete(cp);
return 0;
}
/* Create the waking thread. It will block immediately. The
thread will not make deeply nested calls, so we need only a
small stack. */
if(!(cp->Pipe[i].ThreadWake = CreateThread(0, 1024,
kwsysProcessPipeThreadWake,
&cp->Pipe[i], 0, &dummy)))
{
kwsysProcess_Delete(cp);
return 0;
}
} }
return cp; return cp;
@ -496,9 +538,26 @@ void kwsysProcess_Delete(kwsysProcess* cp)
kwsysProcessCleanupHandle(&cp->Pipe[i].Thread); kwsysProcessCleanupHandle(&cp->Pipe[i].Thread);
} }
if(cp->Pipe[i].ThreadWake)
{
/* Signal the thread we are ready for it. It will terminate
immediately since Deleting is set. */
ReleaseSemaphore(cp->Pipe[i].ReadyWake, 1, 0);
/* Wait for the thread to exit. */
WaitForSingleObject(cp->Pipe[i].ThreadWake, INFINITE);
/* Close the handle to the thread. */
kwsysProcessCleanupHandle(&cp->Pipe[i].ThreadWake);
}
/* Cleanup the pipe's semaphores. */ /* Cleanup the pipe's semaphores. */
kwsysProcessCleanupHandle(&cp->Pipe[i].Reset);
kwsysProcessCleanupHandle(&cp->Pipe[i].Ready); kwsysProcessCleanupHandle(&cp->Pipe[i].Ready);
kwsysProcessCleanupHandle(&cp->Pipe[i].Empty); kwsysProcessCleanupHandle(&cp->Pipe[i].Empty);
kwsysProcessCleanupHandle(&cp->Pipe[i].ReadyWake);
kwsysProcessCleanupHandle(&cp->Pipe[i].ResetWake);
kwsysProcessCleanupHandle(&cp->Pipe[i].Wake);
} }
/* Close the shared semaphores. */ /* Close the shared semaphores. */
@ -1136,6 +1195,7 @@ void kwsysProcess_Execute(kwsysProcess* cp)
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{ {
ReleaseSemaphore(cp->Pipe[i].Ready, 1, 0); ReleaseSemaphore(cp->Pipe[i].Ready, 1, 0);
ReleaseSemaphore(cp->Pipe[i].ReadyWake, 1, 0);
} }
/* We don't care about the children's main threads. */ /* We don't care about the children's main threads. */
@ -1172,6 +1232,7 @@ void kwsysProcess_Disown(kwsysProcess* cp)
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{ {
WaitForSingleObject(cp->Pipe[i].Reset, INFINITE); WaitForSingleObject(cp->Pipe[i].Reset, INFINITE);
WaitForSingleObject(cp->Pipe[i].ResetWake, INFINITE);
} }
/* We will not wait for exit, so cleanup now. */ /* We will not wait for exit, so cleanup now. */
@ -1257,6 +1318,7 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, char** data, int* length,
if(cp->Pipe[cp->CurrentIndex].Closed) if(cp->Pipe[cp->CurrentIndex].Closed)
{ {
/* The pipe closed. */ /* The pipe closed. */
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Wake, 1, 0);
--cp->PipesLeft; --cp->PipesLeft;
} }
else if(data && length) else if(data && length)
@ -1359,6 +1421,7 @@ int kwsysProcess_WaitForExit(kwsysProcess* cp, double* userTimeout)
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{ {
WaitForSingleObject(cp->Pipe[i].Reset, INFINITE); WaitForSingleObject(cp->Pipe[i].Reset, INFINITE);
WaitForSingleObject(cp->Pipe[i].ResetWake, INFINITE);
} }
/* ---- It is now safe again to call kwsysProcessCleanup. ----- */ /* ---- It is now safe again to call kwsysProcessCleanup. ----- */
@ -1494,6 +1557,43 @@ void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td)
} }
} }
/*--------------------------------------------------------------------------*/
/*
Function executed for each pipe's thread. Argument is a pointer to
the kwsysProcessPipeData instance for this thread.
*/
DWORD WINAPI kwsysProcessPipeThreadWake(LPVOID ptd)
{
kwsysProcessPipeData* td = (kwsysProcessPipeData*)ptd;
kwsysProcess* cp = td->Process;
/* Wait for a process to be ready. */
while((WaitForSingleObject(td->ReadyWake, INFINITE), !cp->Deleting))
{
/* Wait for a possible wakeup. */
kwsysProcessPipeThreadWakePipe(cp, td);
/* Signal the main thread we have reset for a new process. */
ReleaseSemaphore(td->ResetWake, 1, 0);
}
return 0;
}
void kwsysProcessPipeThreadWakePipe(kwsysProcess* cp, kwsysProcessPipeData* td)
{
/* Wait for a possible wake command. */
if((WaitForSingleObject(td->Wake, INFINITE), !td->Closed))
{
/* The pipe is not closed. We need to wake up the reading thread. */
DWORD dummy;
WriteFile(td->Write, "", 1, &dummy, 0);
}
/* We have processed the wake command. */
ReleaseSemaphore(td->Wake, 1, 0);
}
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
/* Initialize a process control structure for kwsysProcess_Execute. */ /* Initialize a process control structure for kwsysProcess_Execute. */
int kwsysProcessInitialize(kwsysProcess* cp) int kwsysProcessInitialize(kwsysProcess* cp)
@ -2570,11 +2670,21 @@ static void kwsysProcessDisablePipeThreads(kwsysProcess* cp)
cp->CurrentIndex = KWSYSPE_PIPE_COUNT; cp->CurrentIndex = KWSYSPE_PIPE_COUNT;
} }
/* Wake up all the pipe threads with dummy data. */ /* Signal all the pipe wakeup threads. */
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{ {
DWORD dummy; /* The wakeup threads will write one byte to the pipe write ends.
WriteFile(cp->Pipe[i].Write, "", 1, &dummy, 0); If there are no data in the pipe then this is enough to wakeup
the reading threads. If there are already data in the pipe
this may block. We cannot use PeekNamedPipe to check whether
there are data because an outside process might still be
writing data if we are disowning it. Also, PeekNamedPipe will
block if checking a pipe on which the reading thread is
currently calling ReadPipe. Therefore we need a separate
thread to call WriteFile. If it blocks, that is okay because
it will unblock when we close the read end and break the pipe
below. */
ReleaseSemaphore(cp->Pipe[i].Wake, 1, 0);
} }
/* Tell pipe threads to reset until we run another process. */ /* Tell pipe threads to reset until we run another process. */
@ -2583,6 +2693,7 @@ static void kwsysProcessDisablePipeThreads(kwsysProcess* cp)
WaitForSingleObject(cp->Full, INFINITE); WaitForSingleObject(cp->Full, INFINITE);
cp->CurrentIndex = cp->SharedIndex; cp->CurrentIndex = cp->SharedIndex;
ReleaseSemaphore(cp->SharedIndexMutex, 1, 0); ReleaseSemaphore(cp->SharedIndexMutex, 1, 0);
kwsysProcessCleanupHandle(&cp->Pipe[cp->CurrentIndex].Read);
cp->Pipe[cp->CurrentIndex].Closed = 1; cp->Pipe[cp->CurrentIndex].Closed = 1;
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Empty, 1, 0); ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Empty, 1, 0);
--cp->PipesLeft; --cp->PipesLeft;