2131 lines
61 KiB
C
2131 lines
61 KiB
C
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */
|
|
|
|
#include <assert.h>
|
|
#include <io.h>
|
|
#include <string.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
|
|
#include "uv.h"
|
|
#include "internal.h"
|
|
#include "handle-inl.h"
|
|
#include "stream-inl.h"
|
|
#include "req-inl.h"
|
|
|
|
typedef struct uv__ipc_queue_item_s uv__ipc_queue_item_t;
|
|
|
|
struct uv__ipc_queue_item_s {
|
|
/*
|
|
* NOTE: It is important for socket_info_ex to be the first field,
|
|
* because we will we assigning it to the pending_ipc_info.socket_info
|
|
*/
|
|
uv__ipc_socket_info_ex socket_info_ex;
|
|
QUEUE member;
|
|
int tcp_connection;
|
|
};
|
|
|
|
/* A zero-size buffer for use by uv_pipe_read */
|
|
static char uv_zero_[] = "";
|
|
|
|
/* Null uv_buf_t */
|
|
static const uv_buf_t uv_null_buf_ = { 0, NULL };
|
|
|
|
/* The timeout that the pipe will wait for the remote end to write data */
|
|
/* when the local ends wants to shut it down. */
|
|
static const int64_t eof_timeout = 50; /* ms */
|
|
|
|
static const int default_pending_pipe_instances = 4;
|
|
|
|
/* Pipe prefix */
|
|
static char pipe_prefix[] = "\\\\?\\pipe";
|
|
static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
|
|
|
|
/* IPC protocol flags. */
|
|
#define UV_IPC_RAW_DATA 0x0001
|
|
#define UV_IPC_TCP_SERVER 0x0002
|
|
#define UV_IPC_TCP_CONNECTION 0x0004
|
|
|
|
/* IPC frame header. */
|
|
typedef struct {
|
|
int flags;
|
|
uint64_t raw_data_length;
|
|
} uv_ipc_frame_header_t;
|
|
|
|
/* IPC frame, which contains an imported TCP socket stream. */
|
|
typedef struct {
|
|
uv_ipc_frame_header_t header;
|
|
uv__ipc_socket_info_ex socket_info_ex;
|
|
} uv_ipc_frame_uv_stream;
|
|
|
|
static void eof_timer_init(uv_pipe_t* pipe);
|
|
static void eof_timer_start(uv_pipe_t* pipe);
|
|
static void eof_timer_stop(uv_pipe_t* pipe);
|
|
static void eof_timer_cb(uv_timer_t* timer);
|
|
static void eof_timer_destroy(uv_pipe_t* pipe);
|
|
static void eof_timer_close_cb(uv_handle_t* handle);
|
|
|
|
|
|
/*
 * Formats a pipe name into `name`, writing at most `size` bytes. The name is
 * built from the value of `ptr` and the id of the current process.
 */
static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
  /* %p requires a void* and %lu an unsigned long; cast explicitly so the
   * arguments match their conversion specifiers. */
  snprintf(name,
           size,
           "\\\\?\\pipe\\uv\\%p-%lu",
           (void*) ptr,
           (unsigned long) GetCurrentProcessId());
}
|
|
|
|
|
|
/*
 * Initializes `handle` as a named pipe stream on `loop`. The `ipc` argument
 * is stored on the handle unchanged. Always returns 0.
 */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv_stream_init(loop, (uv_stream_t*) handle, UV_NAMED_PIPE);

  /* No OS handle, no name and no requests in flight yet. */
  handle->handle = INVALID_HANDLE_VALUE;
  handle->name = NULL;
  handle->reqs_pending = 0;
  handle->ipc = ipc;

  /* Connection state. */
  handle->pipe.conn.ipc_pid = 0;
  handle->pipe.conn.remaining_ipc_rawdata_bytes = 0;
  handle->pipe.conn.pending_ipc_info.queue_len = 0;
  QUEUE_INIT(&handle->pipe.conn.pending_ipc_info.queue);
  handle->pipe.conn.non_overlapped_writes_tail = NULL;
  handle->pipe.conn.readfile_thread = NULL;

  uv_req_init(loop, (uv_req_t*) &handle->pipe.conn.ipc_header_write_req);

  return 0;
}
|
|
|
|
|
|
/*
 * Sets up the connection-side state of a pipe handle. The handle must not be
 * a pipe server. When CancelSynchronousIo is available and the pipe is
 * non-overlapped, the readfile mutex is created and the handle is flagged
 * UV_HANDLE_PIPE_READ_CANCELABLE.
 */
static void uv_pipe_connection_init(uv_pipe_t* handle) {
  int cancelable;

  uv_connection_init((uv_stream_t*) handle);
  handle->read_req.data = handle;
  handle->pipe.conn.eof_timer = NULL;
  assert(!(handle->flags & UV_HANDLE_PIPESERVER));

  cancelable = pCancelSynchronousIo != NULL &&
               (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) != 0;
  if (!cancelable)
    return;

  uv_mutex_init(&handle->pipe.conn.readfile_mutex);
  handle->flags |= UV_HANDLE_PIPE_READ_CANCELABLE;
}
|
|
|
|
|
|
/*
 * Opens the existing named pipe `name` for overlapped I/O.
 *
 * A duplex open is tried first. If an open fails with ERROR_ACCESS_DENIED
 * the next, narrower access mode is tried: read-only, then write-only. On
 * success the matching UV_HANDLE_READABLE / UV_HANDLE_WRITABLE bits are
 * stored in `*duplex_flags` and the handle is returned. On failure
 * INVALID_HANDLE_VALUE is returned and `*duplex_flags` is left untouched.
 */
static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
  const DWORD desired_access[3] = {
    GENERIC_READ | GENERIC_WRITE,
    GENERIC_READ | FILE_WRITE_ATTRIBUTES,
    GENERIC_WRITE | FILE_READ_ATTRIBUTES
  };
  const DWORD granted_flags[3] = {
    UV_HANDLE_READABLE | UV_HANDLE_WRITABLE,
    UV_HANDLE_READABLE,
    UV_HANDLE_WRITABLE
  };
  HANDLE pipe;
  int attempt;

  for (attempt = 0; attempt < 3; attempt++) {
    pipe = CreateFileW(name,
                       desired_access[attempt],
                       0,
                       NULL,
                       OPEN_EXISTING,
                       FILE_FLAG_OVERLAPPED,
                       NULL);
    if (pipe != INVALID_HANDLE_VALUE) {
      *duplex_flags = granted_flags[attempt];
      return pipe;
    }

    /* Only an access-denied failure justifies retrying with less access. */
    if (GetLastError() != ERROR_ACCESS_DENIED)
      break;
  }

  return INVALID_HANDLE_VALUE;
}
|
|
|
|
|
|
/*
 * Closes the pipe: through close() on the file descriptor when one is
 * attached (u.fd != -1), otherwise through CloseHandle() on the OS handle.
 * Both fields are reset afterwards.
 */
static void close_pipe(uv_pipe_t* pipe) {
  assert(pipe->u.fd == -1 || pipe->u.fd > 2);

  if (pipe->u.fd != -1)
    close(pipe->u.fd);
  else
    CloseHandle(pipe->handle);

  pipe->handle = INVALID_HANDLE_VALUE;
  pipe->u.fd = -1;
}
|
|
|
|
|
|
/*
 * Creates a single-instance, overlapped named pipe with a freshly generated
 * name, associates it with the loop's completion port and installs it on
 * `handle` as a connection.
 *
 * The generated name is written to `name` (capacity `nameSize`). `access` is
 * OR-ed into the open mode passed to CreateNamedPipeA.
 *
 * Returns 0 on success, otherwise the Windows error code of the failing call.
 */
