#
#
# Nim's Runtime Library
# (c) Copyright 2015 Dominik Picheta
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## This module implements asynchronous IO. This includes a dispatcher,
## a ``Future`` type implementation, and an ``async`` macro which allows
## asynchronous code to be written in a synchronous style with the ``await``
## keyword.
##
## The dispatcher acts as a kind of event loop. You must call ``poll`` on it
## (or a function which does so for you such as ``waitFor`` or ``runForever``)
## in order to poll for any outstanding events. The underlying implementation
## is based on epoll on Linux, IO Completion Ports on Windows and select on
## other operating systems.
##
## The ``poll`` function will not, on its own, return any events. Instead
## an appropriate ``Future`` object will be completed. A ``Future`` is a
## type which holds a value which is not yet available, but which *may* be
## available in the future. You can check whether a future is finished
## by using the ``finished`` function. When a future is finished it means that
## either the value that it holds is now available or it holds an error instead.
## The latter situation occurs when the operation to complete a future fails
## with an exception. You can distinguish between the two situations with the
## ``failed`` function.
##
## Future objects can also store a callback procedure which will be called
## automatically once the future completes.
##
## Futures therefore can be thought of as an implementation of the proactor
## pattern. In this
## pattern you make a request for an action, and once that action is fulfilled
## a future is completed with the result of that action. Requests can be
## made by calling the appropriate functions. For example: calling the ``recv``
## function will create a request for some data to be read from a socket. The
## future which the ``recv`` function returns will then complete once the
## requested amount of data is read **or** an exception occurs.
##
## Code to read some data from a socket may look something like this:
##
## .. code-block::nim
## var future = socket.recv(100)
## future.addCallback(
## proc () =
## echo(future.read)
## )
##
## All asynchronous functions returning a ``Future`` will not block. They
## will not however return immediately. An asynchronous function will have
## code which will be executed before an asynchronous request is made, in most
## cases this code sets up the request.
##
## In the above example, the ``recv`` function will return a brand new
## ``Future`` instance once the request for data to be read from the socket
## is made. This ``Future`` instance will complete once the requested amount
## of data is read, in this case it is 100 bytes. The second line sets a
## callback on this future which will be called once the future completes.
## All the callback does is write the data stored in the future to ``stdout``.
## The ``read`` function is used for this and it checks whether the future
## completes with an error for you (if it did it will simply raise the
## error), if there is no error however it returns the value of the future.
##
## Asynchronous procedures
## =======================
##
## Asynchronous procedures remove the pain of working with callbacks. They do
## this by allowing you to write asynchronous code the same way as you would
## write synchronous code.
##
## An asynchronous procedure is marked using the ``{.async.}`` pragma.
## When marking a procedure with the ``{.async.}`` pragma it must have a
## ``Future[T]`` return type or no return type at all. If you do not specify
## a return type then ``Future[void]`` is assumed.
##
## Inside asynchronous procedures ``await`` can be used to call any
## procedures which return a
## ``Future``; this includes asynchronous procedures. When a procedure is
## "awaited", the asynchronous procedure it is awaited in will
## suspend its execution
## until the awaited procedure's Future completes. At which point the
## asynchronous procedure will resume its execution. During the period
## when an asynchronous procedure is suspended other asynchronous procedures
## will be run by the dispatcher.
##
## The ``await`` call may be used in many contexts. It can be used on the right
## hand side of a variable declaration: ``var data = await socket.recv(100)``,
## in which case the variable will be set to the value of the future
## automatically. It can be used to await a ``Future`` object, and it can
## be used to await a procedure returning a ``Future[void]``:
## ``await socket.send("foobar")``.
##
## If an awaited future completes with an error, then ``await`` will re-raise
## this error. To avoid this, you can use the ``yield`` keyword instead of
## ``await``. The following section shows different ways that you can handle
## exceptions in async procs.
##
## Handling Exceptions
## -------------------
##
## The most reliable way to handle exceptions is to use ``yield`` on a future
## then check the future's ``failed`` property. For example:
##
## .. code-block:: Nim
## var future = sock.recv(100)
## yield future
## if future.failed:
## # Handle exception
##
## The ``async`` procedures also offer limited support for the try statement.
##
## .. code-block:: Nim
## try:
## let data = await sock.recv(100)
## echo("Received ", data)
## except:
## # Handle exception
##
## Unfortunately the semantics of the try statement may not always be correct,
## and occasionally the compilation may fail altogether.
## As such it is better to use the former style when possible.
##
##
## Discarding futures
## ==================
##
## Futures should **never** be discarded. This is because they may contain
## errors. If you do not care for the result of a Future then you should
## use the ``asyncCheck`` procedure instead of the ``discard`` keyword. Note
## however that this does not wait for completion, and you should use
## ``waitFor`` for that purpose.
##
## Examples
## ========
##
## For examples take a look at the documentation for the modules implementing
## asynchronous IO. A good place to start is the
## `asyncnet module <asyncnet.html>`_.
##
## Investigating pending futures
## =============================
##
## It's possible to get into a situation where an async proc, or more accurately
## a ``Future[T]`` gets stuck and
## never completes. This can happen for various reasons and can cause serious
## memory leaks. When this occurs it's hard to identify the procedure that is
## stuck.
##
## Thankfully there is a mechanism which tracks the count of each pending future.
## All you need to do to enable it is compile with ``-d:futureLogging`` and
## use the ``getFuturesInProgress`` procedure to get the list of pending futures
## together with the stack traces to the moment of their creation.
##
## You may also find it useful to use this
## `prometheus package <https://github.com/dom96/prometheus>`_ which will log
## the pending futures into prometheus, allowing you to analyse them via a nice
## graph.
##
##
##
## Limitations/Bugs
## ================
##
## * The effect system (``raises: []``) does not work with async procedures.
import os, tables, strutils, times, heapqueue, options, asyncstreams
import options, math, std/monotimes
import asyncfutures except callSoon
import nativesockets, net, deques
export Port, SocketFlag
export asyncfutures except callSoon
export asyncstreams
#{.injectStmt: newGcInvariant().}
# TODO: Check if yielded future is nil and throw a more meaningful exception
type
PDispatcherBase = ref object of RootRef
timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]]
callbacks*: Deque[proc () {.gcsafe.}]
proc processTimers(
p: PDispatcherBase, didSomeWork: var bool
): Option[int] {.inline.} =
# Pop the timers in the order in which they will expire (smaller `finishAt`).
var count = p.timers.len
let t = getMonoTime()
while count > 0 and t >= p.timers[0].finishAt:
p.timers.pop().fut.complete()
dec count
didSomeWork = true
# Return the number of milliseconds in which the next timer will expire.
if p.timers.len == 0: return
let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds
return some(millisecs.int + 1)
proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
while p.callbacks.len > 0:
var cb = p.callbacks.popFirst()
cb()
didSomeWork = true
proc adjustTimeout(
p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int]
): int {.inline.} =
if p.callbacks.len != 0:
return 0
if nextTimer.isNone() or pollTimeout == -1:
return pollTimeout
result = max(nextTimer.get(), 0)
result = min(pollTimeout, result)
proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.}
## Schedule `cbproc` to be called as soon as possible.
## The callback is called when control returns to the event loop.
proc initCallSoonProc =
if asyncfutures.getCallSoonProc().isNil:
asyncfutures.setCallSoonProc(callSoon)
when defined(windows) or defined(nimdoc):
import winlean, sets, hashes
type
CompletionKey = ULONG_PTR
CompletionData* = object
fd*: AsyncFD # TODO: Rename this.
cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD,
errcode: OSErrorCode) {.closure, gcsafe.})
cell*: ForeignCell # we need this `cell` to protect our `cb` environment,
# when using RegisterWaitForSingleObject, because
# waiting is done in different thread.
PDispatcher* = ref object of PDispatcherBase
ioPort: Handle
handles: HashSet[AsyncFD]
CustomObj = object of OVERLAPPED
data*: CompletionData
CustomRef* = ref CustomObj
AsyncFD* = distinct int
PostCallbackData = object
ioPort: Handle
handleFd: AsyncFD
waitFd: Handle
ovl: owned CustomRef
PostCallbackDataPtr = ptr PostCallbackData
AsyncEventImpl = object
hEvent: Handle
hWaiter: Handle
pcd: PostCallbackDataPtr
AsyncEvent* = ptr AsyncEventImpl
Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
proc hash(x: AsyncFD): Hash {.borrow.}
proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
proc newDispatcher*(): owned PDispatcher =
## Creates a new Dispatcher instance.
new result
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
result.handles = initHashSet[AsyncFD]()
result.timers.newHeapQueue()
result.callbacks = initDeque[proc () {.closure, gcsafe.}](64)
var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
proc setGlobalDispatcher*(disp: owned PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc getIoHandler*(disp: PDispatcher): Handle =
## Returns the underlying IO Completion Port handle (Windows) or selector
## (Unix) for the specified dispatcher.
return disp.ioPort
proc register*(fd: AsyncFD) =
## Registers ``fd`` with the dispatcher.
let p = getGlobalDispatcher()
if createIoCompletionPort(fd.Handle, p.ioPort,
cast[CompletionKey](fd), 1) == 0:
raiseOSError(osLastError())
p.handles.incl(fd)
proc verifyPresence(fd: AsyncFD) =
## Ensures that file descriptor has been registered with the dispatcher.
## Raises ValueError if `fd` has not been registered.
let p = getGlobalDispatcher()
if fd notin p.handles:
raise newException(ValueError,
"Operation performed on a socket which has not been registered with" &
" the dispatcher yet.")
proc hasPendingOperations*(): bool =
## Returns `true` if the global dispatcher has pending operations.
let p = getGlobalDispatcher()
p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
proc runOnce(timeout = 500): bool =
let p = getGlobalDispatcher()
if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
raise newException(ValueError,
"No handles or timers registered in dispatcher.")
result = false
let nextTimer = processTimers(p, result)
let at = adjustTimeout(p, timeout, nextTimer)
var llTimeout =
if at == -1: winlean.INFINITE
else: at.int32
var lpNumberOfBytesTransferred: DWORD
var lpCompletionKey: ULONG_PTR
var customOverlapped: CustomRef
let res = getQueuedCompletionStatus(p.ioPort,
addr lpNumberOfBytesTransferred, addr lpCompletionKey,
cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
result = true
# For 'gcDestructors' the destructor of 'customOverlapped' will
# be called at the end and we are the only owner here. This means
# We do not have to 'GC_unref(customOverlapped)' because the destructor
# does that for us.
# http://stackoverflow.com/a/12277264/492186
# TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
if res:
# This is useful for ensuring the reliability of the overlapped struct.
assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
customOverlapped.data.cb(customOverlapped.data.fd,
lpNumberOfBytesTransferred, OSErrorCode(-1))
# If cell.data != nil, then system.protect(rawEnv(cb)) was called,
# so we need to dispose our `cb` environment, because it is not needed
# anymore.
if customOverlapped.data.cell.data != nil:
system.dispose(customOverlapped.data.cell)
when not defined(gcDestructors):
GC_unref(customOverlapped)
else:
let errCode = osLastError()
if customOverlapped != nil:
assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
customOverlapped.data.cb(customOverlapped.data.fd,
lpNumberOfBytesTransferred, errCode)
if customOverlapped.data.cell.data != nil:
system.dispose(customOverlapped.data.cell)
when not defined(gcDestructors):
GC_unref(customOverlapped)
else:
if errCode.int32 == WAIT_TIMEOUT:
# Timed out
result = false
else: raiseOSError(errCode)
# Timer processing.
discard processTimers(p, result)
# Callback queue processing
processPendingCallbacks(p, result)
var acceptEx: WSAPROC_ACCEPTEX
var connectEx: WSAPROC_CONNECTEX
var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
# Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
var bytesRet: DWORD
fun = nil
result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
addr bytesRet, nil, nil) == 0
proc initAll() =
let dummySock = createNativeSocket()
if dummySock == INVALID_SOCKET:
raiseOSError(osLastError())
var fun: pointer = nil
if not initPointer(dummySock, fun, WSAID_CONNECTEX):
raiseOSError(osLastError())
connectEx = cast[WSAPROC_CONNECTEX](fun)
if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
raiseOSError(osLastError())
acceptEx = cast[WSAPROC_ACCEPTEX](fun)
if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
raiseOSError(osLastError())
getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
close(dummySock)
proc newCustom*(): CustomRef =
result = CustomRef() # 0
GC_ref(result) # 1 prevent destructor from doing a premature free.
# destructor of newCustom's caller --> 0. This means
# Windows holds a ref for us with RC == 0 (single owner).
# This is passed back to us in the IO completion port.
proc recv*(socket: AsyncFD, size: int,
flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
## Reads **up to** ``size`` bytes from ``socket``. Returned future will
## complete once all the data requested is read, a part of the data has been
## read, or the socket has disconnected in which case the future will
## complete with a value of ``""``.
##
## **Warning**: The ``Peek`` socket flag is not supported on Windows.
# Things to note:
# * When WSARecv completes immediately then ``bytesReceived`` is very
# unreliable.
# * Still need to implement message-oriented socket disconnection,
# '\0' in the message currently signifies a socket disconnect. Who
# knows what will happen when someone sends that to our socket.
verifyPresence(socket)
assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
var retFuture = newFuture[string]("recv")
var dataBuf: TWSABuf
dataBuf.buf = cast[cstring](alloc0(size))
dataBuf.len = size.ULONG
var bytesReceived: DWORD
var flagsio = flags.toOSFlags().DWORD
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
if bytesCount == 0 and dataBuf.buf[0] == '\0':
retFuture.complete("")
else:
var data = newString(bytesCount)
assert bytesCount <= size
copyMem(addr data[0], addr dataBuf.buf[0], bytesCount)
retFuture.complete($data)
else:
if flags.isDisconnectionError(errcode):
retFuture.complete("")
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
if dataBuf.buf != nil:
dealloc dataBuf.buf
dataBuf.buf = nil
)
let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
addr flagsio, cast[POVERLAPPED](ol), nil)
if ret == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
if dataBuf.buf != nil:
dealloc dataBuf.buf
dataBuf.buf = nil
GC_unref(ol)
if flags.isDisconnectionError(err):
retFuture.complete("")
else:
retFuture.fail(newException(OSError, osErrorMsg(err)))
elif ret == 0:
# Request completed immediately.
if bytesReceived != 0:
var data = newString(bytesReceived)
assert bytesReceived <= size
copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived)
retFuture.complete($data)
else:
if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
retFuture.complete("")
return retFuture
proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
## at least be of that size. Returned future will complete once all the
## data requested is read, a part of the data has been read, or the socket
## has disconnected in which case the future will complete with a value of
## ``0``.
##
## **Warning**: The ``Peek`` socket flag is not supported on Windows.
# Things to note:
# * When WSARecv completes immediately then ``bytesReceived`` is very
# unreliable.
# * Still need to implement message-oriented socket disconnection,
# '\0' in the message currently signifies a socket disconnect. Who
# knows what will happen when someone sends that to our socket.
verifyPresence(socket)
assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
var retFuture = newFuture[int]("recvInto")
#buf[] = '\0'
var dataBuf: TWSABuf
dataBuf.buf = cast[cstring](buf)
dataBuf.len = size.ULONG
var bytesReceived: DWORD
var flagsio = flags.toOSFlags().DWORD
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
retFuture.complete(bytesCount)
else:
if flags.isDisconnectionError(errcode):
retFuture.complete(0)
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
if dataBuf.buf != nil:
dataBuf.buf = nil
)
let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
addr flagsio, cast[POVERLAPPED](ol), nil)
if ret == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
if dataBuf.buf != nil:
dataBuf.buf = nil
GC_unref(ol)
if flags.isDisconnectionError(err):
retFuture.complete(0)
else:
retFuture.fail(newException(OSError, osErrorMsg(err)))
elif ret == 0:
# Request completed immediately.
if bytesReceived != 0:
assert bytesReceived <= size
retFuture.complete(bytesReceived)
else:
if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
retFuture.complete(bytesReceived)
return retFuture
proc send*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future
## will complete once all data has been sent.
##
## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object,
## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
verifyPresence(socket)
var retFuture = newFuture[void]("send")
var dataBuf: TWSABuf
dataBuf.buf = cast[cstring](buf)
dataBuf.len = size.ULONG
var bytesReceived, lowFlags: DWORD
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
retFuture.complete()
else:
if flags.isDisconnectionError(errcode):
retFuture.complete()
else:
retFuture.fail(newOSError(errcode))
)
let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
lowFlags, cast[POVERLAPPED](ol), nil)
if ret == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
GC_unref(ol)
if flags.isDisconnectionError(err):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(err)))
else:
retFuture.complete()
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
saddrLen: SockLen,
flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
## Sends ``data`` to specified destination ``saddr``, using
## socket ``socket``. The returned future will complete once all data
## has been sent.
verifyPresence(socket)
var retFuture = newFuture[void]("sendTo")
var dataBuf: TWSABuf
dataBuf.buf = cast[cstring](data)
dataBuf.len = size.ULONG
var bytesSent = 0.DWORD
var lowFlags = 0.DWORD
# we will preserve address in our stack
var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
var stalen: cint = cint(saddrLen)
zeroMem(addr(staddr[0]), 128)
copyMem(addr(staddr[0]), saddr, saddrLen)
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
)
let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
lowFlags, cast[ptr SockAddr](addr(staddr[0])),
stalen, cast[POVERLAPPED](ol), nil)
if ret == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
GC_unref(ol)
retFuture.fail(newException(OSError, osErrorMsg(err)))
else:
retFuture.complete()
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
saddr: ptr SockAddr, saddrLen: ptr SockLen,
flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
## Receives a datagram data from ``socket`` into ``buf``, which must
## be at least of size ``size``, address of datagram's sender will be
## stored into ``saddr`` and ``saddrLen``. Returned future will complete
## once one datagram has been received, and will return size of packet
## received.
verifyPresence(socket)
var retFuture = newFuture[int]("recvFromInto")
var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
var bytesReceived = 0.DWORD
var lowFlags = 0.DWORD
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
assert bytesCount <= size
retFuture.complete(bytesCount)
else:
# datagram sockets don't have disconnection,
# so we can just raise an exception
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
)
let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1,
addr bytesReceived, addr lowFlags,
saddr, cast[ptr cint](saddrLen),
cast[POVERLAPPED](ol), nil)
if res == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
GC_unref(ol)
retFuture.fail(newException(OSError, osErrorMsg(err)))
else:
# Request completed immediately.
if bytesReceived != 0:
assert bytesReceived <= size
retFuture.complete(bytesReceived)
else:
if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
retFuture.complete(bytesReceived)
return retFuture
proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
owned(Future[tuple[address: string, client: AsyncFD]]) =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection and the remote address of the client.
## The future will complete when the connection is successfully accepted.
##
## The resulting client socket is automatically registered to the
## dispatcher.
##
## The ``accept`` call may result in an error if the connecting socket
## disconnects during the duration of the ``accept``. If the ``SafeDisconn``
## flag is specified then this error will not be raised and instead
## accept will be called again.
verifyPresence(socket)
var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")
var clientSock = createNativeSocket()
if clientSock == osInvalidSocket: raiseOSError(osLastError())
const lpOutputLen = 1024
var lpOutputBuf = newString(lpOutputLen)
var dwBytesReceived: DWORD
let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
template failAccept(errcode) =
if flags.isDisconnectionError(errcode):
var newAcceptFut = acceptAddr(socket, flags)
newAcceptFut.callback =
proc () =
if newAcceptFut.failed:
retFuture.fail(newAcceptFut.readError)
else:
retFuture.complete(newAcceptFut.read)
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
template completeAccept() {.dirty.} =
var listenSock = socket
let setoptRet = setsockopt(clientSock, SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
sizeof(listenSock).SockLen)
if setoptRet != 0:
let errcode = osLastError()
discard clientSock.closesocket()
failAccept(errcode)
else:
var localSockaddr, remoteSockaddr: ptr SockAddr
var localLen, remoteLen: int32
getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength,
dwLocalAddressLength, dwRemoteAddressLength,
addr localSockaddr, addr localLen,
addr remoteSockaddr, addr remoteLen)
try:
let address = getAddrString(remoteSockaddr)
register(clientSock.AsyncFD)
retFuture.complete((address: address, client: clientSock.AsyncFD))
except:
# getAddrString may raise
clientSock.close()
retFuture.fail(getCurrentException())
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
completeAccept()
else:
failAccept(errcode)
)
# http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0],
dwReceiveDataLength,
dwLocalAddressLength,
dwRemoteAddressLength,
addr dwBytesReceived, cast[POVERLAPPED](ol))
if not ret:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
failAccept(err)
GC_unref(ol)
else:
completeAccept()
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
proc closeSocket*(socket: AsyncFD) =
## Closes a socket and ensures that it is unregistered.
socket.SocketHandle.close()
getGlobalDispatcher().handles.excl(socket)
proc unregister*(fd: AsyncFD) =
## Unregisters ``fd``.
getGlobalDispatcher().handles.excl(fd)
proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
return fd in disp.handles
{.push stackTrace: off.}
proc waitableCallback(param: pointer,
timerOrWaitFired: WINBOOL) {.stdcall.} =
var p = cast[PostCallbackDataPtr](param)
discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD,
ULONG_PTR(p.handleFd),
cast[pointer](p.ovl))
{.pop.}
proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) =
let p = getGlobalDispatcher()
var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD
var hEvent = wsaCreateEvent()
if hEvent == 0:
raiseOSError(osLastError())
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
pcd.ioPort = p.ioPort
pcd.handleFd = fd
var ol = newCustom()
ol.data = CompletionData(fd: fd, cb:
proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
# we excluding our `fd` because cb(fd) can register own handler
# for this `fd`
p.handles.excl(fd)
# unregisterWait() is called before callback, because appropriate
# winsockets function can re-enable event.
# https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
if unregisterWait(pcd.waitFd) == 0:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
deallocShared(cast[pointer](pcd))
discard wsaCloseEvent(hEvent)
raiseOSError(err)
if cb(fd):
# callback returned `true`, so we free all allocated resources
deallocShared(cast[pointer](pcd))
if not wsaCloseEvent(hEvent):
raiseOSError(osLastError())
# pcd.ovl will be unrefed in poll().
else:
# callback returned `false` we need to continue
if p.handles.contains(fd):
# new callback was already registered with `fd`, so we free all
# allocated resources. This happens because in callback `cb`
# addRead/addWrite was called with same `fd`.
deallocShared(cast[pointer](pcd))
if not wsaCloseEvent(hEvent):
raiseOSError(osLastError())
else:
# we need to include `fd` again
p.handles.incl(fd)
# and register WaitForSingleObject again
if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
cast[WAITORTIMERCALLBACK](waitableCallback),
cast[pointer](pcd), INFINITE, flags):
# pcd.ovl will be unrefed in poll()
let err = osLastError()
deallocShared(cast[pointer](pcd))
discard wsaCloseEvent(hEvent)
raiseOSError(err)
else:
# we incref `pcd.ovl` and `protect` callback one more time,
# because it will be unrefed and disposed in `poll()` after
# callback finishes.
GC_ref(pcd.ovl)
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
)
# We need to protect our callback environment value, so GC will not free it
# accidentally.
ol.data.cell = system.protect(rawEnv(ol.data.cb))
# This is main part of `hacky way` is using WSAEventSelect, so `hEvent`
# will be signaled when appropriate `mask` events will be triggered.
if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0:
let err = osLastError()
GC_unref(ol)
deallocShared(cast[pointer](pcd))
discard wsaCloseEvent(hEvent)
raiseOSError(err)
pcd.ovl = ol
if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
cast[WAITORTIMERCALLBACK](waitableCallback),
cast[pointer](pcd), INFINITE, flags):
let err = osLastError()
GC_unref(ol)
deallocShared(cast[pointer](pcd))
discard wsaCloseEvent(hEvent)
raiseOSError(err)
p.handles.incl(fd)
proc addRead*(fd: AsyncFD, cb: Callback) =
## Start watching the file descriptor for read availability and then call
## the callback ``cb``.
##
## This is not ``pure`` mechanism for Windows Completion Ports (IOCP),
## so if you can avoid it, please do it. Use `addRead` only if really
## need it (main usecase is adaptation of unix-like libraries to be
## asynchronous on Windows).
##
## If you use this function, you don't need to use asyncdispatch.recv()
## or asyncdispatch.accept(), because they are using IOCP, please use
## nativesockets.recv() and nativesockets.accept() instead.
##
## Be sure your callback ``cb`` returns ``true``, if you want to remove
## watch of `read` notifications, and ``false``, if you want to continue
## receiving notifications.
registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
proc addWrite*(fd: AsyncFD, cb: Callback) =
## Start watching the file descriptor for write availability and then call
## the callback ``cb``.
##
## This is not ``pure`` mechanism for Windows Completion Ports (IOCP),
## so if you can avoid it, please do it. Use `addWrite` only if really
## need it (main usecase is adaptation of unix-like libraries to be
## asynchronous on Windows).
##
## If you use this function, you don't need to use asyncdispatch.send()
## or asyncdispatch.connect(), because they are using IOCP, please use
## nativesockets.send() and nativesockets.connect() instead.
##
## Be sure your callback ``cb`` returns ``true``, if you want to remove
## watch of `write` notifications, and ``false``, if you want to continue
## receiving notifications.
registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
handleCallback) =
let handleFD = AsyncFD(hEvent)
pcd.ioPort = p.ioPort
pcd.handleFd = handleFD
var ol = newCustom()
ol.data.fd = handleFD
ol.data.cb = handleCallback
# We need to protect our callback environment value, so GC will not free it
# accidentally.
ol.data.cell = system.protect(rawEnv(ol.data.cb))
pcd.ovl = ol
if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
cast[WAITORTIMERCALLBACK](waitableCallback),
cast[pointer](pcd), timeout.DWORD, flags):
let err = osLastError()
GC_unref(ol)
deallocShared(cast[pointer](pcd))
discard closeHandle(hEvent)
raiseOSError(err)
p.handles.incl(handleFD)
template closeWaitable(handle: untyped) =
let waitFd = pcd.waitFd
deallocShared(cast[pointer](pcd))
p.handles.excl(fd)
if unregisterWait(waitFd) == 0:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
discard closeHandle(handle)
raiseOSError(err)
if closeHandle(handle) == 0:
raiseOSError(osLastError())
proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
## Registers callback ``cb`` to be called when timer expired.
##
## Parameters:
##
## * ``timeout`` - timeout value in milliseconds.
## * ``oneshot``
## * `true` - generate only one timeout event
## * `false` - generate timeout events periodically
doAssert(timeout > 0)
let p = getGlobalDispatcher()
var hEvent = createEvent(nil, 1, 0, nil)
if hEvent == INVALID_HANDLE_VALUE:
raiseOSError(osLastError())
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
var flags = WT_EXECUTEINWAITTHREAD.DWORD
if oneshot: flags = flags or WT_EXECUTEONLYONCE
proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
let res = cb(fd)
if res or oneshot:
closeWaitable(hEvent)
else:
# if callback returned `false`, then it wants to be called again, so
# we need to ref and protect `pcd.ovl` again, because it will be
# unrefed and disposed in `poll()`.
GC_ref(pcd.ovl)
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
proc addProcess*(pid: int, cb: Callback) =
## Registers callback ``cb`` to be called when process with process ID
## ``pid`` exited.
let p = getGlobalDispatcher()
let procFlags = SYNCHRONIZE
var hProcess = openProcess(procFlags, 0, pid.DWORD)
if hProcess == INVALID_HANDLE_VALUE:
raiseOSError(osLastError())
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
var flags = WT_EXECUTEINWAITTHREAD.DWORD
proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
closeWaitable(hProcess)
discard cb(fd)
registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
proc newAsyncEvent*(): AsyncEvent =
## Creates a new thread-safe ``AsyncEvent`` object.
##
## New ``AsyncEvent`` object is not automatically registered with
## dispatcher like ``AsyncSocket``.
var sa = SECURITY_ATTRIBUTES(
nLength: sizeof(SECURITY_ATTRIBUTES).cint,
bInheritHandle: 1
)
var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
if event == INVALID_HANDLE_VALUE:
raiseOSError(osLastError())
result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
result.hEvent = event
proc trigger*(ev: AsyncEvent) =
## Set event ``ev`` to signaled state.
if setEvent(ev.hEvent) == 0:
raiseOSError(osLastError())
proc unregister*(ev: AsyncEvent) =
## Unregisters event ``ev``.
doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
let p = getGlobalDispatcher()
p.handles.excl(AsyncFD(ev.hEvent))
if unregisterWait(ev.hWaiter) == 0:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
raiseOSError(err)
ev.hWaiter = 0
proc close*(ev: AsyncEvent) =
## Closes event ``ev``.
let res = closeHandle(ev.hEvent)
deallocShared(cast[pointer](ev))
if res == 0:
raiseOSError(osLastError())
proc addEvent*(ev: AsyncEvent, cb: Callback) =
## Registers callback ``cb`` to be called when ``ev`` will be signaled
doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
let p = getGlobalDispatcher()
let hEvent = ev.hEvent
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
var flags = WT_EXECUTEINWAITTHREAD.DWORD
proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if ev.hWaiter != 0:
if cb(fd):
# we need this check to avoid exception, if `unregister(event)` was
# called in callback.
deallocShared(cast[pointer](pcd))
if ev.hWaiter != 0:
unregister(ev)
else:
# if callback returned `false`, then it wants to be called again, so
# we need to ref and protect `pcd.ovl` again, because it will be
# unrefed and disposed in `poll()`.
GC_ref(pcd.ovl)
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
else:
# if ev.hWaiter == 0, then event was unregistered before `poll()` call.
deallocShared(cast[pointer](pcd))
registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
ev.hWaiter = pcd.waitFd
initAll()
else:
import selectors
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
MSG_NOSIGNAL
const
InitCallbackListSize = 4 # initial size of callbacks sequence,
# associated with file/socket descriptor.
InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
# queue.
type
AsyncFD* = distinct cint
Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
AsyncData = object
readList: seq[Callback]
writeList: seq[Callback]
AsyncEvent* = distinct SelectEvent
PDispatcher* = ref object of PDispatcherBase
selector: Selector[AsyncData]
proc `==`*(x, y: AsyncFD): bool {.borrow.}
proc `==`*(x, y: AsyncEvent): bool {.borrow.}
template newAsyncData(): AsyncData =
AsyncData(
readList: newSeqOfCap[Callback](InitCallbackListSize),
writeList: newSeqOfCap[Callback](InitCallbackListSize)
)
proc newDispatcher*(): owned(PDispatcher) =
new result
result.selector = newSelector[AsyncData]()
result.timers.clear()
result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize)
var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
proc setGlobalDispatcher*(disp: owned PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
return disp.selector
proc register*(fd: AsyncFD) =
let p = getGlobalDispatcher()
var data = newAsyncData()
p.selector.registerHandle(fd.SocketHandle, {}, data)
proc unregister*(fd: AsyncFD) =
getGlobalDispatcher().selector.unregister(fd.SocketHandle)
proc unregister*(ev: AsyncEvent) =
getGlobalDispatcher().selector.unregister(SelectEvent(ev))
proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
return fd.SocketHandle in disp.selector
proc addRead*(fd: AsyncFD, cb: Callback) =
let p = getGlobalDispatcher()
var newEvents = {Event.Read}
withData(p.selector, fd.SocketHandle, adata) do:
adata.readList.add(cb)
newEvents.incl(Event.Read)
if len(adata.writeList) != 0: newEvents.incl(Event.Write)
do:
raise newException(ValueError, "File descriptor not registered.")
p.selector.updateHandle(fd.SocketHandle, newEvents)
proc addWrite*(fd: AsyncFD, cb: Callback) =
let p = getGlobalDispatcher()
var newEvents = {Event.Write}
withData(p.selector, fd.SocketHandle, adata) do:
adata.writeList.add(cb)
newEvents.incl(Event.Write)
if len(adata.readList) != 0: newEvents.incl(Event.Read)
do:
raise newException(ValueError, "File descriptor not registered.")
p.selector.updateHandle(fd.SocketHandle, newEvents)
proc hasPendingOperations*(): bool =
let p = getGlobalDispatcher()
not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
proc processBasicCallbacks(
fd: AsyncFD, event: Event
): tuple[readCbListCount, writeCbListCount: int] =
# Process pending descriptor and AsyncEvent callbacks.
#
# Invoke every callback stored in `rwlist`, until one
# returns `false` (which means callback wants to stay
# alive). In such case all remaining callbacks will be added
# to `rwlist` again, in the order they have been inserted.
#
# `rwlist` associated with file descriptor MUST BE emptied before
# dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
# or it can be possible to fall into endless cycle.
var curList: seq[Callback]
let selector = getGlobalDispatcher().selector
withData(selector, fd.int, fdData):
case event
of Event.Read:
shallowCopy(curList, fdData.readList)
fdData.readList = newSeqOfCap[Callback](InitCallbackListSize)
of Event.Write:
shallowCopy(curList, fdData.writeList)
fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize)
else:
assert false, "Cannot process callbacks for " & $event
let newLength = max(len(curList), InitCallbackListSize)
var newList = newSeqOfCap[Callback](newLength)
for cb in curList:
if not cb(fd):
# Callback wants to be called again.
newList.add(cb)
# This callback has returned with EAGAIN, so we don't need to
# call any other callbacks as they are all waiting for the same event
# on the same fd.
break
withData(selector, fd.int, fdData) do:
# Descriptor is still present in the queue.
case event
of Event.Read:
fdData.readList = newList & fdData.readList
of Event.Write:
fdData.writeList = newList & fdData.writeList
else:
assert false, "Cannot process callbacks for " & $event
result.readCbListCount = len(fdData.readList)
result.writeCbListCount = len(fdData.writeList)
do:
# Descriptor was unregistered in callback via `unregister()`.
result.readCbListCount = -1
result.writeCbListCount = -1
template processCustomCallbacks(ident: untyped) =
# Process pending custom event callbacks. Custom events are
# {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
# There can be only one callback registered with one descriptor,
# so there is no need to iterate over list.
var curList: seq[Callback]
withData(p.selector, ident.int, adata) do:
shallowCopy(curList, adata.readList)
adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
let newLength = len(curList)
var newList = newSeqOfCap[Callback](newLength)
var cb = curList[0]
if not cb(fd.AsyncFD):
newList.add(cb)
withData(p.selector, ident.int, adata) do:
# descriptor still present in queue.
adata.readList = newList & adata.readList
if len(adata.readList) == 0:
# if no callbacks registered with descriptor, unregister it.
p.selector.unregister(fd.int)
do:
# descriptor was unregistered in callback via `unregister()`.
discard
proc closeSocket*(sock: AsyncFD) =
let selector = getGlobalDispatcher().selector
if sock.SocketHandle notin selector:
raise newException(ValueError, "File descriptor not registered.")
let data = selector.getData(sock.SocketHandle)
sock.unregister()
sock.SocketHandle.close()
# We need to unblock the read and write callbacks which could still be
# waiting for the socket to become readable and/or writeable.
for cb in data.readList & data.writeList:
if not cb(sock):
raise newException(
ValueError, "Expecting async operations to stop when fd has closed."
)
proc runOnce(timeout = 500): bool =
let p = getGlobalDispatcher()
when ioselSupportedPlatform:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}
if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
raise newException(ValueError,
"No handles or timers registered in dispatcher.")
result = false
var keys: array[64, ReadyKey]
let nextTimer = processTimers(p, result)
var count =
p.selector.selectInto(adjustTimeout(p, timeout, nextTimer), keys)
for i in 0..<count:
let fd = keys[i].fd.AsyncFD
let events = keys[i].events
var (readCbListCount, writeCbListCount) = (0, 0)
if Event.Read in events or events == {Event.Error}:
(readCbListCount, writeCbListCount) =
processBasicCallbacks(fd, Event.Read)
result = true
if Event.Write in events or events == {Event.Error}:
(readCbListCount, writeCbListCount) =
processBasicCallbacks(fd, Event.Write)
result = true
var isCustomEvent = false
if Event.User in events:
(readCbListCount, writeCbListCount) =
processBasicCallbacks(fd, Event.Read)
isCustomEvent = true
if readCbListCount == 0:
p.selector.unregister(fd.int)
result = true
when ioselSupportedPlatform:
if (customSet * events) != {}:
isCustomEvent = true
processCustomCallbacks(fd)
result = true
# because state `data` can be modified in callback we need to update
# descriptor events with currently registered callbacks.
if not isCustomEvent and (readCbListCount != -1 and writeCbListCount != -1):
var newEvents: set[Event] = {}
if readCbListCount > 0: incl(newEvents, Event.Read)
if writeCbListCount > 0: incl(newEvents, Event.Write)
p.selector.updateHandle(SocketHandle(fd), newEvents)
# Timer processing.
discard processTimers(p, result)
# Callback queue processing
processPendingCallbacks(p, result)
proc recv*(socket: AsyncFD, size: int,
flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
var retFuture = newFuture[string]("recv")
var readBuffer = newString(size)
proc cb(sock: AsyncFD): bool =
result = true
let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
flags.toOSFlags())
if res < 0:
let lastError = osLastError()
if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
lastError.int32 != EAGAIN:
if flags.isDisconnectionError(lastError):
retFuture.complete("")
else:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
result = false # We still want this callback to be called.
elif res == 0:
# Disconnected
retFuture.complete("")
else:
readBuffer.setLen(res)
retFuture.complete(readBuffer)
# TODO: The following causes a massive slowdown.
#if not cb(socket):
addRead(socket, cb)
return retFuture
proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
var retFuture = newFuture[int]("recvInto")
proc cb(sock: AsyncFD): bool =
result = true
let res = recv(sock.SocketHandle, buf, size.cint,
flags.toOSFlags())
if res < 0:
let lastError = osLastError()
if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
lastError.int32 != EAGAIN:
if flags.isDisconnectionError(lastError):
retFuture.complete(0)
else:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
result = false # We still want this callback to be called.
else:
retFuture.complete(res)
# TODO: The following causes a massive slowdown.
#if not cb(socket):
addRead(socket, cb)
return retFuture
proc send*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
var retFuture = newFuture[void]("send")
var written = 0
proc cb(sock: AsyncFD): bool =
result = true
let netSize = size-written
var d = cast[cstring](buf)
let res = send(sock.SocketHandle, addr d[written], netSize.cint,
MSG_NOSIGNAL)
if res < 0:
let lastError = osLastError()
if lastError.int32 != EINTR and
lastError.int32 != EWOULDBLOCK and
lastError.int32 != EAGAIN:
if flags.isDisconnectionError(lastError):
retFuture.complete()
else:
retFuture.fail(newOSError(lastError))
else:
result = false # We still want this callback to be called.
else:
written.inc(res)
if res != netSize:
result = false # We still have data to send.
else:
retFuture.complete()
# TODO: The following causes crashes.
#if not cb(socket):
addWrite(socket, cb)
return retFuture
proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
saddrLen: SockLen,
flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
## Sends ``data`` of size ``size`` in bytes to specified destination
## (``saddr`` of size ``saddrLen`` in bytes, using socket ``socket``.
## The returned future will complete once all data has been sent.
var retFuture = newFuture[void]("sendTo")
# we will preserve address in our stack
var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
var stalen = saddrLen
zeroMem(addr(staddr[0]), 128)
copyMem(addr(staddr[0]), saddr, saddrLen)
proc cb(sock: AsyncFD): bool =
result = true
let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
cast[ptr SockAddr](addr(staddr[0])), stalen)
if res < 0:
let lastError = osLastError()
if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
lastError.int32 != EAGAIN:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
result = false # We still want this callback to be called.
else:
retFuture.complete()
addWrite(socket, cb)
return retFuture
proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
saddr: ptr SockAddr, saddrLen: ptr SockLen,
flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
## Receives a datagram data from ``socket`` into ``data``, which must
## be at least of size ``size`` in bytes, address of datagram's sender
## will be stored into ``saddr`` and ``saddrLen``. Returned future will
## complete once one datagram has been received, and will return size
## of packet received.
var retFuture = newFuture[int]("recvFromInto")
proc cb(sock: AsyncFD): bool =
result = true
let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(),
saddr, saddrLen)
if res < 0:
let lastError = osLastError()
if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
lastError.int32 != EAGAIN:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
result = false
else:
retFuture.complete(res)
addRead(socket, cb)
return retFuture
proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
owned(Future[tuple[address: string, client: AsyncFD]]) =
var retFuture = newFuture[tuple[address: string,
client: AsyncFD]]("acceptAddr")
proc cb(sock: AsyncFD): bool =
result = true
var sockAddress: Sockaddr_storage
var addrLen = sizeof(sockAddress).SockLen
var client = accept(sock.SocketHandle,
cast[ptr SockAddr](addr(sockAddress)), addr(addrLen))
if client == osInvalidSocket:
let lastError = osLastError()
assert lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN
if lastError.int32 == EINTR:
return false
else:
if flags.isDisconnectionError(lastError):
return false
else:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
try:
let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
register(client.AsyncFD)
retFuture.complete((address, client.AsyncFD))
except:
# getAddrString may raise
client.close()
retFuture.fail(getCurrentException())
addRead(socket, cb)
return retFuture
when ioselSupportedPlatform:
proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
## Start watching for timeout expiration, and then call the
## callback ``cb``.
## ``timeout`` - time in milliseconds,
## ``oneshot`` - if ``true`` only one event will be dispatched,
## if ``false`` continuous events every ``timeout`` milliseconds.
let p = getGlobalDispatcher()
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerTimer(timeout, oneshot, data)
proc addSignal*(signal: int, cb: Callback) =
## Start watching signal ``signal``, and when signal appears, call the
## callback ``cb``.
let p = getGlobalDispatcher()
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerSignal(signal, data)
proc addProcess*(pid: int, cb: Callback) =
## Start watching for process exit with pid ``pid``, and then call
## the callback ``cb``.
let p = getGlobalDispatcher()
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerProcess(pid, data)
proc newAsyncEvent*(): AsyncEvent =
## Creates new ``AsyncEvent``.
result = AsyncEvent(newSelectEvent())
proc trigger*(ev: AsyncEvent) =
## Sets new ``AsyncEvent`` to signaled state.
trigger(SelectEvent(ev))
proc close*(ev: AsyncEvent) =
## Closes ``AsyncEvent``
close(SelectEvent(ev))
proc addEvent*(ev: AsyncEvent, cb: Callback) =
## Start watching for event ``ev``, and call callback ``cb``, when
## ev will be set to signaled state.
let p = getGlobalDispatcher()
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerEvent(SelectEvent(ev), data)
proc drain*(timeout = 500) =
## Waits for completion events and processes them. Raises ``ValueError``
## if there are no pending operations. In contrast to ``poll`` this
## processes as many events as are available.
if runOnce(timeout) or hasPendingOperations():
while hasPendingOperations() and runOnce(timeout): discard
proc poll*(timeout = 500) =
## Waits for completion events and processes them. Raises ``ValueError``
## if there are no pending operations. This runs the underlying OS
## `epoll`:idx: or `kqueue`:idx: primitive only once.
discard runOnce(timeout)
template createAsyncNativeSocketImpl(domain, sockType, protocol) =
let handle = createNativeSocket(domain, sockType, protocol)
if handle == osInvalidSocket:
return osInvalidSocket.AsyncFD
handle.setBlocking(false)
when defined(macosx) and not defined(nimdoc):
handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
result = handle.AsyncFD
register(result)
proc createAsyncNativeSocket*(domain: cint, sockType: cint,
protocol: cint): AsyncFD =
createAsyncNativeSocketImpl(domain, sockType, protocol)
proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
sockType: SockType = SOCK_STREAM,
protocol: Protocol = IPPROTO_TCP): AsyncFD =
createAsyncNativeSocketImpl(domain, sockType, protocol)
proc newAsyncNativeSocket*(domain: cint, sockType: cint,
protocol: cint): AsyncFD {.deprecated: "use createAsyncNativeSocket instead".} =
createAsyncNativeSocketImpl(domain, sockType, protocol)
proc newAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
sockType: SockType = SOCK_STREAM,
protocol: Protocol = IPPROTO_TCP): AsyncFD
{.deprecated: "use createAsyncNativeSocket instead".} =
createAsyncNativeSocketImpl(domain, sockType, protocol)
when defined(windows) or defined(nimdoc):
proc bindToDomain(handle: SocketHandle, domain: Domain) =
# Extracted into a separate proc, because connect() on Windows requires
# the socket to be initially bound.
template doBind(saddr) =
if bindAddr(handle, cast[ptr SockAddr](addr(saddr)),
sizeof(saddr).SockLen) < 0'i32:
raiseOSError(osLastError())
if domain == Domain.AF_INET6:
var saddr: Sockaddr_in6
saddr.sin6_family = uint16(toInt(domain))
doBind(saddr)
else:
var saddr: Sockaddr_in
saddr.sin_family = uint16(toInt(domain))
doBind(saddr)
proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
let retFuture = newFuture[void]("doConnect")
result = retFuture
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
)
let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr,
cint(addrInfo.ai_addrlen), nil, 0, nil,
cast[POVERLAPPED](ol))
if ret:
# Request to connect completed immediately.
retFuture.complete()
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it
# will free ``ol``.
else:
let lastError = osLastError()
if lastError.int32 != ERROR_IO_PENDING:
# With ERROR_IO_PENDING ``ol`` will be deallocated in ``poll``,
# and the future will be completed/failed there, too.
GC_unref(ol)
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
let retFuture = newFuture[void]("doConnect")
result = retFuture
proc cb(fd: AsyncFD): bool =
let ret = SocketHandle(fd).getSockOptInt(
cint(SOL_SOCKET), cint(SO_ERROR))
if ret == 0:
# We have connected.
retFuture.complete()
return true
elif ret == EINTR:
# interrupted, keep waiting
return false
else:
retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
return true
let ret = connect(socket.SocketHandle,
addrInfo.ai_addr,
addrInfo.ai_addrlen.SockLen)
if ret == 0:
# Request to connect completed immediately.
retFuture.complete()
else:
let lastError = osLastError()
if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
addWrite(socket, cb)
else:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
protocol: Protocol = IPPROTO_RAW) =
## Iterates through the AddrInfo linked list asynchronously
## until the connection can be established.
const shouldCreateFd = not declared(fd)
when shouldCreateFd:
let sockType = protocol.toSockType()
var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD]
for i in low(fdPerDomain)..high(fdPerDomain):
fdPerDomain[i] = osInvalidSocket.AsyncFD
template closeUnusedFds(domainToKeep = -1) {.dirty.} =
for i, fd in fdPerDomain:
if fd != osInvalidSocket.AsyncFD and i != domainToKeep:
fd.closeSocket()
var lastException: ref Exception
var curAddrInfo = addrInfo
var domain: Domain
when shouldCreateFd:
var curFd: AsyncFD
else:
var curFd = fd
proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} =
if fut == nil or fut.failed:
if fut != nil:
lastException = fut.readError()
while curAddrInfo != nil:
let domainOpt = curAddrInfo.ai_family.toKnownDomain()
if domainOpt.isSome:
domain = domainOpt.unsafeGet()
break
curAddrInfo = curAddrInfo.ai_next
if curAddrInfo == nil:
freeaddrinfo(addrInfo)
when shouldCreateFd:
closeUnusedFds()
if lastException != nil:
retFuture.fail(lastException)
else:
retFuture.fail(newException(
IOError, "Couldn't resolve address: " & address))
return
when shouldCreateFd:
curFd = fdPerDomain[ord(domain)]
if curFd == osInvalidSocket.AsyncFD:
try:
curFd = createAsyncNativeSocket(domain, sockType, protocol)
except:
freeaddrinfo(addrInfo)
closeUnusedFds()
raise getCurrentException()
when defined(windows):
curFd.SocketHandle.bindToDomain(domain)
fdPerDomain[ord(domain)] = curFd
doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo
curAddrInfo = curAddrInfo.ai_next
else:
freeaddrinfo(addrInfo)
when shouldCreateFd:
closeUnusedFds(ord(domain))
retFuture.complete(curFd)
else:
retFuture.complete()
tryNextAddrInfo(nil)
proc dial*(address: string, port: Port,
protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) =
## Establishes connection to the specified ``address``:``port`` pair via the
## specified protocol. The procedure iterates through possible
## resolutions of the ``address`` until it succeeds, meaning that it
## seamlessly works with both IPv4 and IPv6.
## Returns the async file descriptor, registered in the dispatcher of
## the current thread, ready to send or receive data.
let retFuture = newFuture[AsyncFD]("dial")
result = retFuture
let sockType = protocol.toSockType()
let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol)
asyncAddrInfoLoop(aiList, noFD, protocol)
proc connect*(socket: AsyncFD, address: string, port: Port,
domain = Domain.AF_INET): owned(Future[void]) =
let retFuture = newFuture[void]("connect")
result = retFuture
when defined(windows):
verifyPresence(socket)
else:
assert getSockDomain(socket.SocketHandle) == domain
let aiList = getAddrInfo(address, port, domain)
when defined(windows):
socket.SocketHandle.bindToDomain(domain)
asyncAddrInfoLoop(aiList, socket)
proc sleepAsync*(ms: int | float): owned(Future[void]) =
## Suspends the execution of the current async procedure for the next
## ``ms`` milliseconds.
var retFuture = newFuture[void]("sleepAsync")
let p = getGlobalDispatcher()
when ms is int:
p.timers.push((getMonoTime() + initDuration(milliseconds = ms), retFuture))
elif ms is float:
let ns = (ms * 1_000_000).int64
p.timers.push((getMonoTime() + initDuration(nanoseconds = ns), retFuture))
return retFuture
proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) =
## Returns a future which will complete once ``fut`` completes or after
## ``timeout`` milliseconds has elapsed.
##
## If ``fut`` completes first the returned future will hold true,
## otherwise, if ``timeout`` milliseconds has elapsed first, the returned
## future will hold false.
var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`")
var timeoutFuture = sleepAsync(timeout)
fut.callback =
proc () =
if not retFuture.finished:
if fut.failed:
retFuture.fail(fut.error)
else:
retFuture.complete(true)
timeoutFuture.callback =
proc () =
if not retFuture.finished: retFuture.complete(false)
return retFuture
proc accept*(socket: AsyncFD,
flags = {SocketFlag.SafeDisconn}): owned(Future[AsyncFD]) =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection.
## The future will complete when the connection is successfully accepted.
var retFut = newFuture[AsyncFD]("accept")
var fut = acceptAddr(socket, flags)
fut.callback =
proc (future: Future[tuple[address: string, client: AsyncFD]]) =
assert future.finished
if future.failed:
retFut.fail(future.error)
else:
retFut.complete(future.read.client)
return retFut
proc keepAlive(x: string) =
discard "mark 'x' as escaping so that it is put into a closure for us to keep the data alive"
proc send*(socket: AsyncFD, data: string,
flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
## Sends ``data`` to ``socket``. The returned future will complete once all
## data has been sent.
var retFuture = newFuture[void]("send")
if data.len > 0:
let sendFut = socket.send(unsafeAddr data[0], data.len, flags)
sendFut.callback =
proc () =
keepAlive(data)
if sendFut.failed:
retFuture.fail(sendFut.error)
else:
retFuture.complete()
else:
retFuture.complete()
return retFuture
# -- Await Macro
include asyncmacro
proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} =
## Returns a future that will complete when all the string data from the
## specified future stream is retrieved.
result = ""
while true:
let (hasValue, value) = await future.read()
if hasValue:
result.add(value)
else:
break
proc callSoon(cbproc: proc () {.gcsafe.}) =
getGlobalDispatcher().callbacks.addLast(cbproc)
proc runForever*() =
## Begins a never ending global dispatcher poll loop.
while true:
poll()
proc waitFor*[T](fut: Future[T]): T =
## **Blocks** the current thread until the specified future completes.
while not fut.finished:
poll()
fut.read