diff options
-rw-r--r-- | lib/pure/asyncio.nim | 176 | ||||
-rwxr-xr-x | lib/pure/sockets.nim | 11 |
2 files changed, 147 insertions, 40 deletions
diff --git a/lib/pure/asyncio.nim b/lib/pure/asyncio.nim index 113b1d080..c52cd3b94 100644 --- a/lib/pure/asyncio.nim +++ b/lib/pure/asyncio.nim @@ -70,22 +70,24 @@ import sockets, os ## the socket has established a connection to a server socket; from that point ## it can be safely written to. - +when defined(windows): + from winlean import TTimeVal, TFdSet, FD_ZERO, FD_SET, FD_ISSET, select +else: + from posix import TTimeVal, TFdSet, FD_ZERO, FD_SET, FD_ISSET, select type - TDelegate = object + fd: cint deleVal*: PObject handleRead*: proc (h: PObject) {.nimcall.} handleWrite*: proc (h: PObject) {.nimcall.} - handleConnect*: proc (h: PObject) {.nimcall.} - - handleAccept*: proc (h: PObject) {.nimcall.} - getSocket*: proc (h: PObject): tuple[info: TInfo, sock: TSocket] {.nimcall.} - + handleError*: proc (h: PObject) {.nimcall.} + hasDataBuffered*: proc (h: PObject): bool {.nimcall.} + + open*: bool task*: proc (h: PObject) {.nimcall.} - mode*: TMode + mode*: TFileMode PDelegate* = ref TDelegate @@ -106,24 +108,20 @@ type lineBuffer: TaintedString ## Temporary storage for ``recvLine`` sslNeedAccept: bool proto: TProtocol + deleg: PDelegate - TInfo* = enum + TInfo = enum SockIdle, SockConnecting, SockConnected, SockListening, SockClosed, SockUDPBound - - TMode* = enum - MReadable, MWriteable, MReadWrite proc newDelegate*(): PDelegate = ## Creates a new delegate. new(result) result.handleRead = (proc (h: PObject) = nil) result.handleWrite = (proc (h: PObject) = nil) - result.handleConnect = (proc (h: PObject) = nil) - result.handleAccept = (proc (h: PObject) = nil) - result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] = - doAssert(false)) + result.handleError = (proc (h: PObject) = nil) + result.hasDataBuffered = (proc (h: PObject): bool = return false) result.task = (proc (h: PObject) = nil) - result.mode = MReadable + result.mode = fmRead proc newAsyncSocket(): PAsyncSocket = new(result) @@ -144,21 +142,28 @@ proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, if result.socket == InvalidSocket: OSError() result.socket.setBlocking(false) -proc asyncSockHandleConnect(h: PObject) = +proc asyncSockHandleRead(h: PObject) = when defined(ssl): if PAsyncSocket(h).socket.isSSL and not PAsyncSocket(h).socket.gotHandshake: - return - - PAsyncSocket(h).info = SockConnected - PAsyncSocket(h).handleConnect(PAsyncSocket(h)) + return -proc asyncSockHandleRead(h: PObject) = + if PAsyncSocket(h).info != SockListening: + assert PAsyncSocket(h).info != SockConnecting + PAsyncSocket(h).handleRead(PAsyncSocket(h)) + else: + PAsyncSocket(h).handleAccept(PAsyncSocket(h)) + +proc asyncSockHandleWrite(h: PObject) = when defined(ssl): if PAsyncSocket(h).socket.isSSL and not PAsyncSocket(h).socket.gotHandshake: return - PAsyncSocket(h).handleRead(PAsyncSocket(h)) + + if PAsyncSocket(h).info == SockConnecting: + PAsyncSocket(h).handleConnect(PAsyncSocket(h)) + # Stop receiving write events + PAsyncSocket(h).deleg.mode = fmRead when defined(ssl): proc asyncSockDoHandshake(h: PObject) = @@ -173,19 +178,27 @@ when defined(ssl): else: # handshake will set socket's ``sslNoHandshake`` field. discard PAsyncSocket(h).socket.handshake() - + proc toDelegate(sock: PAsyncSocket): PDelegate = result = newDelegate() result.deleVal = sock - result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] = - return (PAsyncSocket(h).info, PAsyncSocket(h).socket)) - - result.handleConnect = asyncSockHandleConnect - + result.fd = getFD(sock.socket) + # We need this to get write events, just to know when the socket connects. + result.mode = fmReadWrite result.handleRead = asyncSockHandleRead - - result.handleAccept = (proc (h: PObject) = - PAsyncSocket(h).handleAccept(PAsyncSocket(h))) + result.handleWrite = asyncSockHandleWrite + # TODO: Errors? + #result.handleError = (proc (h: PObject) = assert(false)) + + result.hasDataBuffered = + proc (h: PObject): bool {.nimcall.} = + return PAsyncSocket(h).socket.hasDataBuffered() + + sock.deleg = result + if sock.info notin {SockIdle, SockClosed}: + sock.deleg.open = true + else: + sock.deleg.open = false when defined(ssl): result.task = asyncSockDoHandshake @@ -195,22 +208,26 @@ proc connect*(sock: PAsyncSocket, name: string, port = TPort(0), ## Begins connecting ``sock`` to ``name``:``port``. sock.socket.connectAsync(name, port, af) sock.info = SockConnecting + sock.deleg.open = true proc close*(sock: PAsyncSocket) = ## Closes ``sock``. Terminates any current connections. - sock.info = SockClosed sock.socket.close() + sock.info = SockClosed + sock.deleg.open = false proc bindAddr*(sock: PAsyncSocket, port = TPort(0), address = "") = ## Equivalent to ``sockets.bindAddr``. sock.socket.bindAddr(port, address) if sock.proto == IPPROTO_UDP: sock.info = SockUDPBound + sock.deleg.open = true proc listen*(sock: PAsyncSocket) = ## Equivalent to ``sockets.listen``. sock.socket.listen() sock.info = SockListening + sock.deleg.open = true proc acceptAddr*(server: PAsyncSocket, client: var PAsyncSocket, address: var string) = @@ -245,8 +262,11 @@ proc acceptAddr*(server: PAsyncSocket, client: var PAsyncSocket, if c == InvalidSocket: OSError() c.setBlocking(false) # TODO: Needs to be tested. + # deleg.open is set in ``toDelegate``. + client.socket = c client.lineBuffer = "" + client.info = SockConnected proc accept*(server: PAsyncSocket, client: var PAsyncSocket) = ## Equivalent to ``sockets.accept``. @@ -297,9 +317,6 @@ proc isWriteable*(s: PAsyncSocket): bool = var writeSock = @[s.socket] return selectWrite(writeSock, 1) != 0 and s.socket notin writeSock -proc `userArg=`*(s: PAsyncSocket, val: PObject) = - s.userArg = val - converter getSocket*(s: PAsyncSocket): TSocket = return s.socket @@ -338,6 +355,48 @@ proc recvLine*(s: PAsyncSocket, line: var TaintedString): bool = of RecvFail: result = false +proc timeValFromMilliseconds(timeout = 500): TTimeVal = + if timeout != -1: + var seconds = timeout div 1000 + result.tv_sec = seconds.int32 + result.tv_usec = ((timeout - seconds * 1000) * 1000).int32 + +proc createFdSet(fd: var TFdSet, s: seq[PDelegate], m: var int) = + FD_ZERO(fd) + for i in items(s): + m = max(m, int(i.fd)) + FD_SET(i.fd, fd) + +proc pruneSocketSet(s: var seq[PDelegate], fd: var TFdSet) = + var i = 0 + var L = s.len + while i < L: + if FD_ISSET(s[i].fd, fd) != 0'i32: + s[i] = s[L-1] + dec(L) + else: + inc(i) + setLen(s, L) + +proc select(readfds, writefds, exceptfds: var seq[PDelegate], + timeout = 500): int = + var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout) + + var rd, wr, ex: TFdSet + var m = 0 + createFdSet(rd, readfds, m) + createFdSet(wr, writefds, m) + createFdSet(ex, exceptfds, m) + + if timeout != -1: + result = int(select(cint(m+1), addr(rd), addr(wr), addr(ex), addr(tv))) + else: + result = int(select(cint(m+1), addr(rd), addr(wr), addr(ex), nil)) + + pruneSocketSet(readfds, (rd)) + pruneSocketSet(writefds, (wr)) + pruneSocketSet(exceptfds, (ex)) + proc poll*(d: PDispatcher, timeout: int = 500): bool = ## This function checks for events on all the sockets in the `PDispatcher`. ## It then proceeds to call the correct event handler. @@ -354,8 +413,47 @@ proc poll*(d: PDispatcher, timeout: int = 500): bool = ## **Note:** Each delegate has a task associated with it. This gets called ## after each select() call, if you make timeout ``-1`` the tasks will ## only be executed after one or more sockets becomes readable or writeable. - result = true + var readDg, writeDg, errorDg: seq[PDelegate] = @[] + var len = d.delegates.len + var dc = 0 + + while dc < len: + let deleg = d.delegates[dc] + if (deleg.mode != fmWrite or deleg.mode != fmAppend) and deleg.open: + readDg.add(deleg) + if (deleg.mode != fmRead) and deleg.open: + writeDg.add(deleg) + if deleg.open: + errorDg.add(deleg) + inc dc + else: + # File/socket has been closed. Remove it from dispatcher. + d.delegates[dc] = d.delegates[len-1] + dec len + d.delegates.setLen(len) + + if readDg.len() == 0 and writeDg.len() == 0: + ## TODO: Perhaps this shouldn't return if errorDg has something? + return False + # TODO: Buffering hasDataBuffered!! + if select(readDg, writeDg, errorDg, timeout) != 0: + for i in 0..len(d.delegates)-1: + if i > len(d.delegates)-1: break # One delegate might've been removed. + let deleg = d.delegates[i] + if (deleg.mode != fmWrite or deleg.mode != fmAppend) and + deleg notin readDg: + deleg.handleRead(deleg.deleVal) + if (deleg.mode != fmRead) and deleg notin writeDg: + deleg.handleWrite(deleg.deleVal) + if deleg notin errorDg: + deleg.handleError(deleg.deleVal) + + # Execute tasks + for i in items(d.delegates): + i.task(i.deleVal) + + discard """result = true var readSocks, writeSocks: seq[TSocket] = @[] var L = d.delegates.len @@ -410,7 +508,7 @@ proc poll*(d: PDispatcher, timeout: int = 500): bool = # Execute tasks for i in items(d.delegates): - i.task(i.deleVal) + i.task(i.deleVal)""" proc len*(disp: PDispatcher): int = ## Retrieves the amount of delegates in ``disp``. diff --git a/lib/pure/sockets.nim b/lib/pure/sockets.nim index ec1817e72..d0a4c216a 100755 --- a/lib/pure/sockets.nim +++ b/lib/pure/sockets.nim @@ -711,7 +711,7 @@ proc connectAsync*(socket: TSocket, name: string, port = TPort(0), ## A variant of ``connect`` for non-blocking sockets. ## ## This procedure will immediatelly return, it will not block until a connection - ## is made. It is up to the caller to make sure the connections has been established + ## is made. It is up to the caller to make sure the connection has been established ## by checking (using ``select``) whether the socket is writeable. ## ## **Note**: For SSL sockets, the ``handshake`` procedure must be called @@ -820,6 +820,12 @@ proc pruneSocketSet(s: var seq[TSocket], fd: var TFdSet) = inc(i) setLen(s, L) +proc hasDataBuffered*(s: TSocket): bool = + ## Determines whether a socket has data buffered. + result = false + if s.isBuffered: + result = s.bufLen > 0 and s.currPos != s.bufLen + proc checkBuffer(readfds: var seq[TSocket]): int = ## Checks the buffer of each socket in ``readfds`` to see whether there is data. ## Removes the sockets from ``readfds`` and returns the count of removed sockets. @@ -1385,6 +1391,9 @@ proc connect*(socket: TSocket, timeout: int, name: string, port = TPort(0), proc isSSL*(socket: TSocket): bool = return socket.isSSL ## Determines whether ``socket`` is a SSL socket. +proc getFD*(socket: TSocket): cint = return socket.fd + ## Returns the socket's file descriptor + when defined(Windows): var wsa: TWSADATA if WSAStartup(0x0101'i16, wsa) != 0: OSError() |