summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--lib/posix/epoll.nim2
-rw-r--r--lib/pure/asyncio2.nim28
-rw-r--r--lib/pure/selectors.nim26
-rw-r--r--lib/pure/sockets2.nim22
-rw-r--r--tests/async/tasyncawait.nim10
5 files changed, 64 insertions, 24 deletions
diff --git a/lib/posix/epoll.nim b/lib/posix/epoll.nim
index 366521551..57a2f001f 100644
--- a/lib/posix/epoll.nim
+++ b/lib/posix/epoll.nim
@@ -36,7 +36,7 @@ type
   epoll_data* {.importc: "union epoll_data", 
       header: "<sys/epoll.h>", pure, final.} = object # TODO: This is actually a union.
     #thePtr* {.importc: "ptr".}: pointer
-    fd*: cint # \
+    fd* {.importc: "fd".}: cint # \
     #u32*: uint32
     #u64*: uint64
 
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim
index 12d4cb5a3..60d489dda 100644
--- a/lib/pure/asyncio2.nim
+++ b/lib/pure/asyncio2.nim
@@ -473,7 +473,6 @@ else:
 
   proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) =
     assert sock in p.selector
-    echo("Update: ", events)
     if events == {}:
       discard p.selector.unregister(sock)
     else:
@@ -499,23 +498,25 @@ else:
     for info in p.selector.select(timeout):
       let data = PData(info.key.data)
       assert data.sock == info.key.fd
-      echo("R: ", data.readCBs.len, " W: ", data.writeCBs.len, ". ", info.events)
       
       if EvRead in info.events:
-        var newReadCBs: seq[TCallback] = @[]
-        for cb in data.readCBs:
+        # Callback may add items to ``data.readCBs`` which causes issues if
+        # we are iterating over ``data.readCBs`` at the same time. We therefore
+        # make a copy to iterate over.
+        let currentCBs = data.readCBs
+        data.readCBs = @[]
+        for cb in currentCBs:
           if not cb(data.sock):
             # Callback wants to be called again.
-            newReadCBs.add(cb)
-        data.readCBs = newReadCBs
+            data.readCBs.add(cb)
       
       if EvWrite in info.events:
-        var newWriteCBs: seq[TCallback] = @[]
-        for cb in data.writeCBs:
+        let currentCBs = data.writeCBs
+        data.writeCBs = @[]
+        for cb in currentCBs:
           if not cb(data.sock):
             # Callback wants to be called again.
-            newWriteCBs.add(cb)
-        data.writeCBs = newWriteCBs
+            data.writeCBs.add(cb)
   
       var newEvents: set[TEvent]
       if data.readCBs.len != 0: newEvents = {EvRead}
@@ -615,7 +616,6 @@ else:
           retFuture.complete(0)
     addWrite(p, socket, cb)
     return retFuture
-        
 
   proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): 
       PFuture[tuple[address: string, client: TSocketHandle]] =
@@ -854,7 +854,7 @@ when isMainModule:
   sock.setBlocking false
 
 
-  when false:
+  when true:
     # Await tests
     proc main(p: PDispatcher): PFuture[int] {.async.} =
       discard await p.connect(sock, "irc.freenode.net", TPort(6667))
@@ -880,7 +880,7 @@ when isMainModule:
     
 
   else:
-    when false:
+    when true:
 
       var f = p.connect(sock, "irc.freenode.org", TPort(6667))
       f.callback =
@@ -919,4 +919,4 @@ when isMainModule:
 
   
 
-  
\ No newline at end of file
+  
diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim
index 6482a01a6..e086ee3ab 100644
--- a/lib/pure/selectors.nim
+++ b/lib/pure/selectors.nim
@@ -10,11 +10,13 @@
 # TODO: Docs.
 
 import tables, os, unsigned, hashes
+import sockets2
 
 when defined(linux): import posix, epoll
 elif defined(windows): import winlean
 
 proc hash*(x: TSocketHandle): THash {.borrow.}
+proc `$`*(x: TSocketHandle): string {.borrow.}
 
 type
   TEvent* = enum
@@ -31,7 +33,7 @@ when defined(linux) or defined(nimdoc):
   type
     PSelector* = ref object
       epollFD: cint
-      events: array[64, ptr epoll_event]
+      events: array[64, epoll_event]
       fds: TTable[TSocketHandle, PSelectorKey]
   
   proc createEventStruct(events: set[TEvent], fd: TSocketHandle): epoll_event =
@@ -66,17 +68,25 @@ when defined(linux) or defined(nimdoc):
     var event = createEventStruct(events, fd)
     
     s.fds[fd].events = events
-    echo("About to update")
     if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
+      if OSLastError().cint == ENOENT:
+        # Socket has been closed. Epoll automatically removes disconnected
+        # sockets.
+        s.fds.del(fd)
+        osError("Socket has been disconnected")
+        
       OSError(OSLastError())