int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
    char* name, size_t nameSize) {
  HANDLE pipeHandle;
  int err;
  char* seed;

  /* Generate candidate names until one does not collide. */
  seed = (char*) handle;
  for (;;) {
    uv_unique_pipe_name(seed, name, nameSize);

    pipeHandle = CreateNamedPipeA(name,
      access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
      PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
      NULL);
    if (pipeHandle != INVALID_HANDLE_VALUE)
      break;

    err = GetLastError();
    if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
      /* Not a name collision; there is no handle to clean up. */
      return err;
    }

    /* Name collision: bump the seed pointer and try again. */
    seed++;
  }

  if (CreateIoCompletionPort(pipeHandle,
                             loop->iocp,
                             (ULONG_PTR) handle,
                             0) == NULL) {
    err = GetLastError();
    CloseHandle(pipeHandle);
    return err;
  }

  uv_pipe_connection_init(handle);
  handle->handle = pipeHandle;

  return 0;
}
|
|
|
|
|
|
static int uv_set_pipe_handle(uv_loop_t* loop,
|
|
uv_pipe_t* handle,
|
|
HANDLE pipeHandle,
|
|
int fd,
|
|
DWORD duplex_flags) {
|
|
NTSTATUS nt_status;
|
|
IO_STATUS_BLOCK io_status;
|
|
FILE_MODE_INFORMATION mode_info;
|
|
DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
|
|
DWORD current_mode = 0;
|
|
DWORD err = 0;
|
|
|
|
if (!(handle->flags & UV_HANDLE_PIPESERVER) &&
|
|
handle->handle != INVALID_HANDLE_VALUE)
|
|
return UV_EBUSY;
|
|
|
|
if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
|
|
err = GetLastError();
|
|
if (err == ERROR_ACCESS_DENIED) {
|
|
/*
|
|
* SetNamedPipeHandleState can fail if the handle doesn't have either
|
|
* GENERIC_WRITE or FILE_WRITE_ATTRIBUTES.
|
|
* But if the handle already has the desired wait and blocking modes
|
|
* we can continue.
|
|
*/
|
|
if (!GetNamedPipeHandleState(pipeHandle, ¤t_mode, NULL, NULL,
|
|
NULL, NULL, 0)) {
|
|
return -1;
|
|
} else if (current_mode & PIPE_NOWAIT) {
|
|
SetLastError(ERROR_ACCESS_DENIED);
|
|
return -1;
|
|
}
|
|
} else {
|
|
/* If this returns ERROR_INVALID_PARAMETER we probably opened
|
|
* something that is not a pipe. */
|
|
if (err == ERROR_INVALID_PARAMETER) {
|
|
SetLastError(WSAENOTSOCK);
|
|
}
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
/* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
|
|
nt_status = pNtQueryInformationFile(pipeHandle,
|
|
&io_status,
|
|
&mode_info,
|
|
sizeof(mode_info),
|
|
FileModeInformation);
|
|
if (nt_status != STATUS_SUCCESS) {
|
|
return -1;
|
|
}
|
|
|
|
if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
|
|
mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
|
|
/* Non-overlapped pipe. */
|
|
handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
|
|
} else {
|
|
/* Overlapped pipe. Try to associate with IOCP. */
|
|
if (CreateIoCompletionPort(pipeHandle,
|
|
loop->iocp,
|
|
(ULONG_PTR)handle,
|
|
0) == NULL) {
|
|
handle->flags |= UV_HANDLE_EMULATE_IOCP;
|
|
}
|
|
}
|
|
|
|
handle->handle = pipeHandle;
|
|
handle->u.fd = fd;
|
|
handle->flags |= duplex_flags;
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/*
 * Work item that flushes the pipe's buffers and then posts a completion for
 * the shutdown request. `parameter` is the uv_shutdown_t being serviced.
 */
static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
  uv_shutdown_t* req = (uv_shutdown_t*) parameter;
  uv_pipe_t* handle;
  uv_loop_t* loop;

  assert(req);
  handle = (uv_pipe_t*) req->handle;
  assert(handle);
  loop = handle->loop;
  assert(loop);

  FlushFileBuffers(handle->handle);

  /* Hand the request back to the loop. */
  POST_COMPLETION_FOR_REQ(loop, req);

  return 0;
}
|
|
|
|
|
|
/*
 * Runs the deferred shutdown and close work for a pipe handle.
 *
 * If a shutdown request is waiting on a connection with no writes pending,
 * it is processed and the function returns: the request is cancelled when
 * the handle is closing, completed right away when the outbound buffer is
 * already empty, and otherwise handed to a work item that flushes the pipe.
 *
 * If no shutdown was processed and the handle is closing with no requests
 * pending, the remaining resources are released (queued IPC sockets, the
 * emulated-IOCP wait and event, the server's accept requests) and the handle
 * is closed.
 */
void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
  int err;
  DWORD queued;
  uv_shutdown_t* req;
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_PIPE_LOCAL_INFORMATION pipe_info;
  uv__ipc_queue_item_t* item;
  QUEUE* q;
  SOCKET sock;

  if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
    handle->flags &= ~UV_HANDLE_PIPE_READ_CANCELABLE;
    uv_mutex_destroy(&handle->pipe.conn.readfile_mutex);
  }

  if ((handle->flags & UV_HANDLE_CONNECTION) &&
      handle->stream.conn.shutdown_req != NULL &&
      handle->stream.conn.write_reqs_pending == 0) {
    /* Take the request and clear the field so we don't get here again. */
    req = handle->stream.conn.shutdown_req;
    handle->stream.conn.shutdown_req = NULL;

    if (handle->flags & UV__HANDLE_CLOSING) {
      /* The handle is already closing: cancel the shutdown. */
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      if (req->cb) {
        req->cb(req, UV_ECANCELED);
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }

    /* Query the pipe so the thread-pool flush can be skipped if possible. */
    nt_status = pNtQueryInformationFile(handle->handle,
                                        &io_status,
                                        &pipe_info,
                                        sizeof pipe_info,
                                        FilePipeLocalInformation);

    if (nt_status != STATUS_SUCCESS) {
      /* The query failed; report the error to the shutdown callback. */
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
      if (req->cb) {
        err = pRtlNtStatusToDosError(nt_status);
        req->cb(req, uv_translate_sys_error(err));
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }

    if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
      /* Nothing is buffered, so FlushFileBuffers is not needed. */
      uv_insert_pending_req(loop, (uv_req_t*) req);
      return;
    }

    /* Flush in the thread pool. */
    queued = QueueUserWorkItem(pipe_shutdown_thread_proc,
                               req,
                               WT_EXECUTELONGFUNCTION);
    if (queued) {
      return;
    }

    /* Could not queue the work item. */
    UNREGISTER_HANDLE_REQ(loop, handle, req);

    handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
    if (req->cb) {
      err = GetLastError();
      req->cb(req, uv_translate_sys_error(err));
    }

    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  if (!((handle->flags & UV__HANDLE_CLOSING) && handle->reqs_pending == 0)) {
    return;
  }

  assert(!(handle->flags & UV_HANDLE_CLOSED));

  if (handle->flags & UV_HANDLE_CONNECTION) {
    /* Free the sockets that were received but never accepted. */
    while (!QUEUE_EMPTY(&handle->pipe.conn.pending_ipc_info.queue)) {
      q = QUEUE_HEAD(&handle->pipe.conn.pending_ipc_info.queue);
      QUEUE_REMOVE(q);
      item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);

      /* Materialize the socket so that it can be closed. */
      sock = WSASocketW(FROM_PROTOCOL_INFO,
                        FROM_PROTOCOL_INFO,
                        FROM_PROTOCOL_INFO,
                        &item->socket_info_ex.socket_info,
                        0,
                        WSA_FLAG_OVERLAPPED);
      uv__free(item);

      if (sock != INVALID_SOCKET)
        closesocket(sock);
    }
    handle->pipe.conn.pending_ipc_info.queue_len = 0;

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
        UnregisterWait(handle->read_req.wait_handle);
        handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
      }
      if (handle->read_req.event_handle) {
        CloseHandle(handle->read_req.event_handle);
        handle->read_req.event_handle = NULL;
      }
    }
  }

  if (handle->flags & UV_HANDLE_PIPESERVER) {
    assert(handle->pipe.serv.accept_reqs);
    uv__free(handle->pipe.serv.accept_reqs);
    handle->pipe.serv.accept_reqs = NULL;
  }

  uv__handle_close(handle);
}
|
|
|
|
|
|
/*
 * Records how many pending pipe instances the server should use and marks
 * the handle as a pipe server. Has no effect once the handle is bound.
 */
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
  if (!(handle->flags & UV_HANDLE_BOUND)) {
    handle->pipe.serv.pending_instances = count;
    handle->flags |= UV_HANDLE_PIPESERVER;
  }
}
|
|
|
|
|
|
/* Creates a pipe server. */
|
|
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
|
|
uv_loop_t* loop = handle->loop;
|
|
int i, err, nameSize;
|
|
uv_pipe_accept_t* req;
|
|
|
|
if (handle->flags & UV_HANDLE_BOUND) {
|
|
return UV_EINVAL;
|
|
}
|
|
|
|
if (!name) {
|
|
return UV_EINVAL;
|
|
}
|
|
|
|
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
|
|
handle->pipe.serv.pending_instances = default_pending_pipe_instances;
|
|
}
|
|
|
|
handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
|
|
uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
|
|
if (!handle->pipe.serv.accept_reqs) {
|
|
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
|
|
}
|
|
|
|
for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
|
|
req = &handle->pipe.serv.accept_reqs[i];
|
|
uv_req_init(loop, (uv_req_t*) req);
|
|
req->type = UV_ACCEPT;
|
|
req->data = handle;
|
|
req->pipeHandle = INVALID_HANDLE_VALUE;
|
|
req->next_pending = NULL;
|
|
}
|
|
|
|
/* Convert name to UTF16. */
|
|
nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
|
|
handle->name = (WCHAR*)uv__malloc(nameSize);
|
|
if (!handle->name) {
|
|
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
|
|
}
|
|
|
|
if (!MultiByteToWideChar(CP_UTF8,
|
|
0,
|
|
name,
|
|
-1,
|
|
handle->name,
|
|
nameSize / sizeof(WCHAR))) {
|
|
err = GetLastError();
|
|
goto error;
|
|
}
|
|
|
|
/*
|
|
* Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
|
|
* If this fails then there's already a pipe server for the given pipe name.
|
|
*/
|
|
handle->pipe.serv.accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
|
|
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
|
|
FILE_FLAG_FIRST_PIPE_INSTANCE,
|
|
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
|
|
PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
|
|
|
|
if (handle->pipe.serv.accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
|
|
err = GetLastError();
|
|
if (err == ERROR_ACCESS_DENIED) {
|
|
err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */
|
|
} else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
|
|
err = WSAEACCES; /* Translates to UV_EACCES. */
|
|
}
|
|
goto error;
|
|
}
|
|
|
|
if (uv_set_pipe_handle(loop,
|
|
handle,
|
|
handle->pipe.serv.accept_reqs[0].pipeHandle,
|
|
-1,
|
|
0)) {
|
|
err = GetLastError();
|
|
goto error;
|
|
}
|
|
|
|
handle->pipe.serv.pending_accepts = NULL;
|
|
handle->flags |= UV_HANDLE_PIPESERVER;
|
|
handle->flags |= UV_HANDLE_BOUND;
|
|
|
|
return 0;
|
|
|
|
error:
|
|
if (handle->name) {
|
|
uv__free(handle->name);
|
|
handle->name = NULL;
|
|
}
|
|
|
|
if (handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
|
|
CloseHandle(handle->pipe.serv.accept_reqs[0].pipeHandle);
|
|
handle->pipe.serv.accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
|
|
}
|
|
|
|
return uv_translate_sys_error(err);
|
|
}
|
|
|
|
|
|
/*
 * Work item used by uv_pipe_connect() when the pipe was busy. It waits for a
 * pipe instance to become available, opens it, installs it on the handle and
 * posts a completion for the connect request carrying success or the error.
 * `parameter` is the uv_connect_t being serviced.
 */
