summary refs log tree commit diff stats
path: root/lib/pure/asyncio.nim
diff options
context:
space:
mode:
authorAraq <rumpf_a@web.de>2012-01-28 23:24:31 +0100
committerAraq <rumpf_a@web.de>2012-01-28 23:24:31 +0100
commit0f18ab96911cfdae59061dd8afba26c15631d2b5 (patch)
tree814d9dacacf9cf579194f1053af9c1288bf957c3 /lib/pure/asyncio.nim
parent8d19a93f1a2fe33373ea32367f5f1828f7d913cc (diff)
parentd2a8a633f6232de24c08d115f68cbb725fa18755 (diff)
downloadNim-0f18ab96911cfdae59061dd8afba26c15631d2b5.tar.gz
removed conflict in system.nim
Diffstat (limited to 'lib/pure/asyncio.nim')
-rw-r--r--lib/pure/asyncio.nim325
1 files changed, 325 insertions, 0 deletions
diff --git a/lib/pure/asyncio.nim b/lib/pure/asyncio.nim
new file mode 100644
index 000000000..1c366e4d9
--- /dev/null
+++ b/lib/pure/asyncio.nim
@@ -0,0 +1,325 @@
+#
+#
+#            Nimrod's Runtime Library
+#        (c) Copyright 2012 Andreas Rumpf, Dominik Picheta
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+#
+
+import sockets, os
+
+## This module implements an asynchronous event loop for sockets. 
+## It is akin to Python's asyncore module. Many modules that use sockets
+## have an implementation for this module, those modules should all have a 
+## ``register`` function which you should use to add it to a dispatcher so
+## that you can receive the events associated with that module.
+##
+## Once everything is registered in a dispatcher, you need to call the ``poll``
+## function in a while loop.
+##
+## **Note:** Most modules have tasks which need to be ran regularly, this is
+## why you should not call ``poll`` with a infinite timeout, or even a 
+## very long one. In most cases the default timeout is fine.
+##
+## **Note:** This module currently only supports select(), this is limited by
+## FD_SETSIZE, which is usually 1024. So you may only be able to use 1024
+## sockets at a time.
+## 
+## Most (if not all) modules that use asyncio provide a userArg which is passed
+## on with the events. The type that you set userArg to must be inheriting from
+## TObject!
+
+type
+  TDelegate = object
+    deleVal*: PObject
+
+    handleRead*: proc (h: PObject)
+    handleWrite*: proc (h: PObject)
+    handleConnect*: proc (h: PObject)
+
+    handleAccept*: proc (h: PObject)
+    getSocket*: proc (h: PObject): tuple[info: TInfo, sock: TSocket]
+
+    task*: proc (h: PObject)
+    mode*: TMode
+    
+  PDelegate* = ref TDelegate
+
+  PDispatcher* = ref TDispatcher
+  TDispatcher = object
+    delegates: seq[PDelegate]
+
+  PAsyncSocket* = ref TAsyncSocket
+  TAsyncSocket = object of TObject
+    socket: TSocket
+    info: TInfo
+
+    userArg: PObject
+
+    handleRead*: proc (s: PAsyncSocket, arg: PObject)
+    handleConnect*: proc (s:  PAsyncSocket, arg: PObject)
+
+    handleAccept*: proc (s:  PAsyncSocket, arg: PObject)
+
+  TInfo* = enum
+    SockIdle, SockConnecting, SockConnected, SockListening, SockClosed
+  
+  TMode* = enum
+    MReadable, MWriteable, MReadWrite
+
+proc newDelegate*(): PDelegate =
+  ## Creates a new delegate.
+  new(result)
+  result.handleRead = (proc (h: PObject) = nil)
+  result.handleWrite = (proc (h: PObject) = nil)
+  result.handleConnect = (proc (h: PObject) = nil)
+  result.handleAccept = (proc (h: PObject) = nil)
+  result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] =
+                        doAssert(false))
+  result.task = (proc (h: PObject) = nil)
+  result.mode = MReadable
+
+proc newAsyncSocket(userArg: PObject = nil): PAsyncSocket =
+  new(result)
+  result.info = SockIdle
+  result.userArg = userArg
+
+  result.handleRead = (proc (s: PAsyncSocket, arg: PObject) = nil)
+  result.handleConnect = (proc (s: PAsyncSocket, arg: PObject) = nil)
+  result.handleAccept = (proc (s: PAsyncSocket, arg: PObject) = nil)
+
+proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, 
+                  protocol: TProtocol = IPPROTO_TCP, 
+                  userArg: PObject = nil): PAsyncSocket =
+  result = newAsyncSocket(userArg)
+  result.socket = socket(domain, typ, protocol)
+  if result.socket == InvalidSocket: OSError()
+  result.socket.setBlocking(false)
+
+proc toDelegate(sock: PAsyncSocket): PDelegate =
+  result = newDelegate()
+  result.deleVal = sock
+  result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] =
+                        return (PAsyncSocket(h).info, PAsyncSocket(h).socket))
+
+  result.handleConnect = (proc (h: PObject) =
+                            PAsyncSocket(h).info = SockConnected
+                            PAsyncSocket(h).handleConnect(PAsyncSocket(h),
+                               PAsyncSocket(h).userArg))
+  result.handleRead = (proc (h: PObject) =
+                         PAsyncSocket(h).handleRead(PAsyncSocket(h),
+                            PAsyncSocket(h).userArg))
+  result.handleAccept = (proc (h: PObject) =
+                           PAsyncSocket(h).handleAccept(PAsyncSocket(h),
+                              PAsyncSocket(h).userArg))
+
+proc connect*(sock: PAsyncSocket, name: string, port = TPort(0),
+                   af: TDomain = AF_INET) =
+  ## Begins connecting ``sock`` to ``name``:``port``.
+  sock.socket.connectAsync(name, port, af)
+  sock.info = SockConnecting
+
+proc close*(sock: PAsyncSocket) =
+  ## Closes ``sock``. Terminates any current connections.
+  sock.info = SockClosed
+  sock.socket.close()
+
+proc bindAddr*(sock: PAsyncSocket, port = TPort(0), address = "") =
+  ## Equivalent to ``sockets.bindAddr``.
+  sock.socket.bindAddr(port, address)
+
+proc listen*(sock: PAsyncSocket) =
+  ## Equivalent to ``sockets.listen``.
+  sock.socket.listen()
+  sock.info = SockListening
+
+proc acceptAddr*(server: PAsyncSocket): tuple[sock: PAsyncSocket,
+                                              address: string] =
+  ## Equivalent to ``sockets.acceptAddr``.
+  var (client, a) = server.socket.acceptAddr()
+  if client == InvalidSocket: OSError()
+  client.setBlocking(false) # TODO: Needs to be tested.
+  
+  var aSock: PAsyncSocket = newAsyncSocket()
+  aSock.socket = client
+  aSock.info = SockConnected
+  
+  return (aSock, a)
+
+proc accept*(server: PAsyncSocket): PAsyncSocket =
+  ## Equivalent to ``sockets.accept``.
+  var (client, a) = server.acceptAddr()
+  return client
+
+proc newDispatcher*(): PDispatcher =
+  new(result)
+  result.delegates = @[]
+
+proc register*(d: PDispatcher, deleg: PDelegate) =
+  ## Registers delegate ``deleg`` with dispatcher ``d``.
+  d.delegates.add(deleg)
+
+proc register*(d: PDispatcher, sock: PAsyncSocket): PDelegate {.discardable.} =
+  ## Registers async socket ``sock`` with dispatcher ``d``.
+  result = sock.toDelegate()
+  d.register(result)
+
+proc unregister*(d: PDispatcher, deleg: PDelegate) =
+  ## Unregisters deleg ``deleg`` from dispatcher ``d``.
+  for i in 0..len(d.delegates)-1:
+    if d.delegates[i] == deleg:
+      d.delegates.del(i)
+      return
+  raise newException(EInvalidIndex, "Could not find delegate.")
+
+proc isWriteable*(s: PAsyncSocket): bool =
+  ## Determines whether socket ``s`` is ready to be written to.
+  var writeSock = @[s.socket]
+  return selectWrite(writeSock, 1) != 0 and s.socket notin writeSock
+
+proc `userArg=`*(s: PAsyncSocket, val: PObject) =
+  s.userArg = val
+
+converter getSocket*(s: PAsyncSocket): TSocket =
+  return s.socket
+
+proc isConnected*(s: PAsyncSocket): bool =
+  ## Determines whether ``s`` is connected.
+  return s.info == SockConnected
+proc isListening*(s: PAsyncSocket): bool =
+  ## Determines whether ``s`` is listening for incoming connections.  
+  return s.info == SockListening
+proc isConnecting*(s: PAsyncSocket): bool =
+  ## Determines whether ``s`` is connecting.  
+  return s.info == SockConnecting
+
+proc poll*(d: PDispatcher, timeout: int = 500): bool =
+  ## This function checks for events on all the sockets in the `PDispatcher`.
+  ## It then proceeds to call the correct event handler.
+  ## 
+  ## **Note:** There is no event which signifes when you have been disconnected,
+  ## it is your job to check whether what you get from ``recv`` is ``""``.
+  ## If you have been disconnected, `d`'s ``getSocket`` function should report
+  ## this appropriately.
+  ##
+  ## This function returns ``True`` if there are sockets that are still 
+  ## connected (or connecting), otherwise ``False``. Sockets that have been
+  ## closed are immediately removed from the dispatcher automatically.
+  ##
+  ## **Note:** Each delegate has a task associated with it. This gets called
+  ## after each select() call, if you make timeout ``-1`` the tasks will
+  ## only be executed after one or more sockets becomes readable or writeable.
+  
+  result = true
+  var readSocks, writeSocks: seq[TSocket] = @[]
+  
+  var L = d.delegates.len
+  var dc = 0
+  while dc < L:
+    template deleg: expr = d.delegates[dc]
+    let aSock = deleg.getSocket(deleg.deleVal)
+    if (deleg.mode != MWriteable and aSock.info == SockConnected) or
+          aSock.info == SockListening:
+      readSocks.add(aSock.sock)
+    if aSock.info == SockConnecting or
+        (aSock.info == SockConnected and deleg.mode != MReadable):
+      writeSocks.add(aSock.sock)
+    if aSock.info == SockClosed:
+      # Socket has been closed remove it from the dispatcher.
+      d.delegates[dc] = d.delegates[L-1]
+      
+      dec L
+    else: inc dc
+  d.delegates.setLen(L)
+  
+  if readSocks.len() == 0 and writeSocks.len() == 0:
+    return False
+  
+  if select(readSocks, writeSocks, timeout) != 0:
+    for i in 0..len(d.delegates)-1:
+      if i > len(d.delegates)-1: break # One delegate might've been removed.
+      let deleg = d.delegates[i]
+      let sock = deleg.getSocket(deleg.deleVal)
+      if sock.info == SockConnected:
+        if deleg.mode != MWriteable and sock.sock notin readSocks:
+          if not (sock.info == SockConnecting):
+            assert(not (sock.info == SockListening))
+            deleg.handleRead(deleg.deleVal)
+          else:
+            assert(false)
+        if deleg.mode != MReadable and sock.sock notin writeSocks:
+          deleg.handleWrite(deleg.deleVal)
+      
+      if sock.info == SockListening:
+        if sock.sock notin readSocks:
+          # This is a server socket, that had listen() called on it.
+          # This socket should have a client waiting now.
+          deleg.handleAccept(deleg.deleVal)
+      
+      if sock.info == SockConnecting:
+        # Checking whether the socket has connected this way should work on
+        # Windows and Posix. I've checked. 
+        if sock.sock notin writeSocks:
+          deleg.handleConnect(deleg.deleVal)
+  
+  # Execute tasks
+  for i in items(d.delegates):
+    i.task(i.deleVal)
+  
+when isMainModule:
+  type
+    PIntType = ref TIntType
+    TIntType = object of TObject
+      val: int
+
+    PMyArg = ref TMyArg
+    TMyArg = object of TObject
+      dispatcher: PDispatcher
+      val: int
+
+  proc testConnect(s: PAsyncSocket, arg: PObject) =
+    echo("Connected! " & $PIntType(arg).val)
+  
+  proc testRead(s: PAsyncSocket, arg: PObject) =
+    echo("Reading! " & $PIntType(arg).val)
+    var data = s.getSocket.recv()
+    if data == "":
+      echo("Closing connection. " & $PIntType(arg).val)
+      s.close()
+    echo(data)
+    echo("Finished reading! " & $PIntType(arg).val)
+
+  proc testAccept(s: PAsyncSocket, arg: PObject) =
+    echo("Accepting client! " & $PMyArg(arg).val)
+    var (client, address) = s.acceptAddr()
+    echo("Accepted ", address)
+    client.handleRead = testRead
+    var userArg: PIntType
+    new(userArg)
+    userArg.val = 78
+    client.userArg = userArg
+    PMyArg(arg).dispatcher.register(client)
+
+  var d = newDispatcher()
+  
+  var userArg: PIntType
+  new(userArg)
+  userArg.val = 0
+  var s = AsyncSocket(userArg = userArg)
+  s.connect("amber.tenthbit.net", TPort(6667))
+  s.handleConnect = testConnect
+  s.handleRead = testRead
+  d.register(s)
+  
+  var userArg1: PMyArg
+  new(userArg1)
+  userArg1.val = 1
+  userArg1.dispatcher = d
+  var server = AsyncSocket(userArg = userArg1)
+  server.handleAccept = testAccept
+  server.bindAddr(TPort(5555))
+  server.listen()
+  d.register(server)
+  
+  while d.poll(-1): nil
+