summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@googlemail.com>2014-03-22 22:33:02 +0000
committerDominik Picheta <dominikpicheta@googlemail.com>2014-03-22 22:33:53 +0000
commit192e11e7b72470e27bc6bccf1fedbfefc9c4ebd8 (patch)
tree0891ff99456d2af5f1a74198aa3a5b3b9f02191d
parent2ce9f1c77f9aa4504c55c75c57e74a5ad840916d (diff)
downloadNim-192e11e7b72470e27bc6bccf1fedbfefc9c4ebd8.tar.gz
Many renames. Created high level asyncnet module.
-rw-r--r--lib/pure/asyncdispatch.nim (renamed from lib/pure/asyncio2.nim)28
-rw-r--r--lib/pure/asyncnet.nim147
-rw-r--r--lib/pure/net.nim4
-rw-r--r--lib/pure/rawsockets.nim (renamed from lib/pure/sockets2.nim)8
4 files changed, 175 insertions, 12 deletions
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncdispatch.nim
index fbb02c37c..67361e46c 100644
--- a/lib/pure/asyncio2.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -9,14 +9,14 @@
 
 import os, oids, tables, strutils, macros
 
-import sockets2
+import rawsockets
 
-## Asyncio2 
+## AsyncDispatch
 ## --------
 ##
-## This module implements a brand new asyncio module based on Futures.
-## IOCP is used under the hood on Windows and the selectors module is used for
-## other operating systems.
+## This module implements a brand new dispatcher based on Futures.
+## On Windows IOCP is used and on other operating systems the selectors module
+## is used instead.
 
 # -- Futures
 
@@ -27,7 +27,7 @@ type
 
   PFuture*[T] = ref object of PFutureBase
     value: T
-    error: ref EBase
+    error*: ref EBase # TODO: This shouldn't be necessary, generics bug?
 
 proc newFuture*[T](): PFuture[T] =
   ## Creates a new future.
@@ -90,6 +90,11 @@ proc read*[T](future: PFuture[T]): T =
     # TODO: Make a custom exception type for this?
     raise newException(EInvalidValue, "Future still in progress.")
 
+proc readError*[T](future: PFuture[T]): ref EBase =
+  if future.error != nil: return future.error
+  else:
+    raise newException(EInvalidValue, "No error in future.")
+
 proc finished*[T](future: PFuture[T]): bool =
   ## Determines whether ``future`` has completed.
   ##
@@ -478,6 +483,7 @@ when defined(windows) or defined(nimdoc):
                protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
     ## Creates a new socket and registers it with the dispatcher implicitly.
     result = socket(domain, typ, protocol)
+    result.setBlocking(false)
     disp.register(result)
 
   proc close*(disp: PDispatcher, socket: TSocketHandle) =
@@ -516,6 +522,7 @@ else:
                typ: TType = SOCK_STREAM,
                protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
     result = socket(domain, typ, protocol)
+    result.setBlocking(false)
     disp.register(result)
   
   proc close*(disp: PDispatcher, sock: TSocketHandle) =
@@ -919,6 +926,15 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.}
       return
     add(result.string, c)
 
+var gDisp*{.threadvar.}: PDispatcher ## Global dispatcher
+gDisp = newDispatcher()
+
+proc runForever*() =
+  ## Begins a never ending global dispatcher poll loop.
+  while true:
+    gDisp.poll()
+
+
 when isMainModule:
   
   var p = newDispatcher()
diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim
new file mode 100644
index 000000000..ed8a2fb82
--- /dev/null
+++ b/lib/pure/asyncnet.nim
@@ -0,0 +1,147 @@
+import asyncdispatch
+import rawsockets
+import net
+
+when defined(ssl):
+  import openssl
+
+type
+  TAsyncSocket = object ## socket type
+    fd: TSocketHandle
+    case isBuffered: bool # determines whether this socket is buffered.
+    of true:
+      buffer: array[0..BufferSize, char]
+      currPos: int # current index in buffer
+      bufLen: int # current length of buffer
+    of false: nil
+    when defined(ssl):
+      case isSsl: bool
+      of true:
+        sslHandle: PSSL
+        sslContext: PSSLContext
+        sslNoHandshake: bool # True if needs handshake.
+        sslHasPeekChar: bool
+        sslPeekChar: char
+      of false: nil
+
+  PAsyncSocket* = ref TAsyncSocket
+
+# TODO: Save AF, domain etc info and reuse it in procs which need it like connect.
+
+proc newSocket(fd: TSocketHandle, isBuff: bool): PAsyncSocket =
+  assert fd != osInvalidSocket
+  new(result)
+  result.fd = fd
+  result.isBuffered = isBuff
+  if isBuff:
+    result.currPos = 0
+
+proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
+    protocol: TProtocol = IPPROTO_TCP, buffered = true): PAsyncSocket =
+  ## Creates a new asynchronous socket.
+  result = newSocket(gDisp.socket(domain, typ, protocol), buffered)
+
+proc connect*(socket: PAsyncSocket, address: string, port: TPort,
+    af = AF_INET): PFuture[void] =
+  ## Connects ``socket`` to server at ``address:port``.
+  ##
+  ## Returns a ``PFuture`` which will complete when the connection succeeds
+  ## or an error occurs.
+  result = gDisp.connect(socket.fd, address, port, af)
+
+proc recv*(socket: PAsyncSocket, size: int,
+           flags: int = 0): PFuture[string] =
+  ## Reads ``size`` bytes from ``socket``. Returned future will complete once
+  ## all of the requested data is read. If socket is disconnected during the
+  ## recv operation then the future may complete with only a part of the
+  ## requested data read. If socket is disconnected and no data is available
+  ## to be read then the future will complete with a value of ``""``.
+  result = gDisp.recv(socket.fd, size, flags)
+
+proc send*(socket: PAsyncSocket, data: string): PFuture[void] =
+  ## Sends ``data`` to ``socket``. The returned future will complete once all
+  ## data has been sent.
+  result = gDisp.send(socket.fd, data)
+
+proc acceptAddr*(socket: PAsyncSocket): 
+      PFuture[tuple[address: string, client: PAsyncSocket]] =
+  ## Accepts a new connection. Returns a future containing the client socket
+  ## corresponding to that connection and the remote address of the client.
+  ## The future will complete when the connection is successfully accepted.
+  var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]()
+  var fut = gDisp.acceptAddr(socket.fd)
+  fut.callback =
+    proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) =
+      assert future.finished
+      if future.failed:
+        retFuture.fail(future.readError)
+      else:
+        let resultTup = (future.read.address,
+                         newSocket(future.read.client, socket.isBuffered))
+        retFuture.complete(resultTup)
+  return retFuture
+
+proc accept*(socket: PAsyncSocket): PFuture[PAsyncSocket] =
+  ## Accepts a new connection. Returns a future containing the client socket
+  ## corresponding to that connection.
+  ## The future will complete when the connection is successfully accepted.
+  var retFut = newFuture[PAsyncSocket]()
+  var fut = acceptAddr(socket)
+  fut.callback =
+    proc (future: PFuture[tuple[address: string, client: PAsyncSocket]]) =
+      assert future.finished
+      if future.failed:
+        retFut.fail(future.readError)
+      else:
+        retFut.complete(future.read.client)
+  return retFut
+
+proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} =
+  ## Reads a line of data from ``socket``. Returned future will complete once
+  ## a full line is read or an error occurs.
+  ##
+  ## If a full line is read ``\r\L`` is not
+  ## added to ``line``, however if solely ``\r\L`` is read then ``line``
+  ## will be set to it.
+  ## 
+  ## If the socket is disconnected, ``line`` will be set to ``""``.
+  ##
+  ## If the socket is disconnected in the middle of a line (before ``\r\L``
+  ## is read) then line will be set to ``""``.
+  ## The partial line **will be lost**.
+  
+  template addNLIfEmpty(): stmt =
+    if result.len == 0:
+      result.add("\c\L")
+
+  result = ""
+  var c = ""
+  while true:
+    c = await recv(socket, 1)
+    if c.len == 0:
+      return ""
+    if c == "\r":
+      c = await recv(socket, 1, MSG_PEEK)
+      if c.len > 0 and c == "\L":
+        discard await recv(socket, 1)
+      addNLIfEmpty()
+      return
+    elif c == "\L":
+      addNLIfEmpty()
+      return
+    add(result.string, c)
+
+when isMainModule:
+  proc main() {.async.} =
+    var sock = AsyncSocket()
+    await sock.connect("irc.freenode.net", TPort(6667))
+    while true:
+      let line = await sock.recvLine()
+      if line == "":
+        echo("Disconnected")
+        break
+      else:
+        echo("Got line: ", line)
+  main()
+  runForever()
+    
diff --git a/lib/pure/net.nim b/lib/pure/net.nim
index 45883166b..d40f0949b 100644
--- a/lib/pure/net.nim
+++ b/lib/pure/net.nim
@@ -10,7 +10,7 @@
 ## This module implements a high-level cross-platform sockets interface.
 
 {.deadCodeElim: on.}
