diff options
Diffstat (limited to 'lib/pure/asyncio.nim')
-rw-r--r-- | lib/pure/asyncio.nim | 643 |
1 files changed, 0 insertions, 643 deletions
diff --git a/lib/pure/asyncio.nim b/lib/pure/asyncio.nim deleted file mode 100644 index be375a1a1..000000000 --- a/lib/pure/asyncio.nim +++ /dev/null @@ -1,643 +0,0 @@ -# -# -# Nimrod's Runtime Library -# (c) Copyright 2012 Andreas Rumpf, Dominik Picheta -# See the file "copying.txt", included in this -# distribution, for details about the copyright. -# - -import sockets, os - -## This module implements an asynchronous event loop for sockets. -## It is akin to Python's asyncore module. Many modules that use sockets -## have an implementation for this module, those modules should all have a -## ``register`` function which you should use to add the desired objects to a -## dispatcher which you created so -## that you can receive the events associated with that module's object. -## -## Once everything is registered in a dispatcher, you need to call the ``poll`` -## function in a while loop. -## -## **Note:** Most modules have tasks which need to be ran regularly, this is -## why you should not call ``poll`` with a infinite timeout, or even a -## very long one. In most cases the default timeout is fine. -## -## **Note:** This module currently only supports select(), this is limited by -## FD_SETSIZE, which is usually 1024. So you may only be able to use 1024 -## sockets at a time. -## -## Most (if not all) modules that use asyncio provide a userArg which is passed -## on with the events. The type that you set userArg to must be inheriting from -## TObject! -## -## **Note:** If you want to provide async ability to your module please do not -## use the ``TDelegate`` object, instead use ``PAsyncSocket``. It is possible -## that in the future this type's fields will not be exported therefore breaking -## your code. -## -## **Warning:** The API of this module is unstable, and therefore is subject -## to change. -## -## Asynchronous sockets -## ==================== -## -## For most purposes you do not need to worry about the ``TDelegate`` type. The -## ``PAsyncSocket`` is what you are after. It's a reference to the ``TAsyncSocket`` -## object. This object defines events which you should overwrite by your own -## procedures. -## -## For server sockets the only event you need to worry about is the ``handleAccept`` -## event, in your handleAccept proc you should call ``accept`` on the server -## socket which will give you the client which is connecting. You should then -## set any events that you want to use on that client and add it to your dispatcher -## using the ``register`` procedure. -## -## An example ``handleAccept`` follows: -## -## .. code-block:: nimrod -## -## var disp: PDispatcher = newDispatcher() -## ... -## proc handleAccept(s: PAsyncSocket) = -## echo("Accepted client.") -## var client: PAsyncSocket -## new(client) -## s.accept(client) -## client.handleRead = ... -## disp.register(client) -## ... -## -## For client sockets you should only be interested in the ``handleRead`` and -## ``handleConnect`` events. The former gets called whenever the socket has -## received messages and can be read from and the latter gets called whenever -## the socket has established a connection to a server socket; from that point -## it can be safely written to. -## -## Getting a blocking client from a PAsyncSocket -## ============================================= -## -## If you need a asynchronous server socket but you wish to process the clients -## synchronously then you can use the ``getSocket`` converter to get a TSocket -## object from the PAsyncSocket object, this can then be combined with ``accept`` -## like so: -## -## .. code-block:: nimrod -## -## proc handleAccept(s: PAsyncSocket) = -## var client: TSocket -## getSocket(s).accept(client) - -when defined(windows): - from winlean import TTimeVal, TFdSet, FD_ZERO, FD_SET, FD_ISSET, select -else: - from posix import TTimeVal, TFdSet, FD_ZERO, FD_SET, FD_ISSET, select - -type - TDelegate* = object - fd*: cint - deleVal*: PObject - - handleRead*: proc (h: PObject) {.nimcall.} - handleWrite*: proc (h: PObject) {.nimcall.} - handleError*: proc (h: PObject) {.nimcall.} - hasDataBuffered*: proc (h: PObject): bool {.nimcall.} - - open*: bool - task*: proc (h: PObject) {.nimcall.} - mode*: TFileMode - - PDelegate* = ref TDelegate - - PDispatcher* = ref TDispatcher - TDispatcher = object - delegates: seq[PDelegate] - - PAsyncSocket* = ref TAsyncSocket - TAsyncSocket* = object of TObject - socket: TSocket - info: TInfo - - handleRead*: proc (s: PAsyncSocket) {.closure.} - handleWrite: proc (s: PAsyncSocket) {.closure.} - handleConnect*: proc (s: PAsyncSocket) {.closure.} - - handleAccept*: proc (s: PAsyncSocket) {.closure.} - - handleTask*: proc (s: PAsyncSocket) {.closure.} - - lineBuffer: TaintedString ## Temporary storage for ``recvLine`` - sendBuffer: string ## Temporary storage for ``send`` - sslNeedAccept: bool - proto: TProtocol - deleg: PDelegate - - TInfo* = enum - SockIdle, SockConnecting, SockConnected, SockListening, SockClosed, - SockUDPBound - -proc newDelegate*(): PDelegate = - ## Creates a new delegate. - new(result) - result.handleRead = (proc (h: PObject) = nil) - result.handleWrite = (proc (h: PObject) = nil) - result.handleError = (proc (h: PObject) = nil) - result.hasDataBuffered = (proc (h: PObject): bool = return false) - result.task = (proc (h: PObject) = nil) - result.mode = fmRead - -proc newAsyncSocket(): PAsyncSocket = - new(result) - result.info = SockIdle - - result.handleRead = (proc (s: PAsyncSocket) = nil) - result.handleWrite = nil - result.handleConnect = (proc (s: PAsyncSocket) = nil) - result.handleAccept = (proc (s: PAsyncSocket) = nil) - result.handleTask = (proc (s: PAsyncSocket) = nil) - - result.lineBuffer = "".TaintedString - result.sendBuffer = "" - -proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, - protocol: TProtocol = IPPROTO_TCP, - buffered = true): PAsyncSocket = - ## Initialises an AsyncSocket object. If a socket cannot be initialised - ## EOS is raised. - result = newAsyncSocket() - result.socket = socket(domain, typ, protocol, buffered) - result.proto = protocol - if result.socket == InvalidSocket: OSError() - result.socket.setBlocking(false) - -proc toAsyncSocket*(sock: TSocket, state: TInfo = SockConnected): PAsyncSocket = - ## Wraps an already initialized ``TSocket`` into a PAsyncSocket. - ## This is useful if you want to use an already connected TSocket as an - ## asynchronous PAsyncSocket in asyncio's event loop. - ## - ## ``state`` may be overriden, i.e. if ``sock`` is not connected it should be - ## adjusted properly. By default it will be assumed that the socket is - ## connected. Please note this is only applicable to TCP client sockets, if - ## ``sock`` is a different type of socket ``state`` needs to be adjusted!!! - ## - ## ================ ================================================================ - ## Value Meaning - ## ================ ================================================================ - ## SockIdle Socket has only just been initialised, not connected or closed. - ## SockConnected Socket is connected to a server. - ## SockConnecting Socket is in the process of connecting to a server. - ## SockListening Socket is a server socket and is listening for connections. - ## SockClosed Socket has been closed. - ## SockUDPBound Socket is a UDP socket which is listening for data. - ## ================ ================================================================ - ## - ## **Warning**: If ``state`` is set incorrectly the resulting ``PAsyncSocket`` - ## object may not work properly. - ## - ## **Note**: This will set ``sock`` to be non-blocking. - result = newAsyncSocket() - result.socket = sock - result.proto = if state == SockUDPBound: IPPROTO_UDP else: IPPROTO_TCP - result.socket.setBlocking(false) - result.info = state - -proc asyncSockHandleRead(h: PObject) = - when defined(ssl): - if PAsyncSocket(h).socket.isSSL and not - PAsyncSocket(h).socket.gotHandshake: - return - - if PAsyncSocket(h).info != SockListening: - if PAsyncSocket(h).info != SockConnecting: - PAsyncSocket(h).handleRead(PAsyncSocket(h)) - else: - PAsyncSocket(h).handleAccept(PAsyncSocket(h)) - -proc asyncSockHandleWrite(h: PObject) = - when defined(ssl): - if PAsyncSocket(h).socket.isSSL and not - PAsyncSocket(h).socket.gotHandshake: - return - - if PAsyncSocket(h).info == SockConnecting: - PAsyncSocket(h).handleConnect(PAsyncSocket(h)) - PAsyncSocket(h).info = SockConnected - # Stop receiving write events if there is no handleWrite event. - if PAsyncSocket(h).handleWrite == nil: - PAsyncSocket(h).deleg.mode = fmRead - else: - PAsyncSocket(h).deleg.mode = fmReadWrite - else: - if PAsyncSocket(h).sendBuffer != "": - let sock = PAsyncSocket(h) - let bytesSent = sock.socket.sendAsync(sock.sendBuffer) - assert bytesSent > 0 - if bytesSent != sock.sendBuffer.len: - sock.sendBuffer = sock.sendBuffer[bytesSent .. -1] - elif bytesSent == sock.sendBuffer.len: - sock.sendBuffer = "" - - if PAsyncSocket(h).handleWrite != nil: - PAsyncSocket(h).handleWrite(PAsyncSocket(h)) - else: - if PAsyncSocket(h).handleWrite != nil: - PAsyncSocket(h).handleWrite(PAsyncSocket(h)) - else: - PAsyncSocket(h).deleg.mode = fmRead - -when defined(ssl): - proc asyncSockDoHandshake(h: PObject) = - if PAsyncSocket(h).socket.isSSL and not - PAsyncSocket(h).socket.gotHandshake: - if PAsyncSocket(h).sslNeedAccept: - var d = "" - let ret = PAsyncSocket(h).socket.acceptAddrSSL(PAsyncSocket(h).socket, d) - assert ret != AcceptNoClient - if ret == AcceptSuccess: - PAsyncSocket(h).info = SockConnected - else: - # handshake will set socket's ``sslNoHandshake`` field. - discard PAsyncSocket(h).socket.handshake() - - -proc asyncSockTask(h: PObject) = - when defined(ssl): - h.asyncSockDoHandshake() - - PAsyncSocket(h).handleTask(PAsyncSocket(h)) - -proc toDelegate(sock: PAsyncSocket): PDelegate = - result = newDelegate() - result.deleVal = sock - result.fd = getFD(sock.socket) - # We need this to get write events, just to know when the socket connects. - result.mode = fmReadWrite - result.handleRead = asyncSockHandleRead - result.handleWrite = asyncSockHandleWrite - result.task = asyncSockTask - # TODO: Errors? - #result.handleError = (proc (h: PObject) = assert(false)) - - result.hasDataBuffered = - proc (h: PObject): bool {.nimcall.} = - return PAsyncSocket(h).socket.hasDataBuffered() - - sock.deleg = result - if sock.info notin {SockIdle, SockClosed}: - sock.deleg.open = true - else: - sock.deleg.open = false - -proc connect*(sock: PAsyncSocket, name: string, port = TPort(0), - af: TDomain = AF_INET) = - ## Begins connecting ``sock`` to ``name``:``port``. - sock.socket.connectAsync(name, port, af) - sock.info = SockConnecting - if sock.deleg != nil: - sock.deleg.open = true - -proc close*(sock: PAsyncSocket) = - ## Closes ``sock``. Terminates any current connections. - sock.socket.close() - sock.info = SockClosed - if sock.deleg != nil: - sock.deleg.open = false - -proc bindAddr*(sock: PAsyncSocket, port = TPort(0), address = "") = - ## Equivalent to ``sockets.bindAddr``. - sock.socket.bindAddr(port, address) - if sock.proto == IPPROTO_UDP: - sock.info = SockUDPBound - if sock.deleg != nil: - sock.deleg.open = true - -proc listen*(sock: PAsyncSocket) = - ## Equivalent to ``sockets.listen``. - sock.socket.listen() - sock.info = SockListening - if sock.deleg != nil: - sock.deleg.open = true - -proc acceptAddr*(server: PAsyncSocket, client: var PAsyncSocket, - address: var string) = - ## Equivalent to ``sockets.acceptAddr``. This procedure should be called in - ## a ``handleAccept`` event handler **only** once. - ## - ## **Note**: ``client`` needs to be initialised. - assert(client != nil) - client = newAsyncSocket() - var c: TSocket - new(c) - when defined(ssl): - if server.socket.isSSL: - var ret = server.socket.acceptAddrSSL(c, address) - # The following shouldn't happen because when this function is called - # it is guaranteed that there is a client waiting. - # (This should be called in handleAccept) - assert(ret != AcceptNoClient) - if ret == AcceptNoHandshake: - client.sslNeedAccept = true - else: - client.sslNeedAccept = false - client.info = SockConnected - else: - server.socket.acceptAddr(c, address) - client.sslNeedAccept = false - client.info = SockConnected - else: - server.socket.acceptAddr(c, address) - client.sslNeedAccept = false - client.info = SockConnected - - if c == InvalidSocket: OSError() - c.setBlocking(false) # TODO: Needs to be tested. - - # deleg.open is set in ``toDelegate``. - - client.socket = c - client.lineBuffer = "".TaintedString - client.sendBuffer = "" - client.info = SockConnected - -proc accept*(server: PAsyncSocket, client: var PAsyncSocket) = - ## Equivalent to ``sockets.accept``. - var dummyAddr = "" - server.acceptAddr(client, dummyAddr) - -proc acceptAddr*(server: PAsyncSocket): tuple[sock: PAsyncSocket, - address: string] {.deprecated.} = - ## Equivalent to ``sockets.acceptAddr``. - ## - ## **Deprecated since version 0.9.0:** Please use the function above. - var client = newAsyncSocket() - var address: string = "" - acceptAddr(server, client, address) - return (client, address) - -proc accept*(server: PAsyncSocket): PAsyncSocket {.deprecated.} = - ## Equivalent to ``sockets.accept``. - ## - ## **Deprecated since version 0.9.0:** Please use the function above. - new(result) - var address = "" - server.acceptAddr(result, address) - -proc newDispatcher*(): PDispatcher = - new(result) - result.delegates = @[] - -proc register*(d: PDispatcher, deleg: PDelegate) = - ## Registers delegate ``deleg`` with dispatcher ``d``. - d.delegates.add(deleg) - -proc register*(d: PDispatcher, sock: PAsyncSocket): PDelegate {.discardable.} = - ## Registers async socket ``sock`` with dispatcher ``d``. - result = sock.toDelegate() - d.register(result) - -proc unregister*(d: PDispatcher, deleg: PDelegate) = - ## Unregisters deleg ``deleg`` from dispatcher ``d``. - for i in 0..len(d.delegates)-1: - if d.delegates[i] == deleg: - d.delegates.del(i) - return - raise newException(EInvalidIndex, "Could not find delegate.") - -proc isWriteable*(s: PAsyncSocket): bool = - ## Determines whether socket ``s`` is ready to be written to. - var writeSock = @[s.socket] - return selectWrite(writeSock, 1) != 0 and s.socket notin writeSock - -converter getSocket*(s: PAsyncSocket): TSocket = - return s.socket - -proc isConnected*(s: PAsyncSocket): bool = - ## Determines whether ``s`` is connected. - return s.info == SockConnected -proc isListening*(s: PAsyncSocket): bool = - ## Determines whether ``s`` is listening for incoming connections. - return s.info == SockListening -proc isConnecting*(s: PAsyncSocket): bool = - ## Determines whether ``s`` is connecting. - return s.info == SockConnecting - -proc setHandleWrite*(s: PAsyncSocket, - handleWrite: proc (s: PAsyncSocket) {.closure.}) = - ## Setter for the ``handleWrite`` event. - ## - ## To remove this event you should use the ``delHandleWrite`` function. - ## It is advised to use that function instead of just setting the event to - ## ``proc (s: PAsyncSocket) = nil`` as that would mean that that function - ## would be called constantly. - s.deleg.mode = fmReadWrite - s.handleWrite = handleWrite - -proc delHandleWrite*(s: PAsyncSocket) = - ## Removes the ``handleWrite`` event handler on ``s``. - s.handleWrite = nil - -proc recvLine*(s: PAsyncSocket, line: var TaintedString): bool = - ## Behaves similar to ``sockets.recvLine``, however it handles non-blocking - ## sockets properly. This function guarantees that ``line`` is a full line, - ## if this function can only retrieve some data; it will save this data and - ## add it to the result when a full line is retrieved. - ## - ## Unlike ``sockets.recvLine`` this function will raise an EOS or ESSL - ## exception if an error occurs. - setLen(line.string, 0) - var dataReceived = "".TaintedString - var ret = s.socket.recvLineAsync(dataReceived) - case ret - of RecvFullLine: - if s.lineBuffer.len > 0: - string(line).add(s.lineBuffer.string) - setLen(s.lineBuffer.string, 0) - string(line).add(dataReceived.string) - if string(line) == "": - line = "\c\L".TaintedString - result = true - of RecvPartialLine: - string(s.lineBuffer).add(dataReceived.string) - result = false - of RecvDisconnected: - result = true - of RecvFail: - s.SocketError(async = true) - result = false - -proc send*(sock: PAsyncSocket, data: string) = - ## Sends ``data`` to socket ``sock``. This is basically a nicer implementation - ## of ``sockets.sendAsync``. - ## - ## If ``data`` cannot be sent immediately it will be buffered and sent - ## when ``sock`` becomes writeable (during the ``handleWrite`` event). - ## It's possible that only a part of ``data`` will be sent immediately, while - ## the rest of it will be buffered and sent later. - if sock.sendBuffer.len != 0: - sock.sendBuffer.add(data) - return - let bytesSent = sock.socket.sendAsync(data) - assert bytesSent >= 0 - if bytesSent == 0: - sock.sendBuffer.add(data) - sock.deleg.mode = fmReadWrite - elif bytesSent != data.len: - sock.sendBuffer.add(data[bytesSent .. -1]) - sock.deleg.mode = fmReadWrite - -proc timeValFromMilliseconds(timeout = 500): TTimeVal = - if timeout != -1: - var seconds = timeout div 1000 - result.tv_sec = seconds.int32 - result.tv_usec = ((timeout - seconds * 1000) * 1000).int32 - -proc createFdSet(fd: var TFdSet, s: seq[PDelegate], m: var int) = - FD_ZERO(fd) - for i in items(s): - m = max(m, int(i.fd)) - FD_SET(i.fd, fd) - -proc pruneSocketSet(s: var seq[PDelegate], fd: var TFdSet) = - var i = 0 - var L = s.len - while i < L: - if FD_ISSET(s[i].fd, fd) != 0'i32: - s[i] = s[L-1] - dec(L) - else: - inc(i) - setLen(s, L) - -proc select(readfds, writefds, exceptfds: var seq[PDelegate], - timeout = 500): int = - var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout) - - var rd, wr, ex: TFdSet - var m = 0 - createFdSet(rd, readfds, m) - createFdSet(wr, writefds, m) - createFdSet(ex, exceptfds, m) - - if timeout != -1: - result = int(select(cint(m+1), addr(rd), addr(wr), addr(ex), addr(tv))) - else: - result = int(select(cint(m+1), addr(rd), addr(wr), addr(ex), nil)) - - pruneSocketSet(readfds, (rd)) - pruneSocketSet(writefds, (wr)) - pruneSocketSet(exceptfds, (ex)) - -proc poll*(d: PDispatcher, timeout: int = 500): bool = - ## This function checks for events on all the delegates in the `PDispatcher`. - ## It then proceeds to call the correct event handler. - ## - ## This function returns ``True`` if there are file descriptors that are still - ## open, otherwise ``False``. File descriptors that have been - ## closed are immediately removed from the dispatcher automatically. - ## - ## **Note:** Each delegate has a task associated with it. This gets called - ## after each select() call, if you set timeout to ``-1`` the tasks will - ## only be executed after one or more file descriptors becomes readable or - ## writeable. - result = true - var readDg, writeDg, errorDg: seq[PDelegate] = @[] - var len = d.delegates.len - var dc = 0 - - while dc < len: - let deleg = d.delegates[dc] - if (deleg.mode != fmWrite or deleg.mode != fmAppend) and deleg.open: - readDg.add(deleg) - if (deleg.mode != fmRead) and deleg.open: - writeDg.add(deleg) - if deleg.open: - errorDg.add(deleg) - inc dc - else: - # File/socket has been closed. Remove it from dispatcher. - d.delegates[dc] = d.delegates[len-1] - dec len - - d.delegates.setLen(len) - - var hasDataBufferedCount = 0 - for d in d.delegates: - if d.hasDataBuffered(d.deleVal): - hasDataBufferedCount.inc() - d.handleRead(d.deleVal) - if hasDataBufferedCount > 0: return True - - if readDg.len() == 0 and writeDg.len() == 0: - ## TODO: Perhaps this shouldn't return if errorDg has something? - return False - - if select(readDg, writeDg, errorDg, timeout) != 0: - for i in 0..len(d.delegates)-1: - if i > len(d.delegates)-1: break # One delegate might've been removed. - let deleg = d.delegates[i] - if not deleg.open: continue # This delegate might've been closed. - if (deleg.mode != fmWrite or deleg.mode != fmAppend) and - deleg notin readDg: - deleg.handleRead(deleg.deleVal) - if (deleg.mode != fmRead) and deleg notin writeDg: - deleg.handleWrite(deleg.deleVal) - if deleg notin errorDg: - deleg.handleError(deleg.deleVal) - - # Execute tasks - for i in items(d.delegates): - i.task(i.deleVal) - -proc len*(disp: PDispatcher): int = - ## Retrieves the amount of delegates in ``disp``. - return disp.delegates.len - -when isMainModule: - - proc testConnect(s: PAsyncSocket, no: int) = - echo("Connected! " & $no) - - proc testRead(s: PAsyncSocket, no: int) = - echo("Reading! " & $no) - var data = "" - if not s.recvLine(data): - OSError() - if data == "": - echo("Closing connection. " & $no) - s.close() - echo(data) - echo("Finished reading! " & $no) - - proc testAccept(s: PAsyncSocket, disp: PDispatcher, no: int) = - echo("Accepting client! " & $no) - var client: PAsyncSocket - new(client) - var address = "" - s.acceptAddr(client, address) - echo("Accepted ", address) - client.handleRead = - proc (s: PAsyncSocket) = - testRead(s, 2) - disp.register(client) - - var d = newDispatcher() - - var s = AsyncSocket() - s.connect("amber.tenthbit.net", TPort(6667)) - s.handleConnect = - proc (s: PAsyncSocket) = - testConnect(s, 1) - s.handleRead = - proc (s: PAsyncSocket) = - testRead(s, 1) - d.register(s) - - var server = AsyncSocket() - server.handleAccept = - proc (s: PAsyncSocket) = - testAccept(s, d, 78) - server.bindAddr(TPort(5555)) - server.listen() - d.register(server) - - while d.poll(-1): nil - |