static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
  uv_loop_t* loop;
  uv_pipe_t* handle;
  uv_connect_t* req;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  DWORD duplex_flags;
  DWORD err;

  req = (uv_connect_t*) parameter;
  assert(req);
  handle = (uv_pipe_t*) req->handle;
  assert(handle);
  loop = handle->loop;
  assert(loop);

  /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
  /* We wait for the pipe to become available with WaitNamedPipe. */
  while (WaitNamedPipeW(handle->name, 30000)) {
    /* The pipe is now available, try to connect. */
    pipeHandle = open_named_pipe(handle->name, &duplex_flags);
    if (pipeHandle != INVALID_HANDLE_VALUE) {
      break;
    }

    SwitchToThread();
  }

  if (pipeHandle != INVALID_HANDLE_VALUE &&
      !uv_set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags)) {
    SET_REQ_SUCCESS(req);
  } else {
    /* Read the error first: CloseHandle() below may overwrite it. */
    err = GetLastError();

    /* The handle was opened but could not be installed; don't leak it. */
    if (pipeHandle != INVALID_HANDLE_VALUE) {
      CloseHandle(pipeHandle);
    }

    SET_REQ_ERROR(req, err);
  }

  /* Post completed */
  POST_COMPLETION_FOR_REQ(loop, req);

  return 0;
}
|
|
|
|
|
|
/*
 * Starts connecting `handle` to the named pipe `name` (UTF-8).
 *
 * The outcome is always delivered through `req`: on immediate success or
 * failure the request is inserted as pending with the result recorded on it;
 * when the pipe is busy, the wait is delegated to pipe_connect_thread_proc.
 */
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
    const char* name, uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  int err, nameSize;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  DWORD duplex_flags;

  uv_req_init(loop, (uv_req_t*) req);
  req->type = UV_CONNECT;
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;

  /* Convert the name to UTF-16: measure first, then convert. */
  nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
  handle->name = (WCHAR*) uv__malloc(nameSize);
  if (!handle->name)
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");

  if (!MultiByteToWideChar(CP_UTF8,
                           0,
                           name,
                           -1,
                           handle->name,
                           nameSize / sizeof(WCHAR))) {
    err = GetLastError();
    goto error;
  }

  pipeHandle = open_named_pipe(handle->name, &duplex_flags);
  if (pipeHandle == INVALID_HANDLE_VALUE) {
    err = GetLastError();
    if (err != ERROR_PIPE_BUSY)
      goto error;

    /* Busy: wait in the thread pool for a pipe instance to free up. */
    if (!QueueUserWorkItem(&pipe_connect_thread_proc,
                           req,
                           WT_EXECUTELONGFUNCTION)) {
      err = GetLastError();
      goto error;
    }

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;

    return;
  }

  assert(pipeHandle != INVALID_HANDLE_VALUE);

  if (uv_set_pipe_handle(loop,
                         (uv_pipe_t*) req->handle,
                         pipeHandle,
                         -1,
                         duplex_flags)) {
    err = GetLastError();
    goto error;
  }

  SET_REQ_SUCCESS(req);
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return;

error:
  if (handle->name) {
    uv__free(handle->name);
    handle->name = NULL;
  }

  if (pipeHandle != INVALID_HANDLE_VALUE)
    CloseHandle(pipeHandle);

  /* Queue the request as pending so that the error gets reported. */
  SET_REQ_ERROR(req, err);
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return;
}
|
|
|
|
|
|
/*
 * Pauses the ReadFile task briefly, to work around the Windows kernel bug
 * that causes any access to a NamedPipe to deadlock if any process has
 * called ReadFile.
 *
 * Only acts on handles flagged UV_HANDLE_PIPE_READ_CANCELABLE. The readfile
 * mutex is locked here and is still held when this function returns;
 * uv__pipe_unpause_read() releases it.
 */
void uv__pipe_pause_read(uv_pipe_t* handle) {
  HANDLE reader;

  if (!(handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE))
    return;

  uv_mutex_lock(&handle->pipe.conn.readfile_mutex);

  /* Spin: we expect this to finish quickly, or we are probably about to
   * deadlock anyways (in the kernel), so it doesn't matter. */
  for (reader = handle->pipe.conn.readfile_thread;
       reader != NULL;
       reader = handle->pipe.conn.readfile_thread) {
    pCancelSynchronousIo(reader);
    SwitchToThread(); /* yield thread control briefly */
  }
}
|
|
|
|
|
|
/*
 * Counterpart of uv__pipe_pause_read(): unlocks the readfile mutex on
 * handles flagged UV_HANDLE_PIPE_READ_CANCELABLE.
 */
void uv__pipe_unpause_read(uv_pipe_t* handle) {
  if (!(handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE))
    return;

  uv_mutex_unlock(&handle->pipe.conn.readfile_mutex);
}
|
|
|
|
|
|
/*
 * Clears UV_HANDLE_READING and then runs one pause/unpause cycle on the
 * handle.
 */
void uv__pipe_stop_read(uv_pipe_t* handle) {
  handle->flags &= ~UV_HANDLE_READING;
  uv__pipe_pause_read(handle);
  uv__pipe_unpause_read(handle);
}
|
|
|
|
|
|
/* Cleans up uv_pipe_t (server or connection) and all resources associated */
|
|
/* with it. */
|
|
void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
|
|
int i;
|
|
HANDLE pipeHandle;
|
|
|
|
uv__pipe_stop_read(handle);
|
|
|
|
if (handle->name) {
|
|
uv__free(handle->name);
|
|
handle->name = NULL;
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_PIPESERVER) {
|
|
for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
|
|
pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
|
|
if (pipeHandle != INVALID_HANDLE_VALUE) {
|
|
CloseHandle(pipeHandle);
|
|
handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
|
|
}
|
|
}
|
|
handle->handle = INVALID_HANDLE_VALUE;
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_CONNECTION) {
|
|
handle->flags &= ~UV_HANDLE_WRITABLE;
|
|
eof_timer_destroy(handle);
|
|
}
|
|
|
|
if ((handle->flags & UV_HANDLE_CONNECTION)
|
|
&& handle->handle != INVALID_HANDLE_VALUE)
|
|
close_pipe(handle);
|
|
}
|
|
|
|
|
|
/*
 * Begins closing a pipe handle: reading and listening are switched off,
 * the pipe's resources are cleaned up, the endgame is requested when no
 * requests are pending, and the handle is marked as closing.
 */