-    echo("finished updating")
     result = s.fds[fd]
   
   proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} =
     if not s.fds.hasKey(fd):
       raise newException(EInvalidValue, "File descriptor not found.")
     if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
-      OSError(OSLastError())
+      if osLastError().cint == ENOENT:
+        # Socket has been closed. Epoll automatically removes disconnected
+        # sockets so its already been removed.
+      else:
+        OSError(OSLastError())
     result = s.fds[fd]
     s.fds.del(fd)
 
@@ -92,21 +102,21 @@ when defined(linux) or defined(nimdoc):
     ## on the ``fd``.
     result = @[]
     
-    let evNum = epoll_wait(s.epollFD, s.events[0], 64.cint, timeout.cint)
+    let evNum = epoll_wait(s.epollFD, addr s.events[0], 64.cint, timeout.cint)
     if evNum < 0: OSError(OSLastError())
     if evNum == 0: return @[]
     for i in 0 .. <evNum:
       var evSet: set[TEvent] = {}
       if (s.events[i].events and EPOLLIN) != 0: evSet = evSet + {EvRead}
       if (s.events[i].events and EPOLLOUT) != 0: evSet = evSet + {EvWrite}
-      
       let selectorKey = s.fds[s.events[i].data.fd.TSocketHandle]
+      assert selectorKey != nil
       result.add((selectorKey, evSet))
   
   proc newSelector*(): PSelector =
     new result
     result.epollFD = epoll_create(64)
-    result.events = cast[array[64, ptr epoll_event]](alloc0(sizeof(epoll_event)*64))
+    result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64))
     result.fds = initTable[TSocketHandle, PSelectorKey]()
     if result.epollFD < 0:
       OSError(OSLastError())
@@ -247,4 +257,4 @@ when isMainModule:
   
   
   
-  
\ No newline at end of file
+  
diff --git a/lib/pure/sockets2.nim b/lib/pure/sockets2.nim
index 3542a0694..290f414b4 100644
--- a/lib/pure/sockets2.nim
+++ b/lib/pure/sockets2.nim
@@ -24,6 +24,10 @@ else:
 export TSocketHandle, TSockaddr_in, TAddrinfo, INADDR_ANY, TSockAddr, TSockLen,
   inet_ntoa, recv, `==`, connect, send, accept
 
+export
+  SO_ERROR,
+  SOL_SOCKET
+
 type
   
   TPort* = distinct uint16  ## port type
@@ -208,6 +212,24 @@ proc htons*(x: int16): int16 =
   ## order, this is a no-op; otherwise, it performs a 2-byte swap operation.
   result = sockets2.ntohs(x)
 
+proc getSockOptInt*(socket: TSocketHandle, level, optname: int): int {.
+  tags: [FReadIO].} = 
+  ## getsockopt for integer options.
+  var res: cint
+  var size = sizeof(res).TSocklen
+  if getsockopt(socket, cint(level), cint(optname), 
+                addr(res), addr(size)) < 0'i32:
+    osError(osLastError())
+  result = int(res)
+
+proc setSockOptInt*(socket: TSocketHandle, level, optname, optval: int) {.
+  tags: [FWriteIO].} =
+  ## setsockopt for integer options.
+  var value = cint(optval)
+  if setsockopt(socket, cint(level), cint(optname), addr(value),  
+                sizeof(value).TSocklen) < 0'i32:
+    osError(osLastError())
+
 when defined(Windows):
   var wsa: TWSADATA
   if WSAStartup(0x0101'i16, addr wsa) != 0: OSError(OSLastError())
diff --git a/tests/async/tasyncawait.nim b/tests/async/tasyncawait.nim
index bde5bf8c8..9e5d270c3 100644
--- a/tests/async/tasyncawait.nim
+++ b/tests/async/tasyncawait.nim
@@ -15,16 +15,24 @@ const
 var clientCount = 0
 
 proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.async.} =
+  echo("entering sendMessages")
   for i in 0 .. <messagesToSend:
-    discard await disp.send(client, "Message " & $i & "\c\L") 
+    discard await disp.send(client, "Message " & $i & "\c\L")
+  echo("returning sendMessages")
 
 proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} =
   for i in 0 .. <swarmSize:
     var sock = socket()
+    # TODO: We may need to explicitly register and unregister the fd.
+    # This is because when the socket is closed, selectors is not aware
+    # that it has been closed. While epoll is. Perhaps we should just unregister
+    # in close()?
+    echo(sock.cint)
     #disp.register(sock)
     discard await disp.connect(sock, "localhost", port)
     when true:
       discard await sendMessages(disp, sock)
+      echo("Calling close")
       sock.close()
     else:
       # Issue #932: https://github.com/Araq/Nimrod/issues/932