-import sockets2, os, strutils, unsigned, parseutils, times
+import rawsockets, os, strutils, unsigned, parseutils, times
 
 type
   IpAddressFamily* {.pure.} = enum ## Describes the type of an IP address
@@ -360,7 +360,7 @@ proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
   ## Creates a new socket.
   ##
   ## If an error occurs EOS will be raised.
-  let fd = sockets2.socket(domain, typ, protocol)
+  let fd = rawsockets.socket(domain, typ, protocol)
   if fd == osInvalidSocket:
     osError(osLastError())
   result = newSocket(fd, buffered)
diff --git a/lib/pure/sockets2.nim b/lib/pure/rawsockets.nim
index 975cc685a..db04f6097 100644
--- a/lib/pure/sockets2.nim
+++ b/lib/pure/rawsockets.nim
@@ -209,13 +209,13 @@ proc htonl*(x: int32): int32 =
   ## Converts 32-bit integers from host to network byte order. On machines
   ## where the host byte order is the same as network byte order, this is
   ## a no-op; otherwise, it performs a 4-byte swap operation.
-  result = sockets2.ntohl(x)
+  result = rawsockets.ntohl(x)
 
 proc htons*(x: int16): int16 =
   ## Converts 16-bit positive integers from host to network byte order.
   ## On machines where the host byte order is the same as network byte
   ## order, this is a no-op; otherwise, it performs a 2-byte swap operation.
-  result = sockets2.ntohs(x)
+  result = rawsockets.ntohs(x)
 
 proc getServByName*(name, proto: string): TServent {.tags: [FReadIO].} =
   ## Searches the database from the beginning and finds the first entry for 
@@ -256,7 +256,7 @@ proc getHostByAddr*(ip: string): Thostent {.tags: [FReadIO].} =
   
   when defined(windows):
     var s = winlean.gethostbyaddr(addr(myaddr), sizeof(myaddr).cuint,
-                                  cint(sockets2.AF_INET))
+                                  cint(rawsockets.AF_INET))
     if s == nil: osError(osLastError())
   else:
     var s = posix.gethostbyaddr(addr(myaddr), sizeof(myaddr).TSocklen, 
@@ -312,7 +312,7 @@ proc getSockName*(socket: TSocketHandle): TPort =
   if getsockname(socket, cast[ptr TSockAddr](addr(name)),
                  addr(namelen)) == -1'i32:
     osError(osLastError())
-  result = TPort(sockets2.ntohs(name.sin_port))
+  result = TPort(rawsockets.ntohs(name.sin_port))
 
 proc getSockOptInt*(socket: TSocketHandle, level, optname: int): int {.
   tags: [FReadIO].} =