diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 604 |
1 files changed, 374 insertions, 230 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 0ea8ef43b..073cd3576 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -1,6 +1,6 @@ # # -# Nimrod's Runtime Library +# Nim's Runtime Library # (c) Copyright 2014 Dominik Picheta # # See the file "copying.txt", included in this @@ -13,45 +13,143 @@ import os, oids, tables, strutils, macros, times import rawsockets, net -export TPort, TSocketFlags +export Port, SocketFlag #{.injectStmt: newGcInvariant().} ## AsyncDispatch -## ------------- +## ************* ## -## This module implements a brand new dispatcher based on Futures. -## On Windows IOCP is used and on other operating systems the ``selectors`` -## module is used instead. +## This module implements asynchronous IO. This includes a dispatcher, +## a ``Future`` type implementation, and an ``async`` macro which allows +## asynchronous code to be written in a synchronous style with the ``await`` +## keyword. ## -## **Note:** This module is still largely experimental. +## The dispatcher acts as a kind of event loop. You must call ``poll`` on it +## (or a function which does so for you such as ``waitFor`` or ``runForever``) +## in order to poll for any outstanding events. The underlying implementation +## is based on epoll on Linux, IO Completion Ports on Windows and select on +## other operating systems. +## +## The ``poll`` function will not, on its own, return any events. Instead +## an appropriate ``Future`` object will be completed. A ``Future`` is a +## type which holds a value which is not yet available, but which *may* be +## available in the future. You can check whether a future is finished +## by using the ``finished`` function. When a future is finished it means that +## either the value that it holds is now available or it holds an error instead. +## The latter situation occurs when the operation to complete a future fails +## with an exception. You can distinguish between the two situations with the +## ``failed`` function. +## +## Future objects can also store a callback procedure which will be called +## automatically once the future completes. +## +## Futures therefore can be thought of as an implementation of the proactor +## pattern. In this +## pattern you make a request for an action, and once that action is fulfilled +## a future is completed with the result of that action. Requests can be +## made by calling the appropriate functions. For example: calling the ``recv`` +## function will create a request for some data to be read from a socket. The +## future which the ``recv`` function returns will then complete once the +## requested amount of data is read **or** an exception occurs. +## +## Code to read some data from a socket may look something like this: +## +## .. code-block::nim +## var future = socket.recv(100) +## future.callback = +## proc () = +## echo(future.read) +## +## All asynchronous functions returning a ``Future`` will not block. They +## will not however return immediately. An asynchronous function will have +## code which will be executed before an asynchronous request is made, in most +## cases this code sets up the request. +## +## In the above example, the ``recv`` function will return a brand new +## ``Future`` instance once the request for data to be read from the socket +## is made. This ``Future`` instance will complete once the requested amount +## of data is read, in this case it is 100 bytes. The second line sets a +## callback on this future which will be called once the future completes. +## All the callback does is write the data stored in the future to ``stdout``. +## The ``read`` function is used for this and it checks whether the future +## completes with an error for you (if it did it will simply raise the +## error), if there is no error however it returns the value of the future. +## +## Asynchronous procedures +## ----------------------- +## +## Asynchronous procedures remove the pain of working with callbacks. They do +## this by allowing you to write asynchronous code the same way as you would +## write synchronous code. +## +## An asynchronous procedure is marked using the ``{.async.}`` pragma. +## When marking a procedure with the ``{.async.}`` pragma it must have a +## ``Future[T]`` return type or no return type at all. If you do not specify +## a return type then ``Future[void]`` is assumed. +## +## Inside asynchronous procedures ``await`` can be used to call any +## procedures which return a +## ``Future``; this includes asynchronous procedures. When a procedure is +## "awaited", the asynchronous procedure it is awaited in will +## suspend its execution +## until the awaited procedure's Future completes. At which point the +## asynchronous procedure will resume its execution. During the period +## when an asynchronous procedure is suspended other asynchronous procedures +## will be run by the dispatcher. +## +## The ``await`` call may be used in many contexts. It can be used on the right +## hand side of a variable declaration: ``var data = await socket.recv(100)``, +## in which case the variable will be set to the value of the future +## automatically. It can be used to await a ``Future`` object, and it can +## be used to await a procedure returning a ``Future[void]``: +## ``await socket.send("foobar")``. +## +## Discarding futures +## ------------------ +## +## Futures should **never** be discarded. This is because they may contain +## errors. If you do not care for the result of a Future then you should +## use the ``asyncCheck`` procedure instead of the ``discard`` keyword. +## +## Examples +## -------- +## +## For examples take a look at the documentation for the modules implementing +## asynchronous IO. A good place to start is the +## `asyncnet module <asyncnet.html>`_. +## +## Limitations/Bugs +## ---------------- +## +## * ``except`` statement (without `try`) does not work inside async procedures. +## * The effect system (``raises: []``) does not work with async procedures. +## * Can't await in a ``except`` body -# TODO: ``except`` statement (without `try`) does not work. -# TODO: Multiple exception names in a ``except`` don't work. -# TODO: The effect system (raises: []) has trouble with my try transformation. -# TODO: Can't await in a 'except' body -# TODO: getCurrentException(Msg) don't work # TODO: Check if yielded future is nil and throw a more meaningful exception # -- Futures type - PFutureBase* = ref object of PObject + FutureBase* = ref object of RootObj ## Untyped future. cb: proc () {.closure,gcsafe.} finished: bool - error*: ref EBase + error*: ref Exception ## Stored exception errorStackTrace*: string when not defined(release): stackTrace: string ## For debugging purposes only. id: int fromProc: string - PFuture*[T] = ref object of PFutureBase - value: T + Future*[T] = ref object of FutureBase ## Typed future. + value: T ## Stored value + +{.deprecated: [PFutureBase: FutureBase, PFuture: Future].} -var currentID* = 0 -proc newFuture*[T](fromProc: string = "unspecified"): PFuture[T] = + +var currentID = 0 +proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = ## Creates a new future. ## ## Specifying ``fromProc``, which is a string specifying the name of the proc @@ -64,7 +162,7 @@ proc newFuture*[T](fromProc: string = "unspecified"): PFuture[T] = result.fromProc = fromProc currentID.inc() -proc checkFinished[T](future: PFuture[T]) = +proc checkFinished[T](future: Future[T]) = when not defined(release): if future.finished: echo("<-----> ", future.id, " ", future.fromProc) @@ -77,7 +175,7 @@ proc checkFinished[T](future: PFuture[T]) = echo getStackTrace() assert false -proc complete*[T](future: PFuture[T], val: T) = +proc complete*[T](future: Future[T], val: T) = ## Completes ``future`` with value ``val``. #assert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) @@ -87,7 +185,7 @@ proc complete*[T](future: PFuture[T], val: T) = if future.cb != nil: future.cb() -proc complete*(future: PFuture[void]) = +proc complete*(future: Future[void]) = ## Completes a void ``future``. #assert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) @@ -96,7 +194,7 @@ proc complete*(future: PFuture[void]) = if future.cb != nil: future.cb() -proc fail*[T](future: PFuture[T], error: ref EBase) = +proc fail*[T](future: Future[T], error: ref Exception) = ## Completes ``future`` with ``error``. #assert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) @@ -112,8 +210,9 @@ proc fail*[T](future: PFuture[T], error: ref EBase) = # TODO: This may turn out to be a bad idea. # Turns out this is a bad idea. #raise error + discard -proc `callback=`*(future: PFutureBase, cb: proc () {.closure,gcsafe.}) = +proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) = ## Sets the callback proc to be called when the future completes. ## ## If future has already completed then ``cb`` will be called immediately. @@ -124,23 +223,24 @@ proc `callback=`*(future: PFutureBase, cb: proc () {.closure,gcsafe.}) = if future.finished: future.cb() -proc `callback=`*[T](future: PFuture[T], - cb: proc (future: PFuture[T]) {.closure,gcsafe.}) = +proc `callback=`*[T](future: Future[T], + cb: proc (future: Future[T]) {.closure,gcsafe.}) = ## Sets the callback proc to be called when the future completes. ## ## If future has already completed then ``cb`` will be called immediately. future.callback = proc () = cb(future) -proc echoOriginalStackTrace[T](future: PFuture[T]) = +proc echoOriginalStackTrace[T](future: Future[T]) = # TODO: Come up with something better. when not defined(release): echo("Original stack trace in ", future.fromProc, ":") - if not future.errorStackTrace.isNil() and future.errorStackTrace != "": + if not future.errorStackTrace.isNil and future.errorStackTrace != "": echo(future.errorStackTrace) else: echo("Empty or nil stack trace.") + echo("Continuing...") -proc read*[T](future: PFuture[T]): T = +proc read*[T](future: Future[T]): T = ## Retrieves the value of ``future``. Future must be finished otherwise ## this function will fail with a ``EInvalidValue`` exception. ## @@ -153,24 +253,28 @@ proc read*[T](future: PFuture[T]): T = return future.value else: # TODO: Make a custom exception type for this? - raise newException(EInvalidValue, "Future still in progress.") + raise newException(ValueError, "Future still in progress.") -proc readError*[T](future: PFuture[T]): ref EBase = +proc readError*[T](future: Future[T]): ref Exception = + ## Retrieves the exception stored in ``future``. + ## + ## An ``ValueError`` exception will be thrown if no exception exists + ## in the specified Future. if future.error != nil: return future.error else: - raise newException(EInvalidValue, "No error in future.") + raise newException(ValueError, "No error in future.") -proc finished*[T](future: PFuture[T]): bool = +proc finished*[T](future: Future[T]): bool = ## Determines whether ``future`` has completed. ## ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. future.finished -proc failed*(future: PFutureBase): bool = +proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. - future.error != nil + return future.error != nil -proc asyncCheck*[T](future: PFuture[T]) = +proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. ## @@ -181,7 +285,7 @@ proc asyncCheck*[T](future: PFuture[T]) = echoOriginalStackTrace(future) raise future.error -proc `and`*[T, Y](fut1: PFuture[T], fut2: PFuture[Y]): PFuture[void] = +proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = ## Returns a future which will complete once both ``fut1`` and ``fut2`` ## complete. var retFuture = newFuture[void]("asyncdispatch.`and`") @@ -193,7 +297,7 @@ proc `and`*[T, Y](fut1: PFuture[T], fut2: PFuture[Y]): PFuture[void] = if fut1.finished: retFuture.complete() return retFuture -proc `or`*[T, Y](fut1: PFuture[T], fut2: PFuture[Y]): PFuture[void] = +proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = ## Returns a future which will complete once either ``fut1`` or ``fut2`` ## complete. var retFuture = newFuture[void]("asyncdispatch.`or`") @@ -204,8 +308,8 @@ proc `or`*[T, Y](fut1: PFuture[T], fut2: PFuture[Y]): PFuture[void] = return retFuture type - PDispatcherBase = ref object of PObject - timers: seq[tuple[finishAt: float, fut: PFuture[void]]] + PDispatcherBase = ref object of RootRef + timers: seq[tuple[finishAt: float, fut: Future[void]]] proc processTimers(p: PDispatcherBase) = var oldTimers = p.timers @@ -219,21 +323,21 @@ proc processTimers(p: PDispatcherBase) = when defined(windows) or defined(nimdoc): import winlean, sets, hashes type - TCompletionKey = dword + TCompletionKey = Dword TCompletionData* = object - sock: TAsyncFD - cb: proc (sock: TAsyncFD, bytesTransferred: DWORD, - errcode: TOSErrorCode) {.closure,gcsafe.} + fd*: TAsyncFD # TODO: Rename this. + cb*: proc (fd: TAsyncFD, bytesTransferred: Dword, + errcode: OSErrorCode) {.closure,gcsafe.} PDispatcher* = ref object of PDispatcherBase ioPort: THandle - handles: TSet[TAsyncFD] + handles: HashSet[TAsyncFD] TCustomOverlapped = object of TOVERLAPPED data*: TCompletionData - PCustomOverlapped = ref TCustomOverlapped + PCustomOverlapped* = ref TCustomOverlapped TAsyncFD* = distinct int @@ -243,7 +347,7 @@ when defined(windows) or defined(nimdoc): proc newDispatcher*(): PDispatcher = ## Creates a new Dispatcher instance. new result - result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) + result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) result.handles = initSet[TAsyncFD]() result.timers = @[] @@ -253,19 +357,19 @@ when defined(windows) or defined(nimdoc): if gDisp.isNil: gDisp = newDispatcher() result = gDisp - proc register*(sock: TAsyncFD) = - ## Registers ``sock`` with the dispatcher. + proc register*(fd: TAsyncFD) = + ## Registers ``fd`` with the dispatcher. let p = getGlobalDispatcher() - if CreateIOCompletionPort(sock.THandle, p.ioPort, - cast[TCompletionKey](sock), 1) == 0: - osError(osLastError()) - p.handles.incl(sock) + if createIoCompletionPort(fd.THandle, p.ioPort, + cast[TCompletionKey](fd), 1) == 0: + raiseOSError(osLastError()) + p.handles.incl(fd) - proc verifyPresence(sock: TAsyncFD) = - ## Ensures that socket has been registered with the dispatcher. + proc verifyPresence(fd: TAsyncFD) = + ## Ensures that file descriptor has been registered with the dispatcher. let p = getGlobalDispatcher() - if sock notin p.handles: - raise newException(EInvalidValue, + if fd notin p.handles: + raise newException(ValueError, "Operation performed on a socket which has not been registered with" & " the dispatcher yet.") @@ -273,40 +377,40 @@ when defined(windows) or defined(nimdoc): ## Waits for completion events and processes them. let p = getGlobalDispatcher() if p.handles.len == 0 and p.timers.len == 0: - raise newException(EInvalidValue, + raise newException(ValueError, "No handles or timers registered in dispatcher.") let llTimeout = if timeout == -1: winlean.INFINITE else: timeout.int32 - var lpNumberOfBytesTransferred: DWORD + var lpNumberOfBytesTransferred: Dword var lpCompletionKey: ULONG var customOverlapped: PCustomOverlapped - let res = GetQueuedCompletionStatus(p.ioPort, + let res = getQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred, addr lpCompletionKey, - cast[ptr POverlapped](addr customOverlapped), llTimeout).bool + cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool # http://stackoverflow.com/a/12277264/492186 # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html if res: # This is useful for ensuring the reliability of the overlapped struct. - assert customOverlapped.data.sock == lpCompletionKey.TAsyncFD + assert customOverlapped.data.fd == lpCompletionKey.TAsyncFD - customOverlapped.data.cb(customOverlapped.data.sock, - lpNumberOfBytesTransferred, TOSErrorCode(-1)) + customOverlapped.data.cb(customOverlapped.data.fd, + lpNumberOfBytesTransferred, OSErrorCode(-1)) GC_unref(customOverlapped) else: let errCode = osLastError() if customOverlapped != nil: - assert customOverlapped.data.sock == lpCompletionKey.TAsyncFD - customOverlapped.data.cb(customOverlapped.data.sock, + assert customOverlapped.data.fd == lpCompletionKey.TAsyncFD + customOverlapped.data.cb(customOverlapped.data.fd, lpNumberOfBytesTransferred, errCode) GC_unref(customOverlapped) else: if errCode.int32 == WAIT_TIMEOUT: # Timed out discard - else: osError(errCode) + else: raiseOSError(errCode) # Timer processing. processTimers(p) @@ -315,72 +419,72 @@ when defined(windows) or defined(nimdoc): var acceptExPtr: pointer = nil var getAcceptExSockAddrsPtr: pointer = nil - proc initPointer(s: TSocketHandle, func: var pointer, guid: var TGUID): bool = + proc initPointer(s: SocketHandle, func: var pointer, guid: var TGUID): bool = # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c - var bytesRet: DWord + var bytesRet: Dword func = nil result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, - sizeof(TGUID).dword, addr func, sizeof(pointer).DWORD, + sizeof(TGUID).Dword, addr func, sizeof(pointer).Dword, addr bytesRet, nil, nil) == 0 proc initAll() = let dummySock = newRawSocket() if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX): - osError(osLastError()) + raiseOSError(osLastError()) if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX): - osError(osLastError()) + raiseOSError(osLastError()) if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS): - osError(osLastError()) + raiseOSError(osLastError()) - proc connectEx(s: TSocketHandle, name: ptr TSockAddr, namelen: cint, - lpSendBuffer: pointer, dwSendDataLength: dword, - lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool = - if connectExPtr.isNil: raise newException(EInvalidValue, "Need to initialise ConnectEx().") + proc connectEx(s: SocketHandle, name: ptr TSockAddr, namelen: cint, + lpSendBuffer: pointer, dwSendDataLength: Dword, + lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool = + if connectExPtr.isNil: raise newException(ValueError, "Need to initialise ConnectEx().") let func = - cast[proc (s: TSocketHandle, name: ptr TSockAddr, namelen: cint, - lpSendBuffer: pointer, dwSendDataLength: dword, - lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool {.stdcall,gcsafe.}](connectExPtr) + cast[proc (s: SocketHandle, name: ptr TSockAddr, namelen: cint, + lpSendBuffer: pointer, dwSendDataLength: Dword, + lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr) result = func(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, lpOverlapped) - proc acceptEx(listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer, + proc acceptEx(listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer, dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD, - lpOverlapped: POverlapped): bool = - if acceptExPtr.isNil: raise newException(EInvalidValue, "Need to initialise AcceptEx().") + dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword, + lpOverlapped: POVERLAPPED): bool = + if acceptExPtr.isNil: raise newException(ValueError, "Need to initialise AcceptEx().") let func = - cast[proc (listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer, + cast[proc (listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer, dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD, - lpOverlapped: POverlapped): bool {.stdcall,gcsafe.}](acceptExPtr) + dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword, + lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](acceptExPtr) result = func(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, lpOverlapped) proc getAcceptExSockaddrs(lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: DWORD, - LocalSockaddr: ptr ptr TSockAddr, LocalSockaddrLength: lpint, - RemoteSockaddr: ptr ptr TSockAddr, RemoteSockaddrLength: lpint) = + dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: Dword, + LocalSockaddr: ptr ptr TSockAddr, LocalSockaddrLength: LPInt, + RemoteSockaddr: ptr ptr TSockAddr, RemoteSockaddrLength: LPInt) = if getAcceptExSockAddrsPtr.isNil: - raise newException(EInvalidValue, "Need to initialise getAcceptExSockAddrs().") + raise newException(ValueError, "Need to initialise getAcceptExSockAddrs().") let func = cast[proc (lpOutputBuffer: pointer, dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: DWORD, LocalSockaddr: ptr ptr TSockAddr, - LocalSockaddrLength: lpint, RemoteSockaddr: ptr ptr TSockAddr, - RemoteSockaddrLength: lpint) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr) + dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr TSockAddr, + LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr TSockAddr, + RemoteSockaddrLength: LPInt) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr) func(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength, RemoteSockaddr, RemoteSockaddrLength) - proc connect*(socket: TAsyncFD, address: string, port: TPort, - af = AF_INET): PFuture[void] = + proc connect*(socket: TAsyncFD, address: string, port: Port, + af = AF_INET): Future[void] = ## Connects ``socket`` to server at ``address:port``. ## - ## Returns a ``PFuture`` which will complete when the connection succeeds + ## Returns a ``Future`` which will complete when the connection succeeds ## or an error occurs. verifyPresence(socket) var retFuture = newFuture[void]("connect") @@ -389,31 +493,31 @@ when defined(windows) or defined(nimdoc): saddr.sin_family = int16(toInt(af)) saddr.sin_port = 0 saddr.sin_addr.s_addr = INADDR_ANY - if bindAddr(socket.TSocketHandle, cast[ptr TSockAddr](addr(saddr)), + if bindAddr(socket.SocketHandle, cast[ptr TSockAddr](addr(saddr)), sizeof(saddr).TSockLen) < 0'i32: - osError(osLastError()) + raiseOSError(osLastError()) var aiList = getAddrInfo(address, port, af) var success = false - var lastError: TOSErrorCode + var lastError: OSErrorCode var it = aiList while it != nil: # "the OVERLAPPED structure must remain valid until the I/O completes" # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx var ol = PCustomOverlapped() GC_ref(ol) - ol.data = TCompletionData(sock: socket, cb: - proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) = + ol.data = TCompletionData(fd: socket, cb: + proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) = if not retFuture.finished: - if errcode == TOSErrorCode(-1): + if errcode == OSErrorCode(-1): retFuture.complete() else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) + retFuture.fail(newException(OSError, osErrorMsg(errcode))) ) - var ret = connectEx(socket.TSocketHandle, it.ai_addr, - sizeof(TSockAddrIn).cint, nil, 0, nil, - cast[POverlapped](ol)) + var ret = connectEx(socket.SocketHandle, it.ai_addr, + sizeof(Tsockaddr_in).cint, nil, 0, nil, + cast[POVERLAPPED](ol)) if ret: # Request to connect completed immediately. success = true @@ -435,15 +539,17 @@ when defined(windows) or defined(nimdoc): dealloc(aiList) if not success: - retFuture.fail(newException(EOS, osErrorMsg(lastError))) + retFuture.fail(newException(OSError, osErrorMsg(lastError))) return retFuture proc recv*(socket: TAsyncFD, size: int, - flags = {TSocketFlags.SafeDisconn}): PFuture[string] = + flags = {SocketFlag.SafeDisconn}): Future[string] = ## Reads **up to** ``size`` bytes from ``socket``. Returned future will ## complete once all the data requested is read, a part of the data has been ## read, or the socket has disconnected in which case the future will ## complete with a value of ``""``. + ## + ## **Warning**: The ``Peek`` socket flag is not supported on Windows. # Things to note: @@ -453,19 +559,21 @@ when defined(windows) or defined(nimdoc): # '\0' in the message currently signifies a socket disconnect. Who # knows what will happen when someone sends that to our socket. verifyPresence(socket) + assert SocketFlag.Peek notin flags, "Peek not supported on Windows." + var retFuture = newFuture[string]("recv") var dataBuf: TWSABuf dataBuf.buf = cast[cstring](alloc0(size)) dataBuf.len = size - var bytesReceived: DWord - var flagsio = flags.toOSFlags().DWord + var bytesReceived: Dword + var flagsio = flags.toOSFlags().Dword var ol = PCustomOverlapped() GC_ref(ol) - ol.data = TCompletionData(sock: socket, cb: - proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) = + ol.data = TCompletionData(fd: socket, cb: + proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) = if not retFuture.finished: - if errcode == TOSErrorCode(-1): + if errcode == OSErrorCode(-1): if bytesCount == 0 and dataBuf.buf[0] == '\0': retFuture.complete("") else: @@ -477,14 +585,14 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(errcode): retFuture.complete("") else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) + retFuture.fail(newException(OSError, osErrorMsg(errcode))) if dataBuf.buf != nil: dealloc dataBuf.buf dataBuf.buf = nil ) - let ret = WSARecv(socket.TSocketHandle, addr dataBuf, 1, addr bytesReceived, - addr flagsio, cast[POverlapped](ol), nil) + let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, + addr flagsio, cast[POVERLAPPED](ol), nil) if ret == -1: let err = osLastError() if err.int32 != ERROR_IO_PENDING: @@ -495,7 +603,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete("") else: - retFuture.fail(newException(EOS, osErrorMsg(err))) + retFuture.fail(newException(OSError, osErrorMsg(err))) elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': # We have to ensure that the buffer is empty because WSARecv will tell # us immediatelly when it was disconnected, even when there is still @@ -527,7 +635,7 @@ when defined(windows) or defined(nimdoc): return retFuture proc send*(socket: TAsyncFD, data: string, - flags = {TSocketFlags.SafeDisconn}): PFuture[void] = + flags = {SocketFlag.SafeDisconn}): Future[void] = ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. verifyPresence(socket) @@ -537,23 +645,23 @@ when defined(windows) or defined(nimdoc): dataBuf.buf = data # since this is not used in a callback, this is fine dataBuf.len = data.len - var bytesReceived, lowFlags: DWord + var bytesReceived, lowFlags: Dword var ol = PCustomOverlapped() GC_ref(ol) - ol.data = TCompletionData(sock: socket, cb: - proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) = + ol.data = TCompletionData(fd: socket, cb: + proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) = if not retFuture.finished: - if errcode == TOSErrorCode(-1): + if errcode == OSErrorCode(-1): retFuture.complete() else: if flags.isDisconnectionError(errcode): retFuture.complete() else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) + retFuture.fail(newException(OSError, osErrorMsg(errcode))) ) - let ret = WSASend(socket.TSocketHandle, addr dataBuf, 1, addr bytesReceived, - lowFlags, cast[POverlapped](ol), nil) + let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, + lowFlags, cast[POVERLAPPED](ol), nil) if ret == -1: let err = osLastError() if err.int32 != ERROR_IO_PENDING: @@ -561,7 +669,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete() else: - retFuture.fail(newException(EOS, osErrorMsg(err))) + retFuture.fail(newException(OSError, osErrorMsg(err))) else: retFuture.complete() # We don't deallocate ``ol`` here because even though this completed @@ -569,8 +677,8 @@ when defined(windows) or defined(nimdoc): # free ``ol``. return retFuture - proc acceptAddr*(socket: TAsyncFD, flags = {TSocketFlags.SafeDisconn}): - PFuture[tuple[address: string, client: TAsyncFD]] = + proc acceptAddr*(socket: TAsyncFD, flags = {SocketFlag.SafeDisconn}): + Future[tuple[address: string, client: TAsyncFD]] = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. @@ -586,28 +694,28 @@ when defined(windows) or defined(nimdoc): var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]("acceptAddr") var clientSock = newRawSocket() - if clientSock == osInvalidSocket: osError(osLastError()) + if clientSock == osInvalidSocket: raiseOSError(osLastError()) const lpOutputLen = 1024 var lpOutputBuf = newString(lpOutputLen) - var dwBytesReceived: DWORD - let dwReceiveDataLength = 0.DWORD # We don't want any data to be read. - let dwLocalAddressLength = DWORD(sizeof (TSockaddr_in) + 16) - let dwRemoteAddressLength = DWORD(sizeof(TSockaddr_in) + 16) + var dwBytesReceived: Dword + let dwReceiveDataLength = 0.Dword # We don't want any data to be read. + let dwLocalAddressLength = Dword(sizeof (Tsockaddr_in) + 16) + let dwRemoteAddressLength = Dword(sizeof(Tsockaddr_in) + 16) template completeAccept(): stmt {.immediate, dirty.} = var listenSock = socket let setoptRet = setsockopt(clientSock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, addr listenSock, sizeof(listenSock).TSockLen) - if setoptRet != 0: osError(osLastError()) + if setoptRet != 0: raiseOSError(osLastError()) - var LocalSockaddr, RemoteSockaddr: ptr TSockAddr + var localSockaddr, remoteSockaddr: ptr TSockAddr var localLen, remoteLen: int32 getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, - addr LocalSockaddr, addr localLen, - addr RemoteSockaddr, addr remoteLen) + addr localSockaddr, addr localLen, + addr remoteSockaddr, addr remoteLen) register(clientSock.TAsyncFD) # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 retFuture.complete( @@ -625,25 +733,25 @@ when defined(windows) or defined(nimdoc): else: retFuture.complete(newAcceptFut.read) else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) + retFuture.fail(newException(OSError, osErrorMsg(errcode))) var ol = PCustomOverlapped() GC_ref(ol) - ol.data = TCompletionData(sock: socket, cb: - proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) = + ol.data = TCompletionData(fd: socket, cb: + proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) = if not retFuture.finished: - if errcode == TOSErrorCode(-1): + if errcode == OSErrorCode(-1): completeAccept() else: failAccept(errcode) ) # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx - let ret = acceptEx(socket.TSocketHandle, clientSock, addr lpOutputBuf[0], + let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0], dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, - addr dwBytesReceived, cast[POverlapped](ol)) + addr dwBytesReceived, cast[POVERLAPPED](ol)) if not ret: let err = osLastError() @@ -658,17 +766,23 @@ when defined(windows) or defined(nimdoc): return retFuture - proc newAsyncRawSocket*(domain: TDomain = AF_INET, - typ: TType = SOCK_STREAM, - protocol: TProtocol = IPPROTO_TCP): TAsyncFD = + proc newAsyncRawSocket*(domain, typ, protocol: cint): TAsyncFD = + ## Creates a new socket and registers it with the dispatcher implicitly. + result = newRawSocket(domain, typ, protocol).TAsyncFD + result.SocketHandle.setBlocking(false) + register(result) + + proc newAsyncRawSocket*(domain: Domain = AF_INET, + typ: SockType = SOCK_STREAM, + protocol: Protocol = IPPROTO_TCP): TAsyncFD = ## Creates a new socket and registers it with the dispatcher implicitly. result = newRawSocket(domain, typ, protocol).TAsyncFD - result.TSocketHandle.setBlocking(false) + result.SocketHandle.setBlocking(false) register(result) proc closeSocket*(socket: TAsyncFD) = ## Closes a socket and ensures that it is unregistered. - socket.TSocketHandle.close() + socket.SocketHandle.close() getGlobalDispatcher().handles.excl(socket) proc unregister*(fd: TAsyncFD) = @@ -692,10 +806,10 @@ else: type TAsyncFD* = distinct cint - TCallback = proc (sock: TAsyncFD): bool {.closure,gcsafe.} + TCallback = proc (fd: TAsyncFD): bool {.closure,gcsafe.} PData* = ref object of PObject - sock: TAsyncFD + fd: TAsyncFD readCBs: seq[TCallback] writeCBs: seq[TCallback] @@ -714,51 +828,56 @@ else: if gDisp.isNil: gDisp = newDispatcher() result = gDisp - proc update(sock: TAsyncFD, events: set[TEvent]) = + proc update(fd: TAsyncFD, events: set[TEvent]) = let p = getGlobalDispatcher() - assert sock.TSocketHandle in p.selector - discard p.selector.update(sock.TSocketHandle, events) + assert fd.SocketHandle in p.selector + discard p.selector.update(fd.SocketHandle, events) - proc register(sock: TAsyncFD) = + proc register*(fd: TAsyncFD) = let p = getGlobalDispatcher() - var data = PData(sock: sock, readCBs: @[], writeCBs: @[]) - p.selector.register(sock.TSocketHandle, {}, data.PObject) + var data = PData(fd: fd, readCBs: @[], writeCBs: @[]) + p.selector.register(fd.SocketHandle, {}, data.PObject) + + proc newAsyncRawSocket*(domain: cint, typ: cint, protocol: cint): TAsyncFD = + result = newRawSocket(domain, typ, protocol).TAsyncFD + result.SocketHandle.setBlocking(false) + register(result) proc newAsyncRawSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, protocol: TProtocol = IPPROTO_TCP): TAsyncFD = result = newRawSocket(domain, typ, protocol).TAsyncFD - result.TSocketHandle.setBlocking(false) + result.SocketHandle.setBlocking(false) register(result) proc closeSocket*(sock: TAsyncFD) = let disp = getGlobalDispatcher() - sock.TSocketHandle.close() - disp.selector.unregister(sock.TSocketHandle) + sock.SocketHandle.close() + disp.selector.unregister(sock.SocketHandle) proc unregister*(fd: TAsyncFD) = - getGlobalDispatcher().selector.unregister(fd.TSocketHandle) + getGlobalDispatcher().selector.unregister(fd.SocketHandle) - proc addRead(sock: TAsyncFD, cb: TCallback) = + proc addRead*(fd: TAsyncFD, cb: TCallback) = let p = getGlobalDispatcher() - if sock.TSocketHandle notin p.selector: + if fd.SocketHandle notin p.selector: raise newException(EInvalidValue, "File descriptor not registered.") - p.selector[sock.TSocketHandle].data.PData.readCBs.add(cb) - update(sock, p.selector[sock.TSocketHandle].events + {EvRead}) + p.selector[fd.SocketHandle].data.PData.readCBs.add(cb) + update(fd, p.selector[fd.SocketHandle].events + {EvRead}) - proc addWrite(sock: TAsyncFD, cb: TCallback) = + proc addWrite*(fd: TAsyncFD, cb: TCallback) = let p = getGlobalDispatcher() - if sock.TSocketHandle notin p.selector: + if fd.SocketHandle notin p.selector: raise newException(EInvalidValue, "File descriptor not registered.") - p.selector[sock.TSocketHandle].data.PData.writeCBs.add(cb) - update(sock, p.selector[sock.TSocketHandle].events + {EvWrite}) + p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb) + update(fd, p.selector[fd.SocketHandle].events + {EvWrite}) proc poll*(timeout = 500) = let p = getGlobalDispatcher() for info in p.selector.select(timeout): let data = PData(info.key.data) - assert data.sock == info.key.fd.TAsyncFD - #echo("In poll ", data.sock.cint) + assert data.fd == info.key.fd.TAsyncFD + #echo("In poll ", data.fd.cint) if EvRead in info.events: # Callback may add items to ``data.readCBs`` which causes issues if # we are iterating over ``data.readCBs`` at the same time. We therefore @@ -766,7 +885,7 @@ else: let currentCBs = data.readCBs data.readCBs = @[] for cb in currentCBs: - if not cb(data.sock): + if not cb(data.fd): # Callback wants to be called again. data.readCBs.add(cb) @@ -774,7 +893,7 @@ else: let currentCBs = data.writeCBs data.writeCBs = @[] for cb in currentCBs: - if not cb(data.sock): + if not cb(data.fd): # Callback wants to be called again. data.writeCBs.add(cb) @@ -783,18 +902,19 @@ else: if data.readCBs.len != 0: newEvents = {EvRead} if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} if newEvents != info.key.events: - update(data.sock, newEvents) + update(data.fd, newEvents) else: # FD no longer a part of the selector. Likely been closed # (e.g. socket disconnected). + discard processTimers(p) proc connect*(socket: TAsyncFD, address: string, port: TPort, - af = AF_INET): PFuture[void] = + af = AF_INET): Future[void] = var retFuture = newFuture[void]("connect") - proc cb(sock: TAsyncFD): bool = + proc cb(fd: TAsyncFD): bool = # We have connected. retFuture.complete() return true @@ -804,7 +924,7 @@ else: var lastError: TOSErrorCode var it = aiList while it != nil: - var ret = connect(socket.TSocketHandle, it.ai_addr, it.ai_addrlen.TSocklen) + var ret = connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen) if ret == 0: # Request to connect completed immediately. success = true @@ -826,14 +946,14 @@ else: return retFuture proc recv*(socket: TAsyncFD, size: int, - flags = {TSocketFlags.SafeDisconn}): PFuture[string] = + flags = {TSocketFlags.SafeDisconn}): Future[string] = var retFuture = newFuture[string]("recv") var readBuffer = newString(size) proc cb(sock: TAsyncFD): bool = result = true - let res = recv(sock.TSocketHandle, addr readBuffer[0], size.cint, + let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint, flags.toOSFlags()) #echo("recv cb res: ", res) if res < 0: @@ -857,7 +977,7 @@ else: return retFuture proc send*(socket: TAsyncFD, data: string, - flags = {TSocketFlags.SafeDisconn}): PFuture[void] = + flags = {TSocketFlags.SafeDisconn}): Future[void] = var retFuture = newFuture[void]("send") var written = 0 @@ -866,7 +986,7 @@ else: result = true let netSize = data.len-written var d = data.cstring - let res = send(sock.TSocketHandle, addr d[written], netSize.cint, + let res = send(sock.SocketHandle, addr d[written], netSize.cint, MSG_NOSIGNAL) if res < 0: let lastError = osLastError() @@ -889,15 +1009,15 @@ else: return retFuture proc acceptAddr*(socket: TAsyncFD, flags = {TSocketFlags.SafeDisconn}): - PFuture[tuple[address: string, client: TAsyncFD]] = + Future[tuple[address: string, client: TAsyncFD]] = var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]("acceptAddr") proc cb(sock: TAsyncFD): bool = result = true - var sockAddress: Tsockaddr_in - var addrLen = sizeof(sockAddress).TSocklen - var client = accept(sock.TSocketHandle, - cast[ptr TSockAddr](addr(sockAddress)), addr(addrLen)) + var sockAddress: SockAddr_in + var addrLen = sizeof(sockAddress).Socklen + var client = accept(sock.SocketHandle, + cast[ptr SockAddr](addr(sockAddress)), addr(addrLen)) if client == osInvalidSocket: let lastError = osLastError() assert lastError.int32 notin {EWOULDBLOCK, EAGAIN} @@ -914,7 +1034,7 @@ else: addRead(socket, cb) return retFuture -proc sleepAsync*(ms: int): PFuture[void] = +proc sleepAsync*(ms: int): Future[void] = ## Suspends the execution of the current async procedure for the next ## ``ms`` miliseconds. var retFuture = newFuture[void]("sleepAsync") @@ -923,14 +1043,14 @@ proc sleepAsync*(ms: int): PFuture[void] = return retFuture proc accept*(socket: TAsyncFD, - flags = {TSocketFlags.SafeDisconn}): PFuture[TAsyncFD] = + flags = {SocketFlag.SafeDisconn}): Future[TAsyncFD] = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection. ## The future will complete when the connection is successfully accepted. var retFut = newFuture[TAsyncFD]("accept") var fut = acceptAddr(socket, flags) fut.callback = - proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) = + proc (future: Future[tuple[address: string, client: TAsyncFD]]) = assert future.finished if future.failed: retFut.fail(future.error) @@ -940,7 +1060,7 @@ proc accept*(socket: TAsyncFD, # -- Await Macro -template createCb*(retFutureSym, iteratorNameSym, +template createCb(retFutureSym, iteratorNameSym, name: expr): stmt {.immediate.} = var nameIterVar = iteratorNameSym #{.push stackTrace: off.} @@ -963,29 +1083,50 @@ template createCb*(retFutureSym, iteratorNameSym, cb() #{.pop.} proc generateExceptionCheck(futSym, - exceptBranch, rootReceiver, fromNode: PNimrodNode): PNimrodNode {.compileTime.} = - if exceptBranch == nil: + tryStmt, rootReceiver, fromNode: PNimrodNode): PNimrodNode {.compileTime.} = + if tryStmt.kind == nnkNilLit: result = rootReceiver else: - if exceptBranch[0].kind == nnkStmtList: - result = newIfStmt( - (newDotExpr(futSym, newIdentNode("failed")), - exceptBranch[0] - ) - ) - else: - expectKind(exceptBranch[1], nnkStmtList) - result = newIfStmt( - (newDotExpr(futSym, newIdentNode("failed")), - newIfStmt( - (infix(newDotExpr(futSym, newIdentNode("error")), "of", exceptBranch[0]), - exceptBranch[1]) - ) - ) - ) + var exceptionChecks: seq[tuple[cond, body: PNimrodNode]] = @[] + let errorNode = newDotExpr(futSym, newIdentNode("error")) + for i in 1 .. <tryStmt.len: + let exceptBranch = tryStmt[i] + if exceptBranch[0].kind == nnkStmtList: + exceptionChecks.add((newIdentNode("true"), exceptBranch[0])) + else: + var exceptIdentCount = 0 + var ifCond: PNimrodNode + for i in 0 .. <exceptBranch.len: + let child = exceptBranch[i] + if child.kind == nnkIdent: + let cond = infix(errorNode, "of", child) + if exceptIdentCount == 0: + ifCond = cond + else: + ifCond = infix(ifCond, "or", cond) + else: + break + exceptIdentCount.inc + + expectKind(exceptBranch[exceptIdentCount], nnkStmtList) + exceptionChecks.add((ifCond, exceptBranch[exceptIdentCount])) + # -> -> else: raise futSym.error + exceptionChecks.add((newIdentNode("true"), + newNimNode(nnkRaiseStmt).add(errorNode))) + # Read the future if there is no error. + # -> else: futSym.read let elseNode = newNimNode(nnkElse, fromNode) elseNode.add newNimNode(nnkStmtList, fromNode) elseNode[0].add rootReceiver + + let ifBody = newStmtList() + ifBody.add newCall(newIdentNode("setCurrentException"), errorNode) + ifBody.add newIfStmt(exceptionChecks) + ifBody.add newCall(newIdentNode("setCurrentException"), newNilLit()) + + result = newIfStmt( + (newDotExpr(futSym, newIdentNode("failed")), ifBody) + ) result.add elseNode template createVar(result: var PNimrodNode, futSymName: string, @@ -997,25 +1138,25 @@ template createVar(result: var PNimrodNode, futSymName: string, result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y result.add newNimNode(nnkYieldStmt, fromNode).add(futSym) # -> yield future<x> valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read - result.add generateExceptionCheck(futSym, exceptBranch, rootReceiver, fromNode) + result.add generateExceptionCheck(futSym, tryStmt, rootReceiver, fromNode) proc processBody(node, retFutureSym: PNimrodNode, subTypeIsVoid: bool, - exceptBranch: PNimrodNode): PNimrodNode {.compileTime.} = + tryStmt: PNimrodNode): PNimrodNode {.compileTime.} = #echo(node.treeRepr) result = node case node.kind of nnkReturnStmt: result = newNimNode(nnkStmtList, node) if node[0].kind == nnkEmpty: - if not subtypeIsVoid: + if not subTypeIsVoid: result.add newCall(newIdentNode("complete"), retFutureSym, newIdentNode("result")) else: result.add newCall(newIdentNode("complete"), retFutureSym) else: result.add newCall(newIdentNode("complete"), retFutureSym, - node[0].processBody(retFutureSym, subtypeIsVoid, exceptBranch)) + node[0].processBody(retFutureSym, subTypeIsVoid, tryStmt)) result.add newNimNode(nnkReturnStmt, node).add(newNilLit()) return # Don't process the children of this return stmt @@ -1070,7 +1211,7 @@ proc processBody(node, retFutureSym: PNimrodNode, res: PNimrodNode): bool {.compileTime.} = result = false while i < n[0].len: - var processed = processBody(n[0][i], retFutureSym, subtypeIsVoid, n[1]) + var processed = processBody(n[0][i], retFutureSym, subTypeIsVoid, n) if processed.kind != n[0][i].kind or processed.len != n[0][i].len: expectKind(processed, nnkStmtList) expectKind(processed[2][1], nnkElse) @@ -1090,7 +1231,7 @@ proc processBody(node, retFutureSym: PNimrodNode, else: discard for i in 0 .. <result.len: - result[i] = processBody(result[i], retFutureSym, subtypeIsVoid, exceptBranch) + result[i] = processBody(result[i], retFutureSym, subTypeIsVoid, tryStmt) proc getName(node: PNimrodNode): string {.compileTime.} = case node.kind @@ -1113,12 +1254,12 @@ macro async*(prc: stmt): stmt {.immediate.} = hint("Processing " & prc[0].getName & " as an async proc.") let returnType = prc[3][0] - # Verify that the return type is a PFuture[T] + # Verify that the return type is a Future[T] if returnType.kind == nnkIdent: - error("Expected return type of 'PFuture' got '" & $returnType & "'") + error("Expected return type of 'Future' got '" & $returnType & "'") elif returnType.kind == nnkBracketExpr: - if $returnType[0] != "PFuture": - error("Expected return type of 'PFuture' got '" & $returnType[0] & "'") + if $returnType[0] != "Future": + error("Expected return type of 'Future' got '" & $returnType[0] & "'") let subtypeIsVoid = returnType.kind == nnkEmpty or (returnType.kind == nnkBracketExpr and @@ -1139,7 +1280,7 @@ macro async*(prc: stmt): stmt {.immediate.} = subRetType), newLit(prc[0].getName)))) # Get type from return type of this proc - # -> iterator nameIter(): PFutureBase {.closure.} = + # -> iterator nameIter(): FutureBase {.closure.} = # -> var result: T # -> <proc_body> # -> complete(retFuture, result) @@ -1155,14 +1296,14 @@ macro async*(prc: stmt): stmt {.immediate.} = # -> complete(retFuture) procBody.add(newCall(newIdentNode("complete"), retFutureSym)) - var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")], + var closureIterator = newProc(iteratorNameSym, [newIdentNode("FutureBase")], procBody, nnkIteratorDef) closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure")) outerProcBody.add(closureIterator) # -> createCb(retFuture) var cbName = newIdentNode("cb") - var procCb = newCall("createCb", retFutureSym, iteratorNameSym, + var procCb = newCall(bindSym"createCb", retFutureSym, iteratorNameSym, newStrLitNode(prc[0].getName)) outerProcBody.add procCb @@ -1178,16 +1319,16 @@ macro async*(prc: stmt): stmt {.immediate.} = if subtypeIsVoid: # Add discardable pragma. if returnType.kind == nnkEmpty: - # Add PFuture[void] - result[3][0] = parseExpr("PFuture[void]") + # Add Future[void] + result[3][0] = parseExpr("Future[void]") result[6] = outerProcBody #echo(treeRepr(result)) - #if prc[0].getName == "getFile": + #if prc[0].getName == "catch": # echo(toStrLit(result)) -proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} = +proc recvLine*(socket: TAsyncFD): Future[string] {.async.} = ## Reads a line of data from ``socket``. Returned future will complete once ## a full line is read or an error occurs. ## @@ -1200,6 +1341,11 @@ proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} = ## If the socket is disconnected in the middle of a line (before ``\r\L`` ## is read) then line will be set to ``""``. ## The partial line **will be lost**. + ## + ## **Warning**: This assumes that lines are delimited by ``\r\L``. + ## + ## **Note**: This procedure is mostly used for testing. You likely want to + ## use ``asyncnet.recvLine`` instead. template addNLIfEmpty(): stmt = if result.len == 0: @@ -1212,9 +1358,8 @@ proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} = if c.len == 0: return "" if c == "\r": - c = await recv(socket, 1, {TSocketFlags.SafeDisconn, TSocketFlags.Peek}) - if c.len > 0 and c == "\L": - discard await recv(socket, 1) + c = await recv(socket, 1) + assert c == "\l" addNLIfEmpty() return elif c == "\L": @@ -1227,10 +1372,9 @@ proc runForever*() = while true: poll() -proc waitFor*[T](fut: PFuture[T]) = +proc waitFor*[T](fut: PFuture[T]): T = ## **Blocks** the current thread until the specified future completes. while not fut.finished: poll() - if fut.failed: - raise fut.error + fut.read |