diff options
author | Araq <rumpf_a@web.de> | 2012-01-28 23:24:31 +0100 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2012-01-28 23:24:31 +0100 |
commit | 0f18ab96911cfdae59061dd8afba26c15631d2b5 (patch) | |
tree | 814d9dacacf9cf579194f1053af9c1288bf957c3 /lib/pure/asyncio.nim | |
parent | 8d19a93f1a2fe33373ea32367f5f1828f7d913cc (diff) | |
parent | d2a8a633f6232de24c08d115f68cbb725fa18755 (diff) | |
download | Nim-0f18ab96911cfdae59061dd8afba26c15631d2b5.tar.gz |
removed conflict in system.nim
Diffstat (limited to 'lib/pure/asyncio.nim')
-rw-r--r-- | lib/pure/asyncio.nim | 325 |
1 files changed, 325 insertions, 0 deletions
diff --git a/lib/pure/asyncio.nim b/lib/pure/asyncio.nim new file mode 100644 index 000000000..1c366e4d9 --- /dev/null +++ b/lib/pure/asyncio.nim @@ -0,0 +1,325 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2012 Andreas Rumpf, Dominik Picheta +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +import sockets, os + +## This module implements an asynchronous event loop for sockets. +## It is akin to Python's asyncore module. Many modules that use sockets +## have an implementation for this module, those modules should all have a +## ``register`` function which you should use to add it to a dispatcher so +## that you can receive the events associated with that module. +## +## Once everything is registered in a dispatcher, you need to call the ``poll`` +## function in a while loop. +## +## **Note:** Most modules have tasks which need to be ran regularly, this is +## why you should not call ``poll`` with a infinite timeout, or even a +## very long one. In most cases the default timeout is fine. +## +## **Note:** This module currently only supports select(), this is limited by +## FD_SETSIZE, which is usually 1024. So you may only be able to use 1024 +## sockets at a time. +## +## Most (if not all) modules that use asyncio provide a userArg which is passed +## on with the events. The type that you set userArg to must be inheriting from +## TObject! + +type + TDelegate = object + deleVal*: PObject + + handleRead*: proc (h: PObject) + handleWrite*: proc (h: PObject) + handleConnect*: proc (h: PObject) + + handleAccept*: proc (h: PObject) + getSocket*: proc (h: PObject): tuple[info: TInfo, sock: TSocket] + + task*: proc (h: PObject) + mode*: TMode + + PDelegate* = ref TDelegate + + PDispatcher* = ref TDispatcher + TDispatcher = object + delegates: seq[PDelegate] + + PAsyncSocket* = ref TAsyncSocket + TAsyncSocket = object of TObject + socket: TSocket + info: TInfo + + userArg: PObject + + handleRead*: proc (s: PAsyncSocket, arg: PObject) + handleConnect*: proc (s: PAsyncSocket, arg: PObject) + + handleAccept*: proc (s: PAsyncSocket, arg: PObject) + + TInfo* = enum + SockIdle, SockConnecting, SockConnected, SockListening, SockClosed + + TMode* = enum + MReadable, MWriteable, MReadWrite + +proc newDelegate*(): PDelegate = + ## Creates a new delegate. + new(result) + result.handleRead = (proc (h: PObject) = nil) + result.handleWrite = (proc (h: PObject) = nil) + result.handleConnect = (proc (h: PObject) = nil) + result.handleAccept = (proc (h: PObject) = nil) + result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] = + doAssert(false)) + result.task = (proc (h: PObject) = nil) + result.mode = MReadable + +proc newAsyncSocket(userArg: PObject = nil): PAsyncSocket = + new(result) + result.info = SockIdle + result.userArg = userArg + + result.handleRead = (proc (s: PAsyncSocket, arg: PObject) = nil) + result.handleConnect = (proc (s: PAsyncSocket, arg: PObject) = nil) + result.handleAccept = (proc (s: PAsyncSocket, arg: PObject) = nil) + +proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP, + userArg: PObject = nil): PAsyncSocket = + result = newAsyncSocket(userArg) + result.socket = socket(domain, typ, protocol) + if result.socket == InvalidSocket: OSError() + result.socket.setBlocking(false) + +proc toDelegate(sock: PAsyncSocket): PDelegate = + result = newDelegate() + result.deleVal = sock + result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] = + return (PAsyncSocket(h).info, PAsyncSocket(h).socket)) + + result.handleConnect = (proc (h: PObject) = + PAsyncSocket(h).info = SockConnected + PAsyncSocket(h).handleConnect(PAsyncSocket(h), + PAsyncSocket(h).userArg)) + result.handleRead = (proc (h: PObject) = + PAsyncSocket(h).handleRead(PAsyncSocket(h), + PAsyncSocket(h).userArg)) + result.handleAccept = (proc (h: PObject) = + PAsyncSocket(h).handleAccept(PAsyncSocket(h), + PAsyncSocket(h).userArg)) + +proc connect*(sock: PAsyncSocket, name: string, port = TPort(0), + af: TDomain = AF_INET) = + ## Begins connecting ``sock`` to ``name``:``port``. + sock.socket.connectAsync(name, port, af) + sock.info = SockConnecting + +proc close*(sock: PAsyncSocket) = + ## Closes ``sock``. Terminates any current connections. + sock.info = SockClosed + sock.socket.close() + +proc bindAddr*(sock: PAsyncSocket, port = TPort(0), address = "") = + ## Equivalent to ``sockets.bindAddr``. + sock.socket.bindAddr(port, address) + +proc listen*(sock: PAsyncSocket) = + ## Equivalent to ``sockets.listen``. + sock.socket.listen() + sock.info = SockListening + +proc acceptAddr*(server: PAsyncSocket): tuple[sock: PAsyncSocket, + address: string] = + ## Equivalent to ``sockets.acceptAddr``. + var (client, a) = server.socket.acceptAddr() + if client == InvalidSocket: OSError() + client.setBlocking(false) # TODO: Needs to be tested. + + var aSock: PAsyncSocket = newAsyncSocket() + aSock.socket = client + aSock.info = SockConnected + + return (aSock, a) + +proc accept*(server: PAsyncSocket): PAsyncSocket = + ## Equivalent to ``sockets.accept``. + var (client, a) = server.acceptAddr() + return client + +proc newDispatcher*(): PDispatcher = + new(result) + result.delegates = @[] + +proc register*(d: PDispatcher, deleg: PDelegate) = + ## Registers delegate ``deleg`` with dispatcher ``d``. + d.delegates.add(deleg) + +proc register*(d: PDispatcher, sock: PAsyncSocket): PDelegate {.discardable.} = + ## Registers async socket ``sock`` with dispatcher ``d``. + result = sock.toDelegate() + d.register(result) + +proc unregister*(d: PDispatcher, deleg: PDelegate) = + ## Unregisters deleg ``deleg`` from dispatcher ``d``. + for i in 0..len(d.delegates)-1: + if d.delegates[i] == deleg: + d.delegates.del(i) + return + raise newException(EInvalidIndex, "Could not find delegate.") + +proc isWriteable*(s: PAsyncSocket): bool = + ## Determines whether socket ``s`` is ready to be written to. + var writeSock = @[s.socket] + return selectWrite(writeSock, 1) != 0 and s.socket notin writeSock + +proc `userArg=`*(s: PAsyncSocket, val: PObject) = + s.userArg = val + +converter getSocket*(s: PAsyncSocket): TSocket = + return s.socket + +proc isConnected*(s: PAsyncSocket): bool = + ## Determines whether ``s`` is connected. + return s.info == SockConnected +proc isListening*(s: PAsyncSocket): bool = + ## Determines whether ``s`` is listening for incoming connections. + return s.info == SockListening +proc isConnecting*(s: PAsyncSocket): bool = + ## Determines whether ``s`` is connecting. + return s.info == SockConnecting + +proc poll*(d: PDispatcher, timeout: int = 500): bool = + ## This function checks for events on all the sockets in the `PDispatcher`. + ## It then proceeds to call the correct event handler. + ## + ## **Note:** There is no event which signifes when you have been disconnected, + ## it is your job to check whether what you get from ``recv`` is ``""``. + ## If you have been disconnected, `d`'s ``getSocket`` function should report + ## this appropriately. + ## + ## This function returns ``True`` if there are sockets that are still + ## connected (or connecting), otherwise ``False``. Sockets that have been + ## closed are immediately removed from the dispatcher automatically. + ## + ## **Note:** Each delegate has a task associated with it. This gets called + ## after each select() call, if you make timeout ``-1`` the tasks will + ## only be executed after one or more sockets becomes readable or writeable. + + result = true + var readSocks, writeSocks: seq[TSocket] = @[] + + var L = d.delegates.len + var dc = 0 + while dc < L: + template deleg: expr = d.delegates[dc] + let aSock = deleg.getSocket(deleg.deleVal) + if (deleg.mode != MWriteable and aSock.info == SockConnected) or + aSock.info == SockListening: + readSocks.add(aSock.sock) + if aSock.info == SockConnecting or + (aSock.info == SockConnected and deleg.mode != MReadable): + writeSocks.add(aSock.sock) + if aSock.info == SockClosed: + # Socket has been closed remove it from the dispatcher. + d.delegates[dc] = d.delegates[L-1] + + dec L + else: inc dc + d.delegates.setLen(L) + + if readSocks.len() == 0 and writeSocks.len() == 0: + return False + + if select(readSocks, writeSocks, timeout) != 0: + for i in 0..len(d.delegates)-1: + if i > len(d.delegates)-1: break # One delegate might've been removed. + let deleg = d.delegates[i] + let sock = deleg.getSocket(deleg.deleVal) + if sock.info == SockConnected: + if deleg.mode != MWriteable and sock.sock notin readSocks: + if not (sock.info == SockConnecting): + assert(not (sock.info == SockListening)) + deleg.handleRead(deleg.deleVal) + else: + assert(false) + if deleg.mode != MReadable and sock.sock notin writeSocks: + deleg.handleWrite(deleg.deleVal) + + if sock.info == SockListening: + if sock.sock notin readSocks: + # This is a server socket, that had listen() called on it. + # This socket should have a client waiting now. + deleg.handleAccept(deleg.deleVal) + + if sock.info == SockConnecting: + # Checking whether the socket has connected this way should work on + # Windows and Posix. I've checked. + if sock.sock notin writeSocks: + deleg.handleConnect(deleg.deleVal) + + # Execute tasks + for i in items(d.delegates): + i.task(i.deleVal) + +when isMainModule: + type + PIntType = ref TIntType + TIntType = object of TObject + val: int + + PMyArg = ref TMyArg + TMyArg = object of TObject + dispatcher: PDispatcher + val: int + + proc testConnect(s: PAsyncSocket, arg: PObject) = + echo("Connected! " & $PIntType(arg).val) + + proc testRead(s: PAsyncSocket, arg: PObject) = + echo("Reading! " & $PIntType(arg).val) + var data = s.getSocket.recv() + if data == "": + echo("Closing connection. " & $PIntType(arg).val) + s.close() + echo(data) + echo("Finished reading! " & $PIntType(arg).val) + + proc testAccept(s: PAsyncSocket, arg: PObject) = + echo("Accepting client! " & $PMyArg(arg).val) + var (client, address) = s.acceptAddr() + echo("Accepted ", address) + client.handleRead = testRead + var userArg: PIntType + new(userArg) + userArg.val = 78 + client.userArg = userArg + PMyArg(arg).dispatcher.register(client) + + var d = newDispatcher() + + var userArg: PIntType + new(userArg) + userArg.val = 0 + var s = AsyncSocket(userArg = userArg) + s.connect("amber.tenthbit.net", TPort(6667)) + s.handleConnect = testConnect + s.handleRead = testRead + d.register(s) + + var userArg1: PMyArg + new(userArg1) + userArg1.val = 1 + userArg1.dispatcher = d + var server = AsyncSocket(userArg = userArg1) + server.handleAccept = testAccept + server.bindAddr(TPort(5555)) + server.listen() + d.register(server) + + while d.poll(-1): nil + |