void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
  /* Each of these states holds one active count; drop it when leaving. */
  if (handle->flags & UV_HANDLE_READING) {
    handle->flags &= ~UV_HANDLE_READING;
    DECREASE_ACTIVE_COUNT(loop, handle);
  }

  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->flags &= ~UV_HANDLE_LISTENING;
    DECREASE_ACTIVE_COUNT(loop, handle);
  }

  uv_pipe_cleanup(loop, handle);

  if (handle->reqs_pending == 0) {
    uv_want_endgame(loop, (uv_handle_t*) handle);
  }

  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  uv__handle_closing(handle);
}
|
|
|
|
|
|
static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_pipe_accept_t* req, BOOL firstInstance) {
|
|
assert(handle->flags & UV_HANDLE_LISTENING);
|
|
|
|
if (!firstInstance) {
|
|
assert(req->pipeHandle == INVALID_HANDLE_VALUE);
|
|
|
|
req->pipeHandle = CreateNamedPipeW(handle->name,
|
|
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
|
|
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
|
|
PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
|
|
|
|
if (req->pipeHandle == INVALID_HANDLE_VALUE) {
|
|
SET_REQ_ERROR(req, GetLastError());
|
|
uv_insert_pending_req(loop, (uv_req_t*) req);
|
|
handle->reqs_pending++;
|
|
return;
|
|
}
|
|
|
|
if (uv_set_pipe_handle(loop, handle, req->pipeHandle, -1, 0)) {
|
|
CloseHandle(req->pipeHandle);
|
|
req->pipeHandle = INVALID_HANDLE_VALUE;
|
|
SET_REQ_ERROR(req, GetLastError());
|
|
uv_insert_pending_req(loop, (uv_req_t*) req);
|
|
handle->reqs_pending++;
|
|
return;
|
|
}
|
|
}
|
|
|
|
assert(req->pipeHandle != INVALID_HANDLE_VALUE);
|
|
|
|
/* Prepare the overlapped structure. */
|
|
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
|
|
|
|
if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
|
|
GetLastError() != ERROR_IO_PENDING) {
|
|
if (GetLastError() == ERROR_PIPE_CONNECTED) {
|
|
SET_REQ_SUCCESS(req);
|
|
} else {
|
|
CloseHandle(req->pipeHandle);
|
|
req->pipeHandle = INVALID_HANDLE_VALUE;
|
|
/* Make this req pending reporting an error. */
|
|
SET_REQ_ERROR(req, GetLastError());
|
|
}
|
|
uv_insert_pending_req(loop, (uv_req_t*) req);
|
|
handle->reqs_pending++;
|
|
return;
|
|
}
|
|
|
|
handle->reqs_pending++;
|
|
}
|
|
|
|
|
|
/*
 * Accepts a pending connection from `server` into `client`.
 *
 * For an IPC pipe the oldest queued socket is imported into `client` (used
 * as a uv_tcp_t). Otherwise the oldest connected pipe instance is moved to
 * `client` (used as a uv_pipe_t) and its accept request is re-armed unless
 * the server is closing.
 *
 * Returns 0 on success, WSAEWOULDBLOCK when nothing is pending, or the error
 * returned by uv_tcp_import().
 */
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
  uv_loop_t* loop = server->loop;
  uv_pipe_t* pipe_client;
  uv_pipe_accept_t* req;
  QUEUE* q;
  uv__ipc_queue_item_t* item;
  int err;

  if (server->ipc) {
    if (QUEUE_EMPTY(&server->pipe.conn.pending_ipc_info.queue)) {
      /* No valid pending sockets. */
      return WSAEWOULDBLOCK;
    }

    q = QUEUE_HEAD(&server->pipe.conn.pending_ipc_info.queue);
    QUEUE_REMOVE(q);
    server->pipe.conn.pending_ipc_info.queue_len--;
    item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);

    err = uv_tcp_import((uv_tcp_t*) client,
                        &item->socket_info_ex,
                        item->tcp_connection);

    /* The item is already unlinked from the queue, so it has to be freed on
     * the failure path as well; otherwise it would be leaked. */
    uv__free(item);

    if (err != 0)
      return err;

  } else {
    pipe_client = (uv_pipe_t*) client;

    /* Find a connection instance that has been connected, but not yet */
    /* accepted. */
    req = server->pipe.serv.pending_accepts;

    if (!req) {
      /* No valid connections found, so we error out. */
      return WSAEWOULDBLOCK;
    }

    /* Initialize the client handle and copy the pipeHandle to the client */
    uv_pipe_connection_init(pipe_client);
    pipe_client->handle = req->pipeHandle;
    pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

    /* Prepare the req to pick up a new connection */
    server->pipe.serv.pending_accepts = req->next_pending;
    req->next_pending = NULL;
    req->pipeHandle = INVALID_HANDLE_VALUE;

    if (!(server->flags & UV__HANDLE_CLOSING)) {
      uv_pipe_queue_accept(loop, server, req, FALSE);
    }
  }

  return 0;
}
|
|
|
|
|
|
/* Starts listening for connections for the given pipe. */
|
|
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
|
|
uv_loop_t* loop = handle->loop;
|
|
int i;
|
|
|
|
if (handle->flags & UV_HANDLE_LISTENING) {
|
|
handle->stream.serv.connection_cb = cb;
|
|
}
|
|
|
|
if (!(handle->flags & UV_HANDLE_BOUND)) {
|
|
return WSAEINVAL;
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_READING) {
|
|
return WSAEISCONN;
|
|
}
|
|
|
|
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
|
|
return ERROR_NOT_SUPPORTED;
|
|
}
|
|
|
|
handle->flags |= UV_HANDLE_LISTENING;
|
|
INCREASE_ACTIVE_COUNT(loop, handle);
|
|
handle->stream.serv.connection_cb = cb;
|
|
|
|
/* First pipe handle should have already been created in uv_pipe_bind */
|
|
assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
|
|
|
|
for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
|
|
uv_pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/*
 * Work item that performs a blocking zero-byte ReadFile on a non-overlapped
 * pipe and then posts a completion for the read request. `parameter` is the
 * uv_read_t being serviced.
 *
 * On handles flagged UV_HANDLE_PIPE_READ_CANCELABLE a duplicate of this
 * thread's handle is published in pipe.conn.readfile_thread. If the read is
 * aborted while the handle is still reading, it is restarted after passing
 * through the readfile mutex; if reading was stopped, the abort counts as
 * success.
 */
static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
  uv_read_t* req = (uv_read_t*) parameter;
  uv_pipe_t* handle = (uv_pipe_t*) req->data;
  uv_loop_t* loop = handle->loop;
  uv_mutex_t* lock = &handle->pipe.conn.readfile_mutex;
  HANDLE self = NULL;
  DWORD bytes;
  DWORD err;
  int ok;

  assert(req != NULL);
  assert(req->type == UV_READ);
  assert(handle->type == UV_NAMED_PIPE);

  if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
    uv_mutex_lock(lock); /* mutex controls *setting* of readfile_thread */
    if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
                        GetCurrentProcess(), &self,
                        0, TRUE, DUPLICATE_SAME_ACCESS)) {
      handle->pipe.conn.readfile_thread = self;
    } else {
      self = NULL;
    }
    uv_mutex_unlock(lock);
  }

  for (;;) {
    ok = ReadFile(handle->handle,
                  &uv_zero_,
                  0,
                  &bytes,
                  NULL);
    if (ok)
      break;

    err = GetLastError();
    if (err != ERROR_OPERATION_ABORTED ||
        !(handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE)) {
      break;
    }

    if (!(handle->flags & UV_HANDLE_READING)) {
      ok = 1; /* successfully stopped reading */
      break;
    }

    /* Just a brief break to do something else. */
    handle->pipe.conn.readfile_thread = NULL;
    /* Resume after it is finished. */
    uv_mutex_lock(lock);
    handle->pipe.conn.readfile_thread = self;
    uv_mutex_unlock(lock);
  }

  if (self) {
    assert(self == handle->pipe.conn.readfile_thread);
    /* mutex does not control clearing readfile_thread */
    handle->pipe.conn.readfile_thread = NULL;
    uv_mutex_lock(lock);
    /* only when we hold the mutex lock is it safe to
       open or close the handle */
    CloseHandle(self);
    uv_mutex_unlock(lock);
  }

  if (!ok) {
    SET_REQ_ERROR(req, err);
  }

  POST_COMPLETION_FOR_REQ(loop, req);
  return 0;
}
|
|
|
|
|
|
/*
 * Work item that performs a blocking WriteFile of the request's write
 * buffer, records the error on the request if the write fails, and posts a
 * completion for it. `parameter` is the uv_write_t being serviced.
 */
static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
  uv_write_t* req = (uv_write_t*) parameter;
  uv_pipe_t* handle = (uv_pipe_t*) req->handle;
  uv_loop_t* loop = handle->loop;
  DWORD written;
  int ok;

  assert(req != NULL);
  assert(req->type == UV_WRITE);
  assert(handle->type == UV_NAMED_PIPE);
  assert(req->write_buffer.base);

  ok = WriteFile(handle->handle,
                 req->write_buffer.base,
                 req->write_buffer.len,
                 &written,
                 NULL);
  if (!ok) {
    SET_REQ_ERROR(req, GetLastError());
  }

  POST_COMPLETION_FOR_REQ(loop, req);
  return 0;
}
|
|
|
|
|
|
/*
 * Wait callback for read requests: posts the request's overlapped structure
 * to the loop's completion port. `context` is the uv_read_t. The wait is
 * not expected to time out.
 */
static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
  uv_read_t* req = (uv_read_t*) context;
  uv_tcp_t* handle;
  BOOL posted;

  assert(req != NULL);
  handle = (uv_tcp_t*) req->data;
  assert(handle != NULL);
  assert(!timed_out);

  posted = PostQueuedCompletionStatus(handle->loop->iocp,
                                      req->u.io.overlapped.InternalHigh,
                                      0,
                                      &req->u.io.overlapped);
  if (!posted) {
    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
  }
}
|
|
|
|
|
|
/*
 * Wait callback for write requests: posts the request's overlapped structure
 * to the loop's completion port. `context` is the uv_write_t. The wait is
 * not expected to time out.
 */
static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
  uv_write_t* req = (uv_write_t*) context;
  uv_tcp_t* handle;
  BOOL posted;

  assert(req != NULL);
  handle = (uv_tcp_t*) req->handle;
  assert(handle != NULL);
  assert(!timed_out);

  posted = PostQueuedCompletionStatus(handle->loop->iocp,
                                      req->u.io.overlapped.InternalHigh,
                                      0,
                                      &req->u.io.overlapped);
  if (!posted) {
    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
  }
}
|
|
|
|
|
|
/*
 * Queues a zero-byte read on the pipe so the loop is notified when data is
 * available.
 *
 * Non-overlapped pipes perform the read in a work item. Overlapped pipes
 * issue an overlapped ReadFile; when completion-port delivery is emulated
 * (UV_HANDLE_EMULATE_IOCP), an event plus a registered wait forwards the
 * completion via post_completion_read_wait.
 *
 * In all cases UV_HANDLE_READ_PENDING is set and reqs_pending incremented.
 * On failure the request is inserted as pending with the error recorded.
 */
