ENH: Re-arranged handling of the two threads per pipe to improve readability of code.

This commit is contained in:
Brad King 2004-07-13 16:50:55 -04:00
parent a6c9cb9b0d
commit 43225860b6

@ -78,10 +78,12 @@ typedef struct kwsysProcessCreateInformation_s
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
typedef struct kwsysProcessPipeData_s kwsysProcessPipeData; typedef struct kwsysProcessPipeData_s kwsysProcessPipeData;
static DWORD WINAPI kwsysProcessPipeThread(LPVOID ptd); static DWORD WINAPI kwsysProcessPipeThreadRead(LPVOID ptd);
static void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td); static void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp,
kwsysProcessPipeData* td);
static DWORD WINAPI kwsysProcessPipeThreadWake(LPVOID ptd); static DWORD WINAPI kwsysProcessPipeThreadWake(LPVOID ptd);
static void kwsysProcessPipeThreadWakePipe(kwsysProcess* cp, kwsysProcessPipeData* td); static void kwsysProcessPipeThreadWakePipe(kwsysProcess* cp,
kwsysProcessPipeData* td);
static int kwsysProcessInitialize(kwsysProcess* cp); static int kwsysProcessInitialize(kwsysProcess* cp);
static int kwsysProcessCreate(kwsysProcess* cp, int index, static int kwsysProcessCreate(kwsysProcess* cp, int index,
kwsysProcessCreateInformation* si, kwsysProcessCreateInformation* si,
@ -108,30 +110,34 @@ static void kwsysProcessDisablePipeThreads(kwsysProcess* cp);
extern kwsysEXPORT int kwsysEncodedWriteArrayProcessFwd9x(const char* fname); extern kwsysEXPORT int kwsysEncodedWriteArrayProcessFwd9x(const char* fname);
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
/* A structure containing data for each pipe's thread. */ /* A structure containing synchronization data for each thread. */
typedef struct kwsysProcessPipeSync_s kwsysProcessPipeSync;
struct kwsysProcessPipeSync_s
{
/* Handle to the thread. */
HANDLE Thread;
/* Semaphore indicating to the thread that a process has started. */
HANDLE Ready;
/* Semaphore indicating to the thread that it should begin work. */
HANDLE Go;
/* Semaphore indicating thread has reset for another process. */
HANDLE Reset;
};
/*--------------------------------------------------------------------------*/
/* A structure containing data for each pipe's threads. */
struct kwsysProcessPipeData_s struct kwsysProcessPipeData_s
{ {
/* ------------- Data managed per instance of kwsysProcess ------------- */ /* ------------- Data managed per instance of kwsysProcess ------------- */
/* Handle for the thread for this pipe. */ /* Synchronization data for reading thread. */
HANDLE Thread; kwsysProcessPipeSync Reader;
HANDLE ThreadWake;
/* Semaphore indicating a process and pipe are available. */ /* Synchronization data for waking thread. */
HANDLE Ready; kwsysProcessPipeSync Waker;
/* Semaphore indicating when this thread's buffer is empty. */
HANDLE Empty;
/* Semaphore indicating a pipe thread has reset for another process. */
HANDLE Reset;
/* Semaphore indicating the wake thread should unblock. */
HANDLE Wake;
/* Semaphore indicating the wake thread has reset for another process. */
HANDLE ReadyWake;
HANDLE ResetWake;
/* Index of this pipe. */ /* Index of this pipe. */
int Index; int Index;
@ -428,53 +434,54 @@ kwsysProcess* kwsysProcess_New()
/* Give the thread a pointer back to the kwsysProcess instance. */ /* Give the thread a pointer back to the kwsysProcess instance. */
cp->Pipe[i].Process = cp; cp->Pipe[i].Process = cp;
/* The pipe is not yet ready to read. Initialize semaphore to 0. */ /* No process is yet running. Initialize semaphore to 0. */
if(!(cp->Pipe[i].Ready = CreateSemaphore(0, 0, 1, 0))) if(!(cp->Pipe[i].Reader.Ready = CreateSemaphore(0, 0, 1, 0)))
{ {
kwsysProcess_Delete(cp); kwsysProcess_Delete(cp);
return 0; return 0;
} }
/* The pipe is not yet reset. Initialize semaphore to 0. */ /* The pipe is not yet reset. Initialize semaphore to 0. */
if(!(cp->Pipe[i].Reset = CreateSemaphore(0, 0, 1, 0))) if(!(cp->Pipe[i].Reader.Reset = CreateSemaphore(0, 0, 1, 0)))
{ {
kwsysProcess_Delete(cp); kwsysProcess_Delete(cp);
return 0; return 0;
} }
/* The thread's buffer is initially empty. Initialize semaphore to 1. */ /* The thread's buffer is initially empty. Initialize semaphore to 1. */
if(!(cp->Pipe[i].Empty = CreateSemaphore(0, 1, 1, 0))) if(!(cp->Pipe[i].Reader.Go = CreateSemaphore(0, 1, 1, 0)))
{ {
kwsysProcess_Delete(cp); kwsysProcess_Delete(cp);
return 0; return 0;
} }
/* Create the thread. It will block immediately. The thread will /* Create the reading thread. It will block immediately. The
not make deeply nested calls, so we need only a small thread will not make deeply nested calls, so we need only a
stack. */ small stack. */
if(!(cp->Pipe[i].Thread = CreateThread(0, 1024, kwsysProcessPipeThread, if(!(cp->Pipe[i].Reader.Thread = CreateThread(0, 1024,
kwsysProcessPipeThreadRead,
&cp->Pipe[i], 0, &dummy))) &cp->Pipe[i], 0, &dummy)))
{ {
kwsysProcess_Delete(cp); kwsysProcess_Delete(cp);
return 0; return 0;
} }
/* The wake thread should block. Initialize semaphore to 0. */ /* No process is yet running. Initialize semaphore to 0. */
if(!(cp->Pipe[i].ReadyWake = CreateSemaphore(0, 0, 1, 0))) if(!(cp->Pipe[i].Waker.Ready = CreateSemaphore(0, 0, 1, 0)))
{ {
kwsysProcess_Delete(cp); kwsysProcess_Delete(cp);
return 0; return 0;
} }
/* The wake thread need not reset yet. Initialize semaphore to 0. */ /* The pipe is not yet reset. Initialize semaphore to 0. */
if(!(cp->Pipe[i].ResetWake = CreateSemaphore(0, 0, 1, 0))) if(!(cp->Pipe[i].Waker.Reset = CreateSemaphore(0, 0, 1, 0)))
{ {
kwsysProcess_Delete(cp); kwsysProcess_Delete(cp);
return 0; return 0;
} }
/* The pipe will not need to wake yet. Initialize semaphore to 0. */ /* The waker should not wake immediately. Initialize semaphore to 0. */
if(!(cp->Pipe[i].Wake = CreateSemaphore(0, 0, 1, 0))) if(!(cp->Pipe[i].Waker.Go = CreateSemaphore(0, 0, 1, 0)))
{ {
kwsysProcess_Delete(cp); kwsysProcess_Delete(cp);
return 0; return 0;
@ -483,7 +490,7 @@ kwsysProcess* kwsysProcess_New()
/* Create the waking thread. It will block immediately. The /* Create the waking thread. It will block immediately. The
thread will not make deeply nested calls, so we need only a thread will not make deeply nested calls, so we need only a
small stack. */ small stack. */
if(!(cp->Pipe[i].ThreadWake = CreateThread(0, 1024, if(!(cp->Pipe[i].Waker.Thread = CreateThread(0, 1024,
kwsysProcessPipeThreadWake, kwsysProcessPipeThreadWake,
&cp->Pipe[i], 0, &dummy))) &cp->Pipe[i], 0, &dummy)))
{ {
@ -525,39 +532,41 @@ void kwsysProcess_Delete(kwsysProcess* cp)
/* Terminate each of the threads. */ /* Terminate each of the threads. */
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{ {
if(cp->Pipe[i].Thread) /* Terminate this reading thread. */
if(cp->Pipe[i].Reader.Thread)
{ {
/* Signal the thread we are ready for it. It will terminate /* Signal the thread we are ready for it. It will terminate
immediately since Deleting is set. */ immediately since Deleting is set. */
ReleaseSemaphore(cp->Pipe[i].Ready, 1, 0); ReleaseSemaphore(cp->Pipe[i].Reader.Ready, 1, 0);
/* Wait for the thread to exit. */ /* Wait for the thread to exit. */
WaitForSingleObject(cp->Pipe[i].Thread, INFINITE); WaitForSingleObject(cp->Pipe[i].Reader.Thread, INFINITE);
/* Close the handle to the thread. */ /* Close the handle to the thread. */
kwsysProcessCleanupHandle(&cp->Pipe[i].Thread); kwsysProcessCleanupHandle(&cp->Pipe[i].Reader.Thread);
} }
if(cp->Pipe[i].ThreadWake) /* Terminate this waking thread. */
if(cp->Pipe[i].Waker.Thread)
{ {
/* Signal the thread we are ready for it. It will terminate /* Signal the thread we are ready for it. It will terminate
immediately since Deleting is set. */ immediately since Deleting is set. */
ReleaseSemaphore(cp->Pipe[i].ReadyWake, 1, 0); ReleaseSemaphore(cp->Pipe[i].Waker.Ready, 1, 0);
/* Wait for the thread to exit. */ /* Wait for the thread to exit. */
WaitForSingleObject(cp->Pipe[i].ThreadWake, INFINITE); WaitForSingleObject(cp->Pipe[i].Waker.Thread, INFINITE);
/* Close the handle to the thread. */ /* Close the handle to the thread. */
kwsysProcessCleanupHandle(&cp->Pipe[i].ThreadWake); kwsysProcessCleanupHandle(&cp->Pipe[i].Waker.Thread);
} }
/* Cleanup the pipe's semaphores. */ /* Cleanup the pipe's semaphores. */
kwsysProcessCleanupHandle(&cp->Pipe[i].Reset); kwsysProcessCleanupHandle(&cp->Pipe[i].Reader.Ready);
kwsysProcessCleanupHandle(&cp->Pipe[i].Ready); kwsysProcessCleanupHandle(&cp->Pipe[i].Reader.Go);
kwsysProcessCleanupHandle(&cp->Pipe[i].Empty); kwsysProcessCleanupHandle(&cp->Pipe[i].Reader.Reset);
kwsysProcessCleanupHandle(&cp->Pipe[i].ReadyWake); kwsysProcessCleanupHandle(&cp->Pipe[i].Waker.Ready);
kwsysProcessCleanupHandle(&cp->Pipe[i].ResetWake); kwsysProcessCleanupHandle(&cp->Pipe[i].Waker.Go);
kwsysProcessCleanupHandle(&cp->Pipe[i].Wake); kwsysProcessCleanupHandle(&cp->Pipe[i].Waker.Reset);
} }
/* Close the shared semaphores. */ /* Close the shared semaphores. */
@ -1194,8 +1203,8 @@ void kwsysProcess_Execute(kwsysProcess* cp)
/* Tell the pipe threads that a process has started. */ /* Tell the pipe threads that a process has started. */
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{ {
ReleaseSemaphore(cp->Pipe[i].Ready, 1, 0); ReleaseSemaphore(cp->Pipe[i].Reader.Ready, 1, 0);
ReleaseSemaphore(cp->Pipe[i].ReadyWake, 1, 0); ReleaseSemaphore(cp->Pipe[i].Waker.Ready, 1, 0);
} }
/* We don't care about the children's main threads. */ /* We don't care about the children's main threads. */
@ -1231,8 +1240,8 @@ void kwsysProcess_Disown(kwsysProcess* cp)
/* Wait for all pipe threads to reset. */ /* Wait for all pipe threads to reset. */
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{ {
WaitForSingleObject(cp->Pipe[i].Reset, INFINITE); WaitForSingleObject(cp->Pipe[i].Reader.Reset, INFINITE);
WaitForSingleObject(cp->Pipe[i].ResetWake, INFINITE); WaitForSingleObject(cp->Pipe[i].Waker.Reset, INFINITE);
} }
/* We will not wait for exit, so cleanup now. */ /* We will not wait for exit, so cleanup now. */
@ -1278,7 +1287,7 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, char** data, int* length,
done with the data. */ done with the data. */
if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT) if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT)
{ {
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Empty, 1, 0); ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0);
cp->CurrentIndex = KWSYSPE_PIPE_COUNT; cp->CurrentIndex = KWSYSPE_PIPE_COUNT;
} }
@ -1317,8 +1326,10 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, char** data, int* length,
/* Data are available or a pipe closed. */ /* Data are available or a pipe closed. */
if(cp->Pipe[cp->CurrentIndex].Closed) if(cp->Pipe[cp->CurrentIndex].Closed)
{ {
/* The pipe closed. */ /* The pipe closed at the write end. Close the read end and
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Wake, 1, 0); inform the wakeup thread it is done with this process. */
kwsysProcessCleanupHandle(&cp->Pipe[cp->CurrentIndex].Read);
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Waker.Go, 1, 0);
--cp->PipesLeft; --cp->PipesLeft;
} }
else if(data && length) else if(data && length)
@ -1413,15 +1424,15 @@ int kwsysProcess_WaitForExit(kwsysProcess* cp, double* userTimeout)
without releaseing the pipe's thread. Release it now. */ without releaseing the pipe's thread. Release it now. */
if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT) if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT)
{ {
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Empty, 1, 0); ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0);
cp->CurrentIndex = KWSYSPE_PIPE_COUNT; cp->CurrentIndex = KWSYSPE_PIPE_COUNT;
} }
/* Wait for all pipe threads to reset. */ /* Wait for all pipe threads to reset. */
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{ {
WaitForSingleObject(cp->Pipe[i].Reset, INFINITE); WaitForSingleObject(cp->Pipe[i].Reader.Reset, INFINITE);
WaitForSingleObject(cp->Pipe[i].ResetWake, INFINITE); WaitForSingleObject(cp->Pipe[i].Waker.Reset, INFINITE);
} }
/* ---- It is now safe again to call kwsysProcessCleanup. ----- */ /* ---- It is now safe again to call kwsysProcessCleanup. ----- */
@ -1503,23 +1514,19 @@ void kwsysProcess_Kill(kwsysProcess* cp)
Function executed for each pipe's thread. Argument is a pointer to Function executed for each pipe's thread. Argument is a pointer to
the kwsysProcessPipeData instance for this thread. the kwsysProcessPipeData instance for this thread.
*/ */
DWORD WINAPI kwsysProcessPipeThread(LPVOID ptd) DWORD WINAPI kwsysProcessPipeThreadRead(LPVOID ptd)
{ {
kwsysProcessPipeData* td = (kwsysProcessPipeData*)ptd; kwsysProcessPipeData* td = (kwsysProcessPipeData*)ptd;
kwsysProcess* cp = td->Process; kwsysProcess* cp = td->Process;
/* Wait for a process to be ready. */ /* Wait for a process to be ready. */
while((WaitForSingleObject(td->Ready, INFINITE), !cp->Deleting)) while((WaitForSingleObject(td->Reader.Ready, INFINITE), !cp->Deleting))
{ {
/* Read output from the process for this thread's pipe. */ /* Read output from the process for this thread's pipe. */
kwsysProcessPipeThreadReadPipe(cp, td); kwsysProcessPipeThreadReadPipe(cp, td);
/* We were signalled to exit with our buffer empty. Reset the
mutex for a new process. */
ReleaseSemaphore(td->Empty, 1, 0);
/* Signal the main thread we have reset for a new process. */ /* Signal the main thread we have reset for a new process. */
ReleaseSemaphore(td->Reset, 1, 0); ReleaseSemaphore(td->Reader.Reset, 1, 0);
} }
return 0; return 0;
} }
@ -1533,7 +1540,7 @@ DWORD WINAPI kwsysProcessPipeThread(LPVOID ptd)
void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td) void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td)
{ {
/* Wait for space in the thread's buffer. */ /* Wait for space in the thread's buffer. */
while((WaitForSingleObject(td->Empty, INFINITE), !td->Closed)) while((WaitForSingleObject(td->Reader.Go, INFINITE), !td->Closed))
{ {
/* Read data from the pipe. This may block until data are available. */ /* Read data from the pipe. This may block until data are available. */
if(!ReadFile(td->Read, td->DataBuffer, KWSYSPE_PIPE_BUFFER_SIZE, if(!ReadFile(td->Read, td->DataBuffer, KWSYSPE_PIPE_BUFFER_SIZE,
@ -1555,6 +1562,10 @@ void kwsysProcessPipeThreadReadPipe(kwsysProcess* cp, kwsysProcessPipeData* td)
cp->SharedIndex = td->Index; cp->SharedIndex = td->Index;
ReleaseSemaphore(cp->Full, 1, 0); ReleaseSemaphore(cp->Full, 1, 0);
} }
/* We were signalled to exit with our buffer empty. Reset the
mutex for a new process. */
ReleaseSemaphore(td->Reader.Go, 1, 0);
} }
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
@ -1569,29 +1580,34 @@ DWORD WINAPI kwsysProcessPipeThreadWake(LPVOID ptd)
kwsysProcess* cp = td->Process; kwsysProcess* cp = td->Process;
/* Wait for a process to be ready. */ /* Wait for a process to be ready. */
while((WaitForSingleObject(td->ReadyWake, INFINITE), !cp->Deleting)) while((WaitForSingleObject(td->Waker.Ready, INFINITE), !cp->Deleting))
{ {
/* Wait for a possible wakeup. */ /* Wait for a possible wakeup. */
kwsysProcessPipeThreadWakePipe(cp, td); kwsysProcessPipeThreadWakePipe(cp, td);
/* Signal the main thread we have reset for a new process. */ /* Signal the main thread we have reset for a new process. */
ReleaseSemaphore(td->ResetWake, 1, 0); ReleaseSemaphore(td->Waker.Reset, 1, 0);
} }
return 0; return 0;
} }
/*--------------------------------------------------------------------------*/
/*
Function called in each pipe's thread to handle reading thread
wakeup for one execution of a subprocess.
*/
void kwsysProcessPipeThreadWakePipe(kwsysProcess* cp, kwsysProcessPipeData* td) void kwsysProcessPipeThreadWakePipe(kwsysProcess* cp, kwsysProcessPipeData* td)
{ {
/* Wait for a possible wake command. */ /* Wait for a possible wake command. */
if((WaitForSingleObject(td->Wake, INFINITE), !td->Closed)) WaitForSingleObject(td->Waker.Go, INFINITE);
/* If the pipe is not closed, we need to wake up the reading thread. */
if(!td->Closed)
{ {
/* The pipe is not closed. We need to wake up the reading thread. */
DWORD dummy; DWORD dummy;
WriteFile(td->Write, "", 1, &dummy, 0); WriteFile(td->Write, "", 1, &dummy, 0);
} }
/* We have processed the wake command. */
ReleaseSemaphore(td->Wake, 1, 0);
} }
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
@ -2666,11 +2682,11 @@ static void kwsysProcessDisablePipeThreads(kwsysProcess* cp)
/* If data were just reported data, release the pipe's thread. */ /* If data were just reported data, release the pipe's thread. */
if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT) if(cp->CurrentIndex < KWSYSPE_PIPE_COUNT)
{ {
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Empty, 1, 0); ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0);
cp->CurrentIndex = KWSYSPE_PIPE_COUNT; cp->CurrentIndex = KWSYSPE_PIPE_COUNT;
} }
/* Signal all the pipe wakeup threads. */ /* Wakeup all reading threads that are not on closed pipes. */
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i) for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{ {
/* The wakeup threads will write one byte to the pipe write ends. /* The wakeup threads will write one byte to the pipe write ends.
@ -2684,18 +2700,28 @@ static void kwsysProcessDisablePipeThreads(kwsysProcess* cp)
thread to call WriteFile. If it blocks, that is okay because thread to call WriteFile. If it blocks, that is okay because
it will unblock when we close the read end and break the pipe it will unblock when we close the read end and break the pipe
below. */ below. */
ReleaseSemaphore(cp->Pipe[i].Wake, 1, 0); if(cp->Pipe[i].Read)
{
ReleaseSemaphore(cp->Pipe[i].Waker.Go, 1, 0);
}
} }
/* Tell pipe threads to reset until we run another process. */ /* Tell pipe threads to reset until we run another process. */
while(cp->PipesLeft > 0) while(cp->PipesLeft > 0)
{ {
/* The waking threads will cause all reading threads to report.
Wait for the next one and save its index. */
WaitForSingleObject(cp->Full, INFINITE); WaitForSingleObject(cp->Full, INFINITE);
cp->CurrentIndex = cp->SharedIndex; cp->CurrentIndex = cp->SharedIndex;
ReleaseSemaphore(cp->SharedIndexMutex, 1, 0); ReleaseSemaphore(cp->SharedIndexMutex, 1, 0);
kwsysProcessCleanupHandle(&cp->Pipe[cp->CurrentIndex].Read);
/* We are done reading this pipe. Close its read handle. */
cp->Pipe[cp->CurrentIndex].Closed = 1; cp->Pipe[cp->CurrentIndex].Closed = 1;
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Empty, 1, 0); kwsysProcessCleanupHandle(&cp->Pipe[cp->CurrentIndex].Read);
--cp->PipesLeft; --cp->PipesLeft;
/* Tell the reading thread we are done with the data. It will
reset immediately because the pipe is closed. */
ReleaseSemaphore(cp->Pipe[cp->CurrentIndex].Reader.Go, 1, 0);
} }
} }