summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--lib/pure/asyncio.nim176
-rwxr-xr-xlib/pure/sockets.nim11
2 files changed, 147 insertions, 40 deletions
diff --git a/lib/pure/asyncio.nim b/lib/pure/asyncio.nim
index 113b1d080..c52cd3b94 100644
--- a/lib/pure/asyncio.nim
+++ b/lib/pure/asyncio.nim
@@ -70,22 +70,24 @@ import sockets, os
 ## the socket has established a connection to a server socket; from that point
 ## it can be safely written to.
 
-
+when defined(windows):
+  from winlean import TTimeVal, TFdSet, FD_ZERO, FD_SET, FD_ISSET, select
+else:
+  from posix import TTimeVal, TFdSet, FD_ZERO, FD_SET, FD_ISSET, select
 
 type
-
   TDelegate = object
+    fd: cint
     deleVal*: PObject
 
     handleRead*: proc (h: PObject) {.nimcall.}
     handleWrite*: proc (h: PObject) {.nimcall.}
-    handleConnect*: proc (h: PObject) {.nimcall.}
-
-    handleAccept*: proc (h: PObject) {.nimcall.}
-    getSocket*: proc (h: PObject): tuple[info: TInfo, sock: TSocket] {.nimcall.}
-
+    handleError*: proc (h: PObject) {.nimcall.}
+    hasDataBuffered*: proc (h: PObject): bool {.nimcall.}
+    
+    open*: bool
     task*: proc (h: PObject) {.nimcall.}
-    mode*: TMode
+    mode*: TFileMode
     
   PDelegate* = ref TDelegate
 
@@ -106,24 +108,20 @@ type
     lineBuffer: TaintedString ## Temporary storage for ``recvLine``
     sslNeedAccept: bool
     proto: TProtocol
+    deleg: PDelegate
 
-  TInfo* = enum
+  TInfo = enum
     SockIdle, SockConnecting, SockConnected, SockListening, SockClosed, SockUDPBound
-  
-  TMode* = enum
-    MReadable, MWriteable, MReadWrite
 
 proc newDelegate*(): PDelegate =
   ## Creates a new delegate.
   new(result)
   result.handleRead = (proc (h: PObject) = nil)
   result.handleWrite = (proc (h: PObject) = nil)
-  result.handleConnect = (proc (h: PObject) = nil)
-  result.handleAccept = (proc (h: PObject) = nil)
-  result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] =
-                        doAssert(false))
+  result.handleError = (proc (h: PObject) = nil)
+  result.hasDataBuffered = (proc (h: PObject): bool = return false)
   result.task = (proc (h: PObject) = nil)
-  result.mode = MReadable
+  result.mode = fmRead
 
 proc newAsyncSocket(): PAsyncSocket =
   new(result)
@@ -144,21 +142,28 @@ proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
   if result.socket == InvalidSocket: OSError()
   result.socket.setBlocking(false)
 
-proc asyncSockHandleConnect(h: PObject) =
+proc asyncSockHandleRead(h: PObject) =
   when defined(ssl):
     if PAsyncSocket(h).socket.isSSL and not
          PAsyncSocket(h).socket.gotHandshake:
-      return  
-      
-  PAsyncSocket(h).info = SockConnected
-  PAsyncSocket(h).handleConnect(PAsyncSocket(h))
+      return
 
-proc asyncSockHandleRead(h: PObject) =
+  if PAsyncSocket(h).info != SockListening:
+    assert PAsyncSocket(h).info != SockConnecting
+    PAsyncSocket(h).handleRead(PAsyncSocket(h))
+  else:
+    PAsyncSocket(h).handleAccept(PAsyncSocket(h))
+
+proc asyncSockHandleWrite(h: PObject) =
   when defined(ssl):
     if PAsyncSocket(h).socket.isSSL and not
          PAsyncSocket(h).socket.gotHandshake:
       return
-  PAsyncSocket(h).handleRead(PAsyncSocket(h))
+  
+  if PAsyncSocket(h).info == SockConnecting:
+    PAsyncSocket(h).handleConnect(PAsyncSocket(h))
+    # Stop receiving write events
+    PAsyncSocket(h).deleg.mode = fmRead
 
 when defined(ssl):
   proc asyncSockDoHandshake(h: PObject) =
@@ -173,19 +178,27 @@ when defined(ssl):
       else:
         # handshake will set socket's ``sslNoHandshake`` field.
         discard PAsyncSocket(h).socket.handshake()
-
+        
 proc toDelegate(sock: PAsyncSocket): PDelegate =
   result = newDelegate()
   result.deleVal = sock