static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
  uv_read_t* req;
  int result;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));

  assert(handle->handle != INVALID_HANDLE_VALUE);

  req = &handle->read_req;

  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
                           req,
                           WT_EXECUTELONGFUNCTION)) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
      goto error;
    }
  } else {
    memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      /* The event has to exist before it is stored in the OVERLAPPED that
       * is passed to ReadFile; otherwise the first read would be issued
       * with an hEvent derived from a NULL event handle. */
      if (!req->event_handle) {
        req->event_handle = CreateEvent(NULL, 0, 0, NULL);
        if (!req->event_handle) {
          uv_fatal_error(GetLastError(), "CreateEvent");
        }
      }
      /* The low-order bit is set on the event handle stored in hEvent. */
      req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
    }

    /* Do 0-read */
    result = ReadFile(handle->handle,
                      &uv_zero_,
                      0,
                      NULL,
                      &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
      goto error;
    }

    if ((handle->flags & UV_HANDLE_EMULATE_IOCP) &&
        req->wait_handle == INVALID_HANDLE_VALUE) {
      /* Wait on the real event handle rather than the tagged hEvent value. */
      if (!RegisterWaitForSingleObject(&req->wait_handle,
          req->event_handle, post_completion_read_wait, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
        SET_REQ_ERROR(req, GetLastError());
        goto error;
      }
    }
  }

  /* Start the eof timer if there is one */
  eof_timer_start(handle);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
  return;

error:
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
}
|
|
|
|
|
|
int uv_pipe_read_start(uv_pipe_t* handle,
|
|
uv_alloc_cb alloc_cb,
|
|
uv_read_cb read_cb) {
|
|
uv_loop_t* loop = handle->loop;
|
|
|
|
handle->flags |= UV_HANDLE_READING;
|
|
INCREASE_ACTIVE_COUNT(loop, handle);
|
|
handle->read_cb = read_cb;
|
|
handle->alloc_cb = alloc_cb;
|
|
|
|
/* If reading was stopped and then started again, there could still be a */
|
|
/* read request pending. */
|
|
if (!(handle->flags & UV_HANDLE_READ_PENDING))
|
|
uv_pipe_queue_read(loop, handle);
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
|
|
uv_write_t* req) {
|
|
req->next_req = NULL;
|
|
if (handle->pipe.conn.non_overlapped_writes_tail) {
|
|
req->next_req =
|
|
handle->pipe.conn.non_overlapped_writes_tail->next_req;
|
|
handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req;
|
|
handle->pipe.conn.non_overlapped_writes_tail = req;
|
|
} else {
|
|
req->next_req = (uv_req_t*)req;
|
|
handle->pipe.conn.non_overlapped_writes_tail = req;
|
|
}
|
|
}
|
|
|
|
|
|
/*
 * Detaches and returns the oldest entry of the handle's non-overlapped
 * write queue, or NULL when the queue is empty.
 */
static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
  uv_write_t* tail = handle->pipe.conn.non_overlapped_writes_tail;
  uv_write_t* head;

  if (tail == NULL) {
    /* queue empty */
    return NULL;
  }

  head = (uv_write_t*) tail->next_req;

  if (head == tail) {
    /* That was the only entry. */
    handle->pipe.conn.non_overlapped_writes_tail = NULL;
  } else {
    /* Unlink the head by pointing the tail at its successor. */
    tail->next_req = head->next_req;
  }

  return head;
}
|
|
|
|
|
|
/*
 * Takes the next queued non-overlapped write (if any) and submits it to the
 * thread pool. Failure to queue the work item is fatal.
 */
