summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim106
1 files changed, 80 insertions, 26 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 6292bfc12..d410f8ce1 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -11,8 +11,9 @@ include "system/inclrtl"
 
 import os, oids, tables, strutils, macros
 
-import rawsockets
-export TPort
+import rawsockets, net
+
+export TPort, TSocketFlags
 
 #{.injectStmt: newGcInvariant().}
 
@@ -40,6 +41,7 @@ type
     cb: proc () {.closure,gcsafe.}
     finished: bool
     error*: ref EBase
+    stackTrace: string ## For debugging purposes only.
 
   PFuture*[T] = ref object of PFutureBase
     value: T
@@ -48,10 +50,23 @@ proc newFuture*[T](): PFuture[T] =
   ## Creates a new future.
   new(result)
   result.finished = false
+  result.stackTrace = getStackTrace()
+
+proc checkFinished[T](future: PFuture[T]) =
+  if future.finished:
+    echo("<----->")
+    echo(future.stackTrace)
+    echo("-----")
+    when T is string:
+      echo("Contents: ", future.value.repr)
+    echo("<----->")
+    echo("Future already finished, cannot finish twice.")
+    assert false
 
 proc complete*[T](future: PFuture[T], val: T) =
   ## Completes ``future`` with value ``val``.
-  assert(not future.finished, "Future already finished, cannot finish twice.")
+  #assert(not future.finished, "Future already finished, cannot finish twice.")
+  checkFinished(future)
   assert(future.error == nil)
   future.value = val
   future.finished = true
@@ -60,7 +75,8 @@ proc complete*[T](future: PFuture[T], val: T) =
 
 proc complete*(future: PFuture[void]) =
   ## Completes a void ``future``.
-  assert(not future.finished, "Future already finished, cannot finish twice.")
+  #assert(not future.finished, "Future already finished, cannot finish twice.")
+  checkFinished(future)
   assert(future.error == nil)
   future.finished = true
   if future.cb != nil:
@@ -68,7 +84,8 @@ proc complete*(future: PFuture[void]) =
 
 proc fail*[T](future: PFuture[T], error: ref EBase) =
   ## Completes ``future`` with ``error``.
-  assert(not future.finished, "Future already finished, cannot finish twice.")
+  #assert(not future.finished, "Future already finished, cannot finish twice.")
+  checkFinished(future)
   future.finished = true
   future.error = error
   if future.cb != nil:
@@ -126,6 +143,15 @@ proc failed*(future: PFutureBase): bool =
   ## Determines whether ``future`` completed with an error.
   future.error != nil
 
+proc asyncCheck*[T](future: PFuture[T]) =
+  ## Sets a callback on ``future`` which raises an exception if the future
+  ## finished with an error.
+  ##
+  ## This should be used instead of ``discard`` to discard void futures.
+  future.callback =
+    proc () =
+      if future.failed: raise future.error
+
 when defined(windows) or defined(nimdoc):
   import winlean, sets, hashes
   type
@@ -344,7 +370,7 @@ when defined(windows) or defined(nimdoc):
     return retFuture
 
   proc recv*(socket: TAsyncFD, size: int,
-             flags: int = 0): PFuture[string] =
+             flags = {TSocketFlags.SafeDisconn}): PFuture[string] =
     ## Reads **up to** ``size`` bytes from ``socket``. Returned future will
     ## complete once all the data requested is read, a part of the data has been
     ## read, or the socket has disconnected in which case the future will
@@ -364,7 +390,7 @@ when defined(windows) or defined(nimdoc):
     dataBuf.len = size
     
     var bytesReceived: DWord
-    var flagsio = flags.DWord
+    var flagsio = flags.toOSFlags().DWord
     var ol = PCustomOverlapped()
     GC_ref(ol)
     ol.data = TCompletionData(sock: socket, cb:
@@ -394,7 +420,10 @@ when defined(windows) or defined(nimdoc):
           dealloc dataBuf.buf
           dataBuf.buf = nil
         GC_unref(ol)
-        retFuture.fail(newException(EOS, osErrorMsg(err)))
+        if flags.isDisconnectionError(err):
+          retFuture.complete("")
+        else:
+          retFuture.fail(newException(EOS, osErrorMsg(err)))
     elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0':
       # We have to ensure that the buffer is empty because WSARecv will tell
       # us immediatelly when it was disconnected, even when there is still
@@ -425,7 +454,8 @@ when defined(windows) or defined(nimdoc):
       # free ``ol``.
     return retFuture
 
-  proc send*(socket: TAsyncFD, data: string): PFuture[void] =
+  proc send*(socket: TAsyncFD, data: string,
+             flags = {TSocketFlags.SafeDisconn}): PFuture[void] =
     ## Sends ``data`` to ``socket``. The returned future will complete once all
     ## data has been sent.
     verifyPresence(socket)
@@ -435,7 +465,7 @@ when defined(windows) or defined(nimdoc):
     dataBuf.buf = data # since this is not used in a callback, this is fine
     dataBuf.len = data.len
 
-    var bytesReceived, flags: DWord
+    var bytesReceived, lowFlags: DWord
     var ol = PCustomOverlapped()
     GC_ref(ol)
     ol.data = TCompletionData(sock: socket, cb:
@@ -448,12 +478,15 @@ when defined(windows) or defined(nimdoc):
     )
 
     let ret = WSASend(socket.TSocketHandle, addr dataBuf, 1, addr bytesReceived,
-                      flags, cast[POverlapped](ol), nil)
+                      lowFlags, cast[POverlapped](ol), nil)
     if ret == -1:
       let err = osLastError()
       if err.int32 != ERROR_IO_PENDING:
-        retFuture.fail(newException(EOS, osErrorMsg(err)))
         GC_unref(ol)
+        if flags.isDisconnectionError(err):
+          retFuture.complete()
+        else:
+          retFuture.fail(newException(EOS, osErrorMsg(err)))
     else:
       retFuture.complete()
       # We don't deallocate ``ol`` here because even though this completed
@@ -552,7 +585,18 @@ when defined(windows) or defined(nimdoc):
   initAll()
 else:
   import selectors
-  from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK
+  when defined(windows):
+    import winlean
+    const
+      EINTR = WSAEINPROGRESS
+      EINPROGRESS = WSAEINPROGRESS
+      EWOULDBLOCK = WSAEWOULDBLOCK
+      EAGAIN = EINPROGRESS
+      MSG_NOSIGNAL = 0
+  else:
+    from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
+                      MSG_NOSIGNAL
+  
   type
     TAsyncFD* = distinct cint
     TCallback = proc (sock: TAsyncFD): bool {.closure,gcsafe.}
@@ -686,20 +730,23 @@ else:
     return retFuture
 
   proc recv*(socket: TAsyncFD, size: int,
-             flags: int = 0): PFuture[string] =
+             flags = {TSocketFlags.SafeDisconn}): PFuture[string] =
     var retFuture = newFuture[string]()
     
     var readBuffer = newString(size)
 
     proc cb(sock: TAsyncFD): bool =
       result = true
-      let res = recv(sock.TSocketHandle, addr readBuffer[0], size,
-                     flags.cint)
+      let res = recv(sock.TSocketHandle, addr readBuffer[0], size.cint,
+                     flags.toOSFlags())
       #echo("recv cb res: ", res)
       if res < 0:
         let lastError = osLastError()
-        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: 
-          retFuture.fail(newException(EOS, osErrorMsg(lastError)))
+        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
+          if flags.isDisconnectionError(lastError):
+            retFuture.complete("")
+          else:
+            retFuture.fail(newException(EOS, osErrorMsg(lastError)))
         else:
           result = false # We still want this callback to be called.
       elif res == 0:
@@ -708,11 +755,13 @@ else:
       else:
         readBuffer.setLen(res)
         retFuture.complete(readBuffer)
-  
+    # TODO: The following causes a massive slowdown.
+    #if not cb(socket):
     addRead(socket, cb)
     return retFuture
 
-  proc send*(socket: TAsyncFD, data: string): PFuture[void] =
+  proc send*(socket: TAsyncFD, data: string,
+             flags = {TSocketFlags.SafeDisconn}): PFuture[void] =
     var retFuture = newFuture[void]()
     
     var written = 0
@@ -721,11 +770,15 @@ else:
       result = true
       let netSize = data.len-written
       var d = data.cstring
-      let res = send(sock.TSocketHandle, addr d[written], netSize, 0.cint)
+      let res = send(sock.TSocketHandle, addr d[written], netSize.cint,
+                     MSG_NOSIGNAL)
       if res < 0:
         let lastError = osLastError()
         if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
-          retFuture.fail(newException(EOS, osErrorMsg(lastError)))
+          if flags.isDisconnectionError(lastError):
+            retFuture.complete()
+          else:
+            retFuture.fail(newException(EOS, osErrorMsg(lastError)))
         else:
           result = false # We still want this callback to be called.
       else:
@@ -734,6 +787,8 @@ else:
           result = false # We still have data to send.
         else:
           retFuture.complete()
+    # TODO: The following causes crashes.
+    #if not cb(socket):
     addWrite(socket, cb)
     return retFuture
 
@@ -779,6 +834,7 @@ proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] =
 template createCb*(retFutureSym, iteratorNameSym,
                    name: expr): stmt {.immediate.} =
   var nameIterVar = iteratorNameSym
+  #{.push stackTrace: off.}
   proc cb {.closure,gcsafe.} =
     try:
       if not nameIterVar.finished:
@@ -791,7 +847,7 @@ template createCb*(retFutureSym, iteratorNameSym,
     except:
       retFutureSym.fail(getCurrentException())
   cb()
-
+  #{.pop.}
 proc generateExceptionCheck(futSym,
     exceptBranch, rootReceiver: PNimrodNode): PNimrodNode {.compileTime.} =
   if exceptBranch == nil:
@@ -1005,8 +1061,6 @@ macro async*(prc: stmt): stmt {.immediate.} =
       result[4].del(i)
   if subtypeIsVoid:
     # Add discardable pragma.
-    if prc.kind == nnkProcDef: # TODO: This is a workaround for #1287
-      result[4].add(newIdentNode("discardable"))
     if returnType.kind == nnkEmpty:
       # Add PFuture[void]
       result[3][0] = parseExpr("PFuture[void]")
@@ -1042,7 +1096,7 @@ proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} =
     if c.len == 0:
       return ""
     if c == "\r":
-      c = await recv(socket, 1, MSG_PEEK)
+      c = await recv(socket, 1, {TSocketFlags.SafeDisconn, TSocketFlags.Peek})
       if c.len > 0 and c == "\L":
         discard await recv(socket, 1)
       addNLIfEmpty()