-  result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] =
-                        return (PAsyncSocket(h).info, PAsyncSocket(h).socket))
-
-  result.handleConnect = asyncSockHandleConnect
-  
+  result.fd = getFD(sock.socket)
+  # We need this to get write events, just to know when the socket connects.
+  result.mode = fmReadWrite
   result.handleRead = asyncSockHandleRead
-  
-  result.handleAccept = (proc (h: PObject) =
-                           PAsyncSocket(h).handleAccept(PAsyncSocket(h)))
+  result.handleWrite = asyncSockHandleWrite
+  # TODO: Errors?
+  #result.handleError = (proc (h: PObject) = assert(false))
+
+  result.hasDataBuffered =
+    proc (h: PObject): bool {.nimcall.} =
+      return PAsyncSocket(h).socket.hasDataBuffered()
+
+  sock.deleg = result
+  if sock.info notin {SockIdle, SockClosed}:
+    sock.deleg.open = true
+  else:
+    sock.deleg.open = false 
 
   when defined(ssl):
     result.task = asyncSockDoHandshake
@@ -195,22 +208,26 @@ proc connect*(sock: PAsyncSocket, name: string, port = TPort(0),
   ## Begins connecting ``sock`` to ``name``:``port``.
   sock.socket.connectAsync(name, port, af)
   sock.info = SockConnecting
+  sock.deleg.open = true
 
 proc close*(sock: PAsyncSocket) =
   ## Closes ``sock``. Terminates any current connections.
-  sock.info = SockClosed
   sock.socket.close()
+  sock.info = SockClosed
+  sock.deleg.open = false
 
 proc bindAddr*(sock: PAsyncSocket, port = TPort(0), address = "") =
   ## Equivalent to ``sockets.bindAddr``.
   sock.socket.bindAddr(port, address)
   if sock.proto == IPPROTO_UDP:
     sock.info = SockUDPBound
+    sock.deleg.open = true
 
 proc listen*(sock: PAsyncSocket) =
   ## Equivalent to ``sockets.listen``.
   sock.socket.listen()
   sock.info = SockListening
+  sock.deleg.open = true
 
 proc acceptAddr*(server: PAsyncSocket, client: var PAsyncSocket,
                  address: var string) =
@@ -245,8 +262,11 @@ proc acceptAddr*(server: PAsyncSocket, client: var PAsyncSocket,
   if c == InvalidSocket: OSError()
   c.setBlocking(false) # TODO: Needs to be tested.
   
+  # deleg.open is set in ``toDelegate``.
+  
   client.socket = c
   client.lineBuffer = ""
+  client.info = SockConnected
 
 proc accept*(server: PAsyncSocket, client: var PAsyncSocket) =
   ## Equivalent to ``sockets.accept``.
@@ -297,9 +317,6 @@ proc isWriteable*(s: PAsyncSocket): bool =
   var writeSock = @[s.socket]
   return selectWrite(writeSock, 1) != 0 and s.socket notin writeSock
 
-proc `userArg=`*(s: PAsyncSocket, val: PObject) =
-  s.userArg = val
-
 converter getSocket*(s: PAsyncSocket): TSocket =
   return s.socket
 
@@ -338,6 +355,48 @@ proc recvLine*(s: PAsyncSocket, line: var TaintedString): bool =
   of RecvFail:
     result = false
 
+proc timeValFromMilliseconds(timeout = 500): TTimeVal =
+  if timeout != -1:
+    var seconds = timeout div 1000
+    result.tv_sec = seconds.int32
+    result.tv_usec = ((timeout - seconds * 1000) * 1000).int32
+
+proc createFdSet(fd: var TFdSet, s: seq[PDelegate], m: var int) =
+  FD_ZERO(fd)
+  for i in items(s): 
+    m = max(m, int(i.fd))
+    FD_SET(i.fd, fd)
+   
+proc pruneSocketSet(s: var seq[PDelegate], fd: var TFdSet) =
+  var i = 0
+  var L = s.len
+  while i < L:
+    if FD_ISSET(s[i].fd, fd) != 0'i32:
+      s[i] = s[L-1]
+      dec(L)
+    else:
+      inc(i)
+  setLen(s, L)
+
+proc select(readfds, writefds, exceptfds: var seq[PDelegate], 
+             timeout = 500): int =
+  var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout)
+  
+  var rd, wr, ex: TFdSet
+  var m = 0
+  createFdSet(rd, readfds, m)
+  createFdSet(wr, writefds, m)
+  createFdSet(ex, exceptfds, m)
+  
+  if timeout != -1:
+    result = int(select(cint(m+1), addr(rd), addr(wr), addr(ex), addr(tv)))
+  else:
+    result = int(select(cint(m+1), addr(rd), addr(wr), addr(ex), nil))
+
+  pruneSocketSet(readfds, (rd))
+  pruneSocketSet(writefds, (wr))
+  pruneSocketSet(exceptfds, (ex))
+
 proc poll*(d: PDispatcher, timeout: int = 500): bool =
   ## This function checks for events on all the sockets in the `PDispatcher`.
   ## It then proceeds to call the correct event handler.
@@ -354,8 +413,47 @@ proc poll*(d: PDispatcher, timeout: int = 500): bool =
   ## **Note:** Each delegate has a task associated with it. This gets called
   ## after each select() call, if you make timeout ``-1`` the tasks will
   ## only be executed after one or more sockets becomes readable or writeable.
-  
   result = true
+  var readDg, writeDg, errorDg: seq[PDelegate] = @[]
+  var len = d.delegates.len
+  var dc = 0
+  
+  while dc < len:
+    let deleg = d.delegates[dc]
+    if (deleg.mode != fmWrite or deleg.mode != fmAppend) and deleg.open:
+      readDg.add(deleg)
+    if (deleg.mode != fmRead) and deleg.open:
+      writeDg.add(deleg)
+    if deleg.open:
+      errorDg.add(deleg)
+      inc dc
+    else:
+      # File/socket has been closed. Remove it from dispatcher.
+      d.delegates[dc] = d.delegates[len-1]
+      dec len
+  d.delegates.setLen(len)
+  
+  if readDg.len() == 0 and writeDg.len() == 0:
+    ## TODO: Perhaps this shouldn't return if errorDg has something?
+    return False
+  # TODO: Buffering hasDataBuffered!!
+  if select(readDg, writeDg, errorDg, timeout) != 0:
+    for i in 0..len(d.delegates)-1:
+      if i > len(d.delegates)-1: break # One delegate might've been removed.
+      let deleg = d.delegates[i]
+      if (deleg.mode != fmWrite or deleg.mode != fmAppend) and
+          deleg notin readDg:
+        deleg.handleRead(deleg.deleVal)
+      if (deleg.mode != fmRead) and deleg notin writeDg:
+        deleg.handleWrite(deleg.deleVal)
+      if deleg notin errorDg:
+        deleg.handleError(deleg.deleVal)
+  
+  # Execute tasks
+  for i in items(d.delegates):
+    i.task(i.deleVal)
+  
+  discard """result = true
   var readSocks, writeSocks: seq[TSocket] = @[]
   
   var L = d.delegates.len
@@ -410,7 +508,7 @@ proc poll*(d: PDispatcher, timeout: int = 500): bool =
   
   # Execute tasks
   for i in items(d.delegates):
-    i.task(i.deleVal)
+    i.task(i.deleVal)"""
 
 proc len*(disp: PDispatcher): int =
   ## Retrieves the amount of delegates in ``disp``.
diff --git a/lib/pure/sockets.nim b/lib/pure/sockets.nim
index ec1817e72..d0a4c216a 100755
--- a/lib/pure/sockets.nim
+++ b/lib/pure/sockets.nim
@@ -711,7 +711,7 @@ proc connectAsync*(socket: TSocket, name: string, port = TPort(0),
   ## A variant of ``connect`` for non-blocking sockets.
   ##
   ## This procedure will immediatelly return, it will not block until a connection
-  ## is made. It is up to the caller to make sure the connections has been established
+  ## is made. It is up to the caller to make sure the connection has been established
   ## by checking (using ``select``) whether the socket is writeable.
   ##
   ## **Note**: For SSL sockets, the ``handshake`` procedure must be called
@@ -820,6 +820,12 @@ proc pruneSocketSet(s: var seq[TSocket], fd: var TFdSet) =
       inc(i)
   setLen(s, L)
 
+proc hasDataBuffered*(s: TSocket): bool =
+  ## Determines whether a socket has data buffered.
+  result = false
+  if s.isBuffered:
+    result = s.bufLen > 0 and s.currPos != s.bufLen
+
 proc checkBuffer(readfds: var seq[TSocket]): int =
   ## Checks the buffer of each socket in ``readfds`` to see whether there is data.
   ## Removes the sockets from ``readfds`` and returns the count of removed sockets.
@@ -1385,6 +1391,9 @@ proc connect*(socket: TSocket, timeout: int, name: string, port = TPort(0),
 proc isSSL*(socket: TSocket): bool = return socket.isSSL
   ## Determines whether ``socket`` is a SSL socket.
 
+proc getFD*(socket: TSocket): cint = return socket.fd
+  ## Returns the socket's file descriptor
+
 when defined(Windows):
   var wsa: TWSADATA
   if WSAStartup(0x0101'i16, wsa) != 0: OSError()