static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
  uv_write_t* next = uv_remove_non_overlapped_write_req(handle);

  if (next == NULL)
    return;

  if (QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
                        next,
                        WT_EXECUTELONGFUNCTION))
    return;

  uv_fatal_error(GetLastError(), "QueueUserWorkItem");
}
|
|
|
|
|
|
static int uv_pipe_write_impl(uv_loop_t* loop,
|
|
uv_write_t* req,
|
|
uv_pipe_t* handle,
|
|
const uv_buf_t bufs[],
|
|
unsigned int nbufs,
|
|
uv_stream_t* send_handle,
|
|
uv_write_cb cb) {
|
|
int err;
|
|
int result;
|
|
uv_tcp_t* tcp_send_handle;
|
|
uv_write_t* ipc_header_req = NULL;
|
|
uv_ipc_frame_uv_stream ipc_frame;
|
|
|
|
if (nbufs != 1 && (nbufs != 0 || !send_handle)) {
|
|
return ERROR_NOT_SUPPORTED;
|
|
}
|
|
|
|
/* Only TCP handles are supported for sharing. */
|
|
if (send_handle && ((send_handle->type != UV_TCP) ||
|
|
(!(send_handle->flags & UV_HANDLE_BOUND) &&
|
|
!(send_handle->flags & UV_HANDLE_CONNECTION)))) {
|
|
return ERROR_NOT_SUPPORTED;
|
|
}
|
|
|
|
assert(handle->handle != INVALID_HANDLE_VALUE);
|
|
|
|
uv_req_init(loop, (uv_req_t*) req);
|
|
req->type = UV_WRITE;
|
|
req->handle = (uv_stream_t*) handle;
|
|
req->cb = cb;
|
|
req->ipc_header = 0;
|
|
req->event_handle = NULL;
|
|
req->wait_handle = INVALID_HANDLE_VALUE;
|
|
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
|
|
|
|
if (handle->ipc) {
|
|
assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
|
|
ipc_frame.header.flags = 0;
|
|
|
|
/* Use the IPC framing protocol. */
|
|
if (send_handle) {
|
|
tcp_send_handle = (uv_tcp_t*)send_handle;
|
|
|
|
if (handle->pipe.conn.ipc_pid == 0) {
|
|
handle->pipe.conn.ipc_pid = uv_current_pid();
|
|
}
|
|
|
|
err = uv_tcp_duplicate_socket(tcp_send_handle, handle->pipe.conn.ipc_pid,
|
|
&ipc_frame.socket_info_ex.socket_info);
|
|
if (err) {
|
|
return err;
|
|
}
|
|
|
|
ipc_frame.socket_info_ex.delayed_error = tcp_send_handle->delayed_error;
|
|
|
|
ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
|
|
|
|
if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
|
|
ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
|
|
}
|
|
}
|
|
|
|
if (nbufs == 1) {
|
|
ipc_frame.header.flags |= UV_IPC_RAW_DATA;
|
|
ipc_frame.header.raw_data_length = bufs[0].len;
|
|
}
|
|
|
|
/*
|
|
* Use the provided req if we're only doing a single write.
|
|
* If we're doing multiple writes, use ipc_header_write_req to do
|
|
* the first write, and then use the provided req for the second write.
|
|
*/
|
|
if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
|
|
ipc_header_req = req;
|
|
} else {
|
|
/*
|
|
* Try to use the preallocated write req if it's available.
|
|
* Otherwise allocate a new one.
|
|
*/
|
|
if (handle->pipe.conn.ipc_header_write_req.type != UV_WRITE) {
|
|
ipc_header_req = (uv_write_t*)&handle->pipe.conn.ipc_header_write_req;
|
|
} else {
|
|
ipc_header_req = (uv_write_t*)uv__malloc(sizeof(uv_write_t));
|
|
if (!ipc_header_req) {
|
|
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
|
|
}
|
|
}
|
|
|
|
uv_req_init(loop, (uv_req_t*) ipc_header_req);
|
|
ipc_header_req->type = UV_WRITE;
|
|
ipc_header_req->handle = (uv_stream_t*) handle;
|
|
ipc_header_req->cb = NULL;
|
|
ipc_header_req->ipc_header = 1;
|
|
}
|
|
|
|
/* Write the header or the whole frame. */
|
|
memset(&ipc_header_req->u.io.overlapped, 0,
|
|
sizeof(ipc_header_req->u.io.overlapped));
|
|
|
|
/* Using overlapped IO, but wait for completion before returning.
|
|
This write is blocking because ipc_frame is on stack. */
|
|
ipc_header_req->u.io.overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
|
|
if (!ipc_header_req->u.io.overlapped.hEvent) {
|
|
uv_fatal_error(GetLastError(), "CreateEvent");
|
|
}
|
|
|
|
result = WriteFile(handle->handle,
|
|
&ipc_frame,
|
|
ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
|
|
sizeof(ipc_frame) : sizeof(ipc_frame.header),
|
|
NULL,
|
|
&ipc_header_req->u.io.overlapped);
|
|
if (!result && GetLastError() != ERROR_IO_PENDING) {
|
|
err = GetLastError();
|
|
CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
|
|
return err;
|
|
}
|
|
|
|
if (!result) {
|
|
/* Request not completed immediately. Wait for it.*/
|
|
if (WaitForSingleObject(ipc_header_req->u.io.overlapped.hEvent, INFINITE) !=
|
|
WAIT_OBJECT_0) {
|
|
err = GetLastError();
|
|
CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
|
|
return err;
|
|
}
|
|
}
|
|
ipc_header_req->u.io.queued_bytes = 0;
|
|
CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
|
|
ipc_header_req->u.io.overlapped.hEvent = NULL;
|
|
|
|
REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
|
|
handle->reqs_pending++;
|
|
handle->stream.conn.write_reqs_pending++;
|
|
|
|
/* If we don't have any raw data to write - we're done. */
|
|
if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
if ((handle->flags &
|
|
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
|
|
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
|
|
DWORD bytes;
|
|
result = WriteFile(handle->handle,
|
|
bufs[0].base,
|
|
bufs[0].len,
|
|
&bytes,
|
|
NULL);
|
|
|
|
if (!result) {
|
|
err = GetLastError();
|
|
return err;
|
|
} else {
|
|
/* Request completed immediately. */
|
|
req->u.io.queued_bytes = 0;
|
|
}
|
|
|
|
REGISTER_HANDLE_REQ(loop, handle, req);
|
|
handle->reqs_pending++;
|
|
handle->stream.conn.write_reqs_pending++;
|
|
POST_COMPLETION_FOR_REQ(loop, req);
|
|
return 0;
|
|
} else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
|
|
req->write_buffer = bufs[0];
|
|
uv_insert_non_overlapped_write_req(handle, req);
|
|
if (handle->stream.conn.write_reqs_pending == 0) {
|
|
uv_queue_non_overlapped_write(handle);
|
|
}
|
|
|
|
/* Request queued by the kernel. */
|
|
req->u.io.queued_bytes = bufs[0].len;
|
|
handle->write_queue_size += req->u.io.queued_bytes;
|
|
} else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
|
|
/* Using overlapped IO, but wait for completion before returning */
|
|
req->u.io.overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
|
|
if (!req->u.io.overlapped.hEvent) {
|
|
uv_fatal_error(GetLastError(), "CreateEvent");
|
|
}
|
|
|
|
result = WriteFile(handle->handle,
|
|
bufs[0].base,
|
|
bufs[0].len,
|
|
NULL,
|
|
&req->u.io.overlapped);
|
|
|
|
if (!result && GetLastError() != ERROR_IO_PENDING) {
|
|
err = GetLastError();
|
|
CloseHandle(req->u.io.overlapped.hEvent);
|
|
return err;
|
|
}
|
|
|
|
if (result) {
|
|
/* Request completed immediately. */
|
|
req->u.io.queued_bytes = 0;
|
|
} else {
|
|
/* Request queued by the kernel. */
|
|
req->u.io.queued_bytes = bufs[0].len;
|
|
handle->write_queue_size += req->u.io.queued_bytes;
|
|
if (WaitForSingleObject(req->u.io.overlapped.hEvent, INFINITE) !=
|
|
WAIT_OBJECT_0) {
|
|
err = GetLastError();
|
|
CloseHandle(req->u.io.overlapped.hEvent);
|
|
return uv_translate_sys_error(err);
|
|
}
|
|
}
|
|
CloseHandle(req->u.io.overlapped.hEvent);
|
|
|
|
REGISTER_HANDLE_REQ(loop, handle, req);
|
|
handle->reqs_pending++;
|
|
handle->stream.conn.write_reqs_pending++;
|
|
return 0;
|
|
} else {
|
|
result = WriteFile(handle->handle,
|
|
bufs[0].base,
|
|
bufs[0].len,
|
|
NULL,
|
|
&req->u.io.overlapped);
|
|
|
|
if (!result && GetLastError() != ERROR_IO_PENDING) {
|
|
return GetLastError();
|
|
}
|
|
|
|
if (result) {
|
|
/* Request completed immediately. */
|
|
req->u.io.queued_bytes = 0;
|
|
} else {
|
|
/* Request queued by the kernel. */
|
|
req->u.io.queued_bytes = bufs[0].len;
|
|
handle->write_queue_size += req->u.io.queued_bytes;
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
|
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
|
|
if (!req->event_handle) {
|
|
uv_fatal_error(GetLastError(), "CreateEvent");
|
|
}
|
|
if (!RegisterWaitForSingleObject(&req->wait_handle,
|
|
req->u.io.overlapped.hEvent, post_completion_write_wait, (void*) req,
|
|
INFINITE, WT_EXECUTEINWAITTHREAD)) {
|
|
return GetLastError();
|
|
}
|
|
}
|
|
}
|
|
|
|
REGISTER_HANDLE_REQ(loop, handle, req);
|
|
handle->reqs_pending++;
|
|
handle->stream.conn.write_reqs_pending++;
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
int uv_pipe_write(uv_loop_t* loop,
|
|
uv_write_t* req,
|
|
uv_pipe_t* handle,
|
|
const uv_buf_t bufs[],
|
|
unsigned int nbufs,
|
|
uv_write_cb cb) {
|
|
return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, NULL, cb);
|
|
}
|
|
|
|
|
|
int uv_pipe_write2(uv_loop_t* loop,
|
|
uv_write_t* req,
|
|
uv_pipe_t* handle,
|
|
const uv_buf_t bufs[],
|
|
unsigned int nbufs,
|
|
uv_stream_t* send_handle,
|
|
uv_write_cb cb) {
|
|
if (!handle->ipc) {
|
|
return WSAEINVAL;
|
|
}
|
|
|
|
return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, send_handle, cb);
|
|
}
|
|
|
|
|
|
static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_buf_t buf) {
|
|
/* If there is an eof timer running, we don't need it any more, */
|
|
/* so discard it. */
|
|
eof_timer_destroy(handle);
|
|
|
|
handle->flags &= ~UV_HANDLE_READABLE;
|
|
uv_read_stop((uv_stream_t*) handle);
|
|
|
|
handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
|
|
}
|
|
|
|
|
|
static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
|
|
uv_buf_t buf) {
|
|
/* If there is an eof timer running, we don't need it any more, */
|
|
/* so discard it. */
|
|
eof_timer_destroy(handle);
|
|
|
|
uv_read_stop((uv_stream_t*) handle);
|
|
|
|
handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
|
|
}
|
|
|
|
|
|
/*
 * Dispatches a failed read: ERROR_BROKEN_PIPE is reported as end-of-file,
 * every other system error as a read error.
 */
static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
                                      int error, uv_buf_t buf) {
  if (error != ERROR_BROKEN_PIPE) {
    uv_pipe_read_error(loop, handle, error, buf);
    return;
  }

  uv_pipe_read_eof(loop, handle, buf);
}
|
|
|
|
|
|
void uv__pipe_insert_pending_socket(uv_pipe_t* handle,
|
|
uv__ipc_socket_info_ex* info,
|
|
int tcp_connection) {
|
|
uv__ipc_queue_item_t* item;
|
|
|
|
item = (uv__ipc_queue_item_t*) uv__malloc(sizeof(*item));
|
|
if (item == NULL)
|
|
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
|
|
|
|
memcpy(&item->socket_info_ex, info, sizeof(item->socket_info_ex));
|
|
item->tcp_connection = tcp_connection;
|
|
QUEUE_INSERT_TAIL(&handle->pipe.conn.pending_ipc_info.queue, &item->member);
|
|
handle->pipe.conn.pending_ipc_info.queue_len++;
|
|
}
|
|
|
|
|
|
void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_req_t* req) {
|
|
DWORD bytes, avail;
|
|
uv_buf_t buf;
|
|
uv_ipc_frame_uv_stream ipc_frame;
|
|
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
handle->flags &= ~UV_HANDLE_READ_PENDING;
|
|
eof_timer_stop(handle);
|
|
|
|
if (!REQ_SUCCESS(req)) {
|
|
/* An error occurred doing the 0-read. */
|
|
if (handle->flags & UV_HANDLE_READING) {
|
|
uv_pipe_read_error_or_eof(loop,
|
|
handle,
|
|
GET_REQ_ERROR(req),
|
|
uv_null_buf_);
|
|
}
|
|
} else {
|
|
/* Do non-blocking reads until the buffer is empty */
|
|
while (handle->flags & UV_HANDLE_READING) {
|
|
if (!PeekNamedPipe(handle->handle,
|
|
NULL,
|
|
0,
|
|
NULL,
|
|
&avail,
|
|
NULL)) {
|
|
uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
|
|
break;
|
|
}
|
|
|
|
if (avail == 0) {
|
|
/* There is nothing to read after all. */
|
|
break;
|
|
}
|
|
|
|
if (handle->ipc) {
|
|
/* Use the IPC framing protocol to read the incoming data. */
|
|
if (handle->pipe.conn.remaining_ipc_rawdata_bytes == 0) {
|
|
/* We're reading a new frame. First, read the header. */
|
|
assert(avail >= sizeof(ipc_frame.header));
|
|
|
|
if (!ReadFile(handle->handle,
|
|
&ipc_frame.header,
|
|
sizeof(ipc_frame.header),
|
|
&bytes,
|
|
NULL)) {
|
|
uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
|
|
uv_null_buf_);
|
|
break;
|
|
}
|
|
|
|
assert(bytes == sizeof(ipc_frame.header));
|
|
assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
|
|
UV_IPC_TCP_CONNECTION));
|
|
|
|
if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
|
|
assert(avail - sizeof(ipc_frame.header) >=
|
|
sizeof(ipc_frame.socket_info_ex));
|
|
|
|
/* Read the TCP socket info. */
|
|
if (!ReadFile(handle->handle,
|
|
&ipc_frame.socket_info_ex,
|
|
sizeof(ipc_frame) - sizeof(ipc_frame.header),
|
|
&bytes,
|
|
NULL)) {
|
|
uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
|
|
uv_null_buf_);
|
|
break;
|
|
}
|
|
|
|
assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
|
|
|
|
/* Store the pending socket info. */
|
|
uv__pipe_insert_pending_socket(
|
|
handle,
|
|
&ipc_frame.socket_info_ex,
|
|
ipc_frame.header.flags & UV_IPC_TCP_CONNECTION);
|
|
}
|
|
|
|
if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
|
|
handle->pipe.conn.remaining_ipc_rawdata_bytes =
|
|
ipc_frame.header.raw_data_length;
|
|
continue;
|
|
}
|
|
} else {
|
|
avail = min(avail, (DWORD)handle->pipe.conn.remaining_ipc_rawdata_bytes);
|
|
}
|
|
}
|
|
|
|
buf = uv_buf_init(NULL, 0);
|
|
handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
|
|
if (buf.base == NULL || buf.len == 0) {
|
|
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
|
|
break;
|
|
}
|
|
assert(buf.base != NULL);
|
|
|
|
if (ReadFile(handle->handle,
|
|
buf.base,
|
|
min(buf.len, avail),
|
|
&bytes,
|
|
NULL)) {
|
|
/* Successful read */
|
|
if (handle->ipc) {
|
|
assert(handle->pipe.conn.remaining_ipc_rawdata_bytes >= bytes);
|
|
handle->pipe.conn.remaining_ipc_rawdata_bytes =
|
|
handle->pipe.conn.remaining_ipc_rawdata_bytes - bytes;
|
|
}
|
|
handle->read_cb((uv_stream_t*)handle, bytes, &buf);
|
|
|
|
/* Read again only if bytes == buf.len */
|
|
if (bytes <= buf.len) {
|
|
break;
|
|
}
|
|
} else {
|
|
uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
|
|
break;
|
|
}
|
|
}
|
|
|
|
/* Post another 0-read if still reading and not closing. */
|
|
if ((handle->flags & UV_HANDLE_READING) &&
|
|
!(handle->flags & UV_HANDLE_READ_PENDING)) {
|
|
uv_pipe_queue_read(loop, handle);
|
|
}
|
|
}
|
|
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
}
|
|
|
|
|
|
void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_write_t* req) {
|
|
int err;
|
|
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
assert(handle->write_queue_size >= req->u.io.queued_bytes);
|
|
handle->write_queue_size -= req->u.io.queued_bytes;
|
|
|
|
UNREGISTER_HANDLE_REQ(loop, handle, req);
|
|
|
|
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
|
if (req->wait_handle != INVALID_HANDLE_VALUE) {
|
|
UnregisterWait(req->wait_handle);
|
|
req->wait_handle = INVALID_HANDLE_VALUE;
|
|
}
|
|
if (req->event_handle) {
|
|
CloseHandle(req->event_handle);
|
|
req->event_handle = NULL;
|
|
}
|
|
}
|
|
|
|
if (req->ipc_header) {
|
|
if (req == &handle->pipe.conn.ipc_header_write_req) {
|
|
req->type = UV_UNKNOWN_REQ;
|
|
} else {
|
|
uv__free(req);
|
|
}
|
|
} else {
|
|
if (req->cb) {
|
|
err = GET_REQ_ERROR(req);
|
|
req->cb(req, uv_translate_sys_error(err));
|
|
}
|
|
}
|
|
|
|
handle->stream.conn.write_reqs_pending--;
|
|
|
|
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
|
|
handle->pipe.conn.non_overlapped_writes_tail) {
|
|
assert(handle->stream.conn.write_reqs_pending > 0);
|
|
uv_queue_non_overlapped_write(handle);
|
|
}
|
|
|
|
if (handle->stream.conn.shutdown_req != NULL &&
|
|
handle->stream.conn.write_reqs_pending == 0) {
|
|
uv_want_endgame(loop, (uv_handle_t*)handle);
|
|
}
|
|
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
}
|
|
|
|
|
|
void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_req_t* raw_req) {
|
|
uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
|
|
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
if (handle->flags & UV__HANDLE_CLOSING) {
|
|
/* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */
|
|
assert(req->pipeHandle == INVALID_HANDLE_VALUE);
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
return;
|
|
}
|
|
|
|
if (REQ_SUCCESS(req)) {
|
|
assert(req->pipeHandle != INVALID_HANDLE_VALUE);
|
|
req->next_pending = handle->pipe.serv.pending_accepts;
|
|
handle->pipe.serv.pending_accepts = req;
|
|
|
|
if (handle->stream.serv.connection_cb) {
|
|
handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
|
|
}
|
|
} else {
|
|
if (req->pipeHandle != INVALID_HANDLE_VALUE) {
|
|
CloseHandle(req->pipeHandle);
|
|
req->pipeHandle = INVALID_HANDLE_VALUE;
|
|
}
|
|
if (!(handle->flags & UV__HANDLE_CLOSING)) {
|
|
uv_pipe_queue_accept(loop, handle, req, FALSE);
|
|
}
|
|
}
|
|
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
}
|
|
|
|
|
|
void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_connect_t* req) {
|
|
int err;
|
|
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
UNREGISTER_HANDLE_REQ(loop, handle, req);
|
|
|
|
if (req->cb) {
|
|
err = 0;
|
|
if (REQ_SUCCESS(req)) {
|
|
uv_pipe_connection_init(handle);
|
|
} else {
|
|
err = GET_REQ_ERROR(req);
|
|
}
|
|
req->cb(req, uv_translate_sys_error(err));
|
|
}
|
|
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
}
|
|
|
|
|
|
void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_shutdown_t* req) {
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
UNREGISTER_HANDLE_REQ(loop, handle, req);
|
|
|
|
if (handle->flags & UV_HANDLE_READABLE) {
|
|
/* Initialize and optionally start the eof timer. Only do this if the */
|
|
/* pipe is readable and we haven't seen EOF come in ourselves. */
|
|
eof_timer_init(handle);
|
|
|
|
/* If reading start the timer right now. */
|
|
/* Otherwise uv_pipe_queue_read will start it. */
|
|
if (handle->flags & UV_HANDLE_READ_PENDING) {
|
|
eof_timer_start(handle);
|
|
}
|
|
|
|
} else {
|
|
/* This pipe is not readable. We can just close it to let the other end */
|
|
/* know that we're done writing. */
|
|
close_pipe(handle);
|
|
}
|
|
|
|
if (req->cb) {
|
|
req->cb(req, 0);
|
|
}
|
|
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
}
|
|
|
|
|
|
/*
 * Allocates and initializes the pipe's eof timer. The timer stores the pipe
 * in its data field and is unreferenced so that it does not keep the loop
 * alive. Must only be called on a connection that has no eof timer yet.
 * Allocation failure is fatal, as for the other allocations in this file.
 */
static void eof_timer_init(uv_pipe_t* pipe) {
  int r;

  assert(pipe->pipe.conn.eof_timer == NULL);
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  pipe->pipe.conn.eof_timer =
      (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
  if (pipe->pipe.conn.eof_timer == NULL) {
    /* Without this check the NULL pointer would be handed to */
    /* uv_timer_init() and dereferenced below. */
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
  }

  r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
  assert(r == 0); /* timers can't fail */
  (void) r; /* only read by the assert */
  pipe->pipe.conn.eof_timer->data = pipe;
  uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
}
|
|
|
|
|
|
/*
 * Starts the pipe's eof timer as a one-shot timer of eof_timeout, if the
 * pipe has one. Does nothing otherwise.
 */
static void eof_timer_start(uv_pipe_t* pipe) {
  uv_timer_t* timer;

  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = pipe->pipe.conn.eof_timer;
  if (timer == NULL)
    return;

  uv_timer_start(timer, eof_timer_cb, eof_timeout, 0);
}
|
|
|
|
|
|
/*
 * Stops the pipe's eof timer, if the pipe has one. Does nothing otherwise.
 */
static void eof_timer_stop(uv_pipe_t* pipe) {
  uv_timer_t* timer;

  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = pipe->pipe.conn.eof_timer;
  if (timer == NULL)
    return;

  uv_timer_stop(timer);
}
|
|
|
|
|
|
/*
 * Fires when the eof timer expires while a read is still pending. Unless the
 * pending read has already completed and is merely waiting to be processed,
 * the pipe is closed, reading is stopped and EOF is reported to the user.
 */
static void eof_timer_cb(uv_timer_t* timer) {
  uv_loop_t* loop = timer->loop;
  uv_pipe_t* pipe = (uv_pipe_t*) timer->data;

  assert(pipe->type == UV_NAMED_PIPE);

  /* The timer is only started by uv_pipe_queue_read after a successful */
  /* ReadFile, or by uv_process_pipe_shutdown_req when a read is pending, */
  /* and uv_process_pipe_read_req stops it right away, so a read must be */
  /* pending here. */
  assert(pipe->flags & UV_HANDLE_READ_PENDING);

  /* With many packets coming off the iocp this callback can run before the */
  /* read request has come off the queue. If the read has completed and */
  /* will be processed later, leave everything to that code path. */
  if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
      HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
    return;
  }

  /* Force both ends off the pipe. */
  close_pipe(pipe);

  /* Stop reading so that the pending read, which is now going to fail, is */
  /* not reported to the user. */
  uv_read_stop((uv_stream_t*) pipe);

  /* Report the eof and update flags. This will get reported even if the */
  /* user stopped reading in the meantime. TODO: is that okay? */
  uv_pipe_read_eof(loop, pipe, uv_null_buf_);
}
|
|
|
|
|
|
/*
 * Closes the pipe's eof timer, if it has one, and clears the pipe's pointer
 * to it. The timer memory is released by eof_timer_close_cb.
 */
static void eof_timer_destroy(uv_pipe_t* pipe) {
  uv_timer_t* timer;

  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = pipe->pipe.conn.eof_timer;
  if (!timer)
    return;

  uv_close((uv_handle_t*) timer, eof_timer_close_cb);
  pipe->pipe.conn.eof_timer = NULL;
}
|
|
|
|
|
|
/*
 * Close callback for an eof timer: frees the timer handle itself.
 */
static void eof_timer_close_cb(uv_handle_t* handle) {
  /* Only eof timers are closed with this callback. */
  assert(handle->type == UV_TIMER);

  uv__free(handle);
}
|
|
|
|
|
|
/*
 * Attaches an existing file descriptor to a pipe handle.
 *
 * The OS handle behind `file` is looked up, its access rights are queried to
 * decide whether the stream is readable and/or writable, and the pipe is
 * initialized as a connection. IPC pipes must be both readable and writable.
 *
 * Returns 0 on success, UV_EBADF when `file` has no valid OS handle,
 * UV_EINVAL when the handle cannot be queried, lacks the access an IPC pipe
 * needs or cannot be attached, or a translated system error when duplicating
 * a stdio handle fails.
 */
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
  HANDLE os_handle = uv__get_osfhandle(file);
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_ACCESS_INFORMATION access;
  DWORD duplex_flags = 0;
  int owns_handle = 0; /* set once os_handle is a duplicate we created */

  if (os_handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;

  /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
   * underlying OS handle and forget about the original fd.
   * We could also opt to use the original OS handle and just never close it,
   * but then there would be no reliable way to cancel pending read operations
   * upon close.
   */
  if (file <= 2) {
    if (!DuplicateHandle(INVALID_HANDLE_VALUE,
                         os_handle,
                         INVALID_HANDLE_VALUE,
                         &os_handle,
                         0,
                         FALSE,
                         DUPLICATE_SAME_ACCESS))
      return uv_translate_sys_error(GetLastError());
    file = -1;
    owns_handle = 1;
  }

  /* Determine what kind of permissions we have on this handle.
   * Cygwin opens the pipe in message mode, but we can support it,
   * just query the access flags and set the stream flags accordingly.
   */
  nt_status = pNtQueryInformationFile(os_handle,
                                      &io_status,
                                      &access,
                                      sizeof(access),
                                      FileAccessInformation);
  if (nt_status != STATUS_SUCCESS) {
    /* Nothing else refers to the duplicate yet; don't leak it. */
    if (owns_handle)
      CloseHandle(os_handle);
    return UV_EINVAL;
  }

  if (pipe->ipc) {
    if (!(access.AccessFlags & FILE_WRITE_DATA) ||
        !(access.AccessFlags & FILE_READ_DATA)) {
      /* Nothing else refers to the duplicate yet; don't leak it. */
      if (owns_handle)
        CloseHandle(os_handle);
      return UV_EINVAL;
    }
  }

  if (access.AccessFlags & FILE_WRITE_DATA)
    duplex_flags |= UV_HANDLE_WRITABLE;
  if (access.AccessFlags & FILE_READ_DATA)
    duplex_flags |= UV_HANDLE_READABLE;

  if (os_handle == INVALID_HANDLE_VALUE ||
      uv_set_pipe_handle(pipe->loop,
                         pipe,
                         os_handle,
                         file,
                         duplex_flags) == -1) {
    /* NOTE(review): a duplicated handle is not closed here because it is */
    /* not visible from this function whether uv_set_pipe_handle() keeps or */
    /* releases the handle when it fails - confirm and close if needed. */
    return UV_EINVAL;
  }

  uv_pipe_connection_init(pipe);

  if (pipe->ipc) {
    assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
    pipe->pipe.conn.ipc_pid = uv_parent_pid();
    assert(pipe->pipe.conn.ipc_pid != -1);
  }
  return 0;
}
|
|
|
|
|
|
static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
|
|
NTSTATUS nt_status;
|
|
IO_STATUS_BLOCK io_status;
|
|
FILE_NAME_INFORMATION tmp_name_info;
|
|
FILE_NAME_INFORMATION* name_info;
|
|
WCHAR* name_buf;
|
|
unsigned int addrlen;
|
|
unsigned int name_size;
|
|
unsigned int name_len;
|
|
int err;
|
|
|
|
name_info = NULL;
|
|
|
|
if (handle->handle == INVALID_HANDLE_VALUE) {
|
|
*size = 0;
|
|
return UV_EINVAL;
|
|
}
|
|
|
|
uv__pipe_pause_read((uv_pipe_t*)handle); /* cast away const warning */
|
|
|
|
nt_status = pNtQueryInformationFile(handle->handle,
|
|
&io_status,
|
|
&tmp_name_info,
|
|
sizeof tmp_name_info,
|
|
FileNameInformation);
|
|
if (nt_status == STATUS_BUFFER_OVERFLOW) {
|
|
name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
|
|
name_info = uv__malloc(name_size);
|
|
if (!name_info) {
|
|
*size = 0;
|
|
err = UV_ENOMEM;
|
|
goto cleanup;
|
|
}
|
|
|
|
nt_status = pNtQueryInformationFile(handle->handle,
|
|
&io_status,
|
|
name_info,
|
|
name_size,
|
|
FileNameInformation);
|
|
}
|
|
|
|
if (nt_status != STATUS_SUCCESS) {
|
|
*size = 0;
|
|
err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
|
|
goto error;
|
|
}
|
|
|
|
if (!name_info) {
|
|
/* the struct on stack was used */
|
|
name_buf = tmp_name_info.FileName;
|
|
name_len = tmp_name_info.FileNameLength;
|
|
} else {
|
|
name_buf = name_info->FileName;
|
|
name_len = name_info->FileNameLength;
|
|
}
|
|
|
|
if (name_len == 0) {
|
|
*size = 0;
|
|
err = 0;
|
|
goto error;
|
|
}
|
|
|
|
name_len /= sizeof(WCHAR);
|
|
|
|
/* check how much space we need */
|
|
addrlen = WideCharToMultiByte(CP_UTF8,
|
|
0,
|
|
name_buf,
|
|
name_len,
|
|
NULL,
|
|
0,
|
|
NULL,
|
|
NULL);
|
|
if (!addrlen) {
|
|
*size = 0;
|
|
err = uv_translate_sys_error(GetLastError());
|
|
goto error;
|
|
} else if (pipe_prefix_len + addrlen >= *size) {
|
|
/* "\\\\.\\pipe" + name */
|
|
*size = pipe_prefix_len + addrlen + 1;
|
|
err = UV_ENOBUFS;
|
|
goto error;
|
|
}
|
|
|
|
memcpy(buffer, pipe_prefix, pipe_prefix_len);
|
|
addrlen = WideCharToMultiByte(CP_UTF8,
|
|
0,
|
|
name_buf,
|
|
name_len,
|
|
buffer+pipe_prefix_len,
|
|
*size-pipe_prefix_len,
|
|
NULL,
|
|
NULL);
|
|
if (!addrlen) {
|
|
*size = 0;
|
|
err = uv_translate_sys_error(GetLastError());
|
|
goto error;
|
|
}
|
|
|
|
addrlen += pipe_prefix_len;
|
|
*size = addrlen;
|
|
buffer[addrlen] = '\0';
|
|
|
|
err = 0;
|
|
goto cleanup;
|
|
|
|
error:
|
|
uv__free(name_info);
|
|
|
|
cleanup:
|
|
uv__pipe_unpause_read((uv_pipe_t*)handle); /* cast away const warning */
|
|
return err;
|
|
}
|
|
|
|
|
|
/*
 * Returns the number of queued pending IPC sockets on the pipe, or 0 for a
 * pipe that is not an IPC pipe.
 */
int uv_pipe_pending_count(uv_pipe_t* handle) {
  return handle->ipc ? handle->pipe.conn.pending_ipc_info.queue_len : 0;
}
|
|
|
|
|
|
int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
|
|
if (handle->flags & UV_HANDLE_BOUND)
|
|
return uv__pipe_getname(handle, buffer, size);
|
|
|
|
if (handle->flags & UV_HANDLE_CONNECTION ||
|
|
handle->handle != INVALID_HANDLE_VALUE) {
|
|
*size = 0;
|
|
return 0;
|
|
}
|
|
|
|
return UV_EBADF;
|
|
}
|
|
|
|
|
|
int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
|
|
/* emulate unix behaviour */
|
|
if (handle->flags & UV_HANDLE_BOUND)
|
|
return UV_ENOTCONN;
|
|
|
|
if (handle->handle != INVALID_HANDLE_VALUE)
|
|
return uv__pipe_getname(handle, buffer, size);
|
|
|
|
return UV_EBADF;
|
|
}
|
|
|
|
|
|
/*
 * Returns UV_TCP when an IPC pipe has at least one pending socket queued,
 * and UV_UNKNOWN_HANDLE otherwise.
 */
uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
  if (handle->ipc && handle->pipe.conn.pending_ipc_info.queue_len != 0)
    return UV_TCP;

  return UV_UNKNOWN_HANDLE;
}
|