summary refs log tree commit diff stats
path: root/lib/pure
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@googlemail.com>2014-03-11 21:53:35 +0000
committerDominik Picheta <dominikpicheta@googlemail.com>2014-03-11 21:53:35 +0000
commit2ce07042fdbe7104694245b56006204ade9df34a (patch)
tree3e4085cdf114885f25608aea604975f88e9a9f6d /lib/pure
parent9b5357da5aea54bf9036b98d48ef52da16f7e1a7 (diff)
downloadNim-2ce07042fdbe7104694245b56006204ade9df34a.tar.gz
tasyncawait now works on Linux.
Reworked detection of a file descriptor being closed with epoll (in the
case of sockets it is when the remote host disconnects). Ensured that
events are only updated when they change.
Diffstat (limited to 'lib/pure')
-rw-r--r--lib/pure/asyncio2.nim40
-rw-r--r--lib/pure/selectors.nim65
2 files changed, 68 insertions, 37 deletions
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim
index 60d489dda..cdb2b3a89 100644
--- a/lib/pure/asyncio2.nim
+++ b/lib/pure/asyncio2.nim
@@ -479,9 +479,11 @@ else:
       discard p.selector.update(sock, events)
   
   proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) =
+    #echo("addRead")
     if sock notin p.selector:
       var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[])
       p.selector.register(sock, {EvRead}, data.PObject)
+      #echo("registered")
     else:
       p.selector[sock].data.PData.readCBs.add(cb)
       p.update(sock, p.selector[sock].events + {EvRead})
@@ -498,7 +500,7 @@ else:
     for info in p.selector.select(timeout):
       let data = PData(info.key.data)
       assert data.sock == info.key.fd
-      
+      #echo("In poll ", data.sock.cint)
       if EvRead in info.events:
         # Callback may add items to ``data.readCBs`` which causes issues if
         # we are iterating over ``data.readCBs`` at the same time. We therefore
@@ -517,11 +519,17 @@ else:
           if not cb(data.sock):
             # Callback wants to be called again.
             data.writeCBs.add(cb)
-  
-      var newEvents: set[TEvent]
-      if data.readCBs.len != 0: newEvents = {EvRead}
-      if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
-      p.update(data.sock, newEvents)
+      
+      if info.key in p.selector:
+        var newEvents: set[TEvent]
+        if data.readCBs.len != 0: newEvents = {EvRead}
+        if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
+        if newEvents != info.key.events:
+          echo(info.key.events, " -> ", newEvents)
+          p.update(data.sock, newEvents)
+      else:
+        # FD no longer a part of the selector. Likely been closed
+        # (e.g. socket disconnected).
   
   proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort,
     af = AF_INET): PFuture[int] =
@@ -569,6 +577,7 @@ else:
       result = true
       let netSize = size - sizeRead
       let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint)
+      #echo("recv cb res: ", res)
       if res < 0:
         let lastError = osLastError()
         if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: 
@@ -576,6 +585,7 @@ else:
         else:
           result = false # We still want this callback to be called.
       elif res == 0:
+        #echo("Disconnected recv: ", sizeRead)
         # Disconnected
         if sizeRead == 0:
           retFuture.complete("")
@@ -588,6 +598,7 @@ else:
           result = false # We want to read all the data requested.
         else:
           retFuture.complete(readBuffer)
+      #echo("Recv cb result: ", result)
   
     addRead(p, socket, cb)
     return retFuture
@@ -833,9 +844,13 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.}
   result = ""
   var c = ""
   while true:
+    #echo("1")
     c = await p.recv(socket, 1)
+    #echo("Received ", c.len)
     if c.len == 0:
+      #echo("returning")
       return
+    #echo("2")
     if c == "\r":
       c = await p.recv(socket, 1, MSG_PEEK)
       if c.len > 0 and c == "\L":
@@ -845,7 +860,9 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.}
     elif c == "\L":
       addNLIfEmpty()
       return
+    #echo("3")
     add(result.string, c)
+  #echo("4")
 
 when isMainModule:
   
@@ -859,6 +876,7 @@ when isMainModule:
     proc main(p: PDispatcher): PFuture[int] {.async.} =
       discard await p.connect(sock, "irc.freenode.net", TPort(6667))
       while true:
+        echo("recvLine")
         var line = await p.recvLine(sock)
         echo("Line is: ", line.repr)
         if line == "":
@@ -882,7 +900,7 @@ when isMainModule:
   else:
     when true:
 
-      var f = p.connect(sock, "irc.freenode.org", TPort(6667))
+      var f = p.connect(sock, "irc.poop.nl", TPort(6667))
       f.callback =
         proc (future: PFuture[int]) =
           echo("Connected in future!")
@@ -898,11 +916,13 @@ when isMainModule:
       sock.bindAddr(TPort(6667))
       sock.listen()
       proc onAccept(future: PFuture[TSocketHandle]) =
-        echo "Accepted"
-        var t = p.send(future.read, "test\c\L")
+        let client = future.read
+        echo "Accepted ", client.cint
+        var t = p.send(client, "test\c\L")
         t.callback =
           proc (future: PFuture[int]) =
-            echo(future.read)
+            echo("Send: ", future.read)
+            client.close()
         
         var f = p.accept(sock)
         f.callback = onAccept
diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim
index e086ee3ab..a113e3362 100644
--- a/lib/pure/selectors.nim
+++ b/lib/pure/selectors.nim
@@ -10,7 +10,6 @@
 # TODO: Docs.
 
 import tables, os, unsigned, hashes
-import sockets2
 
 when defined(linux): import posix, epoll
 elif defined(windows): import winlean
@@ -41,17 +40,14 @@ when defined(linux) or defined(nimdoc):
       result.events = EPOLLIN
     if EvWrite in events:
       result.events = result.events or EPOLLOUT
+    result.events = result.events or EPOLLRDHUP
     result.data.fd = fd.cint
   
   proc register*(s: PSelector, fd: TSocketHandle, events: set[TEvent],
       data: PObject): PSelectorKey {.discardable.} =
     ## Registers file descriptor ``fd`` to selector ``s`` with a set of TEvent
     ## ``events``.
-    if s.fds.hasKey(fd):
-      raise newException(EInvalidValue, "File descriptor already exists.")
-    
     var event = createEventStruct(events, fd)
-  
     if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0:
       OSError(OSLastError())
   
@@ -63,30 +59,18 @@ when defined(linux) or defined(nimdoc):
   proc update*(s: PSelector, fd: TSocketHandle,
       events: set[TEvent]): PSelectorKey {.discardable.} =
     ## Updates the events which ``fd`` wants notifications for.
-    if not s.fds.hasKey(fd):
-      raise newException(EInvalidValue, "File descriptor not found.")
-    var event = createEventStruct(events, fd)
-    
-    s.fds[fd].events = events
-    if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
-      if OSLastError().cint == ENOENT:
-        # Socket has been closed. Epoll automatically removes disconnected
-        # sockets.
-        s.fds.del(fd)
-        osError("Socket has been disconnected")
-        
-      OSError(OSLastError())
-    result = s.fds[fd]
+    if s.fds[fd].events != events:
+      echo("Update ", fd.cint, " to ", events)
+      var event = createEventStruct(events, fd)
+      
+      s.fds[fd].events = events
+      if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
+        OSError(OSLastError())
+      result = s.fds[fd]
   
   proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} =
-    if not s.fds.hasKey(fd):
-      raise newException(EInvalidValue, "File descriptor not found.")
     if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
-      if osLastError().cint == ENOENT:
-        # Socket has been closed. Epoll automatically removes disconnected
-        # sockets so its already been removed.
-      else:
-        OSError(OSLastError())
+      OSError(OSLastError())
     result = s.fds[fd]
     s.fds.del(fd)
 
@@ -113,6 +97,14 @@ when defined(linux) or defined(nimdoc):
       assert selectorKey != nil
       result.add((selectorKey, evSet))
   
+      if (s.events[i].events and EPOLLHUP) != 0 or
+         (s.events[i].events and EPOLLRDHUP) != 0:
+        # fd closed
+        #echo("fd closed ", s.events[i].data.fd)
+        s.unregister(s.events[i].data.fd.TSocketHandle)
+  
+      #echo("Epoll: ", result[i].key.fd, " ", result[i].events, " ", result[i].key.events)
+  
   proc newSelector*(): PSelector =
     new result
     result.epollFD = epoll_create(64)
@@ -123,7 +115,26 @@ when defined(linux) or defined(nimdoc):
 
   proc contains*(s: PSelector, fd: TSocketHandle): bool =
     ## Determines whether selector contains a file descriptor.
-    return s.fds.hasKey(fd)
+    if s.fds.hasKey(fd):
+      result = true
+      
+      # Ensure the underlying epoll instance still contains this fd.
+      var event = createEventStruct(s.fds[fd].events, fd)
+      if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
+        let err = osLastError()
+        if err.cint in {ENOENT, EBADF}:
+          return false
+        OSError(OSLastError())
+    else:
+      return false
+
+  proc contains*(s: PSelector, key: PSelectorKey): bool =
+    ## Determines whether selector contains this selector key. More accurate
+    ## than checking if the file descriptor is in the selector because it
+    ## ensures that the keys are equal. File descriptors may not always be
+    ## unique especially when an fd is closed and then a new one is opened,
+    ## the new one may have the same value.
+    return key.fd in s and s.fds[key.fd] == key
 
   proc `[]`*(s: PSelector, fd: TSocketHandle): PSelectorKey =
     ## Retrieves the selector key for ``fd``.