diff options
author | Araq <rumpf_a@web.de> | 2012-01-28 23:24:31 +0100 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2012-01-28 23:24:31 +0100 |
commit | 0f18ab96911cfdae59061dd8afba26c15631d2b5 (patch) | |
tree | 814d9dacacf9cf579194f1053af9c1288bf957c3 /lib/pure | |
parent | 8d19a93f1a2fe33373ea32367f5f1828f7d913cc (diff) | |
parent | d2a8a633f6232de24c08d115f68cbb725fa18755 (diff) | |
download | Nim-0f18ab96911cfdae59061dd8afba26c15631d2b5.tar.gz |
removed conflict in system.nim
Diffstat (limited to 'lib/pure')
-rw-r--r-- | lib/pure/asyncio.nim | 325 | ||||
-rw-r--r-- | lib/pure/ftpclient.nim | 365 | ||||
-rw-r--r-- | lib/pure/irc.nim | 188 | ||||
-rwxr-xr-x | lib/pure/os.nim | 10 | ||||
-rwxr-xr-x | lib/pure/osproc.nim | 7 | ||||
-rwxr-xr-x | lib/pure/scgi.nim | 54 | ||||
-rwxr-xr-x | lib/pure/sockets.nim | 91 |
7 files changed, 825 insertions, 215 deletions
diff --git a/lib/pure/asyncio.nim b/lib/pure/asyncio.nim new file mode 100644 index 000000000..1c366e4d9 --- /dev/null +++ b/lib/pure/asyncio.nim @@ -0,0 +1,325 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2012 Andreas Rumpf, Dominik Picheta +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +import sockets, os + +## This module implements an asynchronous event loop for sockets. +## It is akin to Python's asyncore module. Many modules that use sockets +## have an implementation for this module, those modules should all have a +## ``register`` function which you should use to add it to a dispatcher so +## that you can receive the events associated with that module. +## +## Once everything is registered in a dispatcher, you need to call the ``poll`` +## function in a while loop. +## +## **Note:** Most modules have tasks which need to be ran regularly, this is +## why you should not call ``poll`` with a infinite timeout, or even a +## very long one. In most cases the default timeout is fine. +## +## **Note:** This module currently only supports select(), this is limited by +## FD_SETSIZE, which is usually 1024. So you may only be able to use 1024 +## sockets at a time. +## +## Most (if not all) modules that use asyncio provide a userArg which is passed +## on with the events. The type that you set userArg to must be inheriting from +## TObject! + +type + TDelegate = object + deleVal*: PObject + + handleRead*: proc (h: PObject) + handleWrite*: proc (h: PObject) + handleConnect*: proc (h: PObject) + + handleAccept*: proc (h: PObject) + getSocket*: proc (h: PObject): tuple[info: TInfo, sock: TSocket] + + task*: proc (h: PObject) + mode*: TMode + + PDelegate* = ref TDelegate + + PDispatcher* = ref TDispatcher + TDispatcher = object + delegates: seq[PDelegate] + + PAsyncSocket* = ref TAsyncSocket + TAsyncSocket = object of TObject + socket: TSocket + info: TInfo + + userArg: PObject + + handleRead*: proc (s: PAsyncSocket, arg: PObject) + handleConnect*: proc (s: PAsyncSocket, arg: PObject) + + handleAccept*: proc (s: PAsyncSocket, arg: PObject) + + TInfo* = enum + SockIdle, SockConnecting, SockConnected, SockListening, SockClosed + + TMode* = enum + MReadable, MWriteable, MReadWrite + +proc newDelegate*(): PDelegate = + ## Creates a new delegate. + new(result) + result.handleRead = (proc (h: PObject) = nil) + result.handleWrite = (proc (h: PObject) = nil) + result.handleConnect = (proc (h: PObject) = nil) + result.handleAccept = (proc (h: PObject) = nil) + result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] = + doAssert(false)) + result.task = (proc (h: PObject) = nil) + result.mode = MReadable + +proc newAsyncSocket(userArg: PObject = nil): PAsyncSocket = + new(result) + result.info = SockIdle + result.userArg = userArg + + result.handleRead = (proc (s: PAsyncSocket, arg: PObject) = nil) + result.handleConnect = (proc (s: PAsyncSocket, arg: PObject) = nil) + result.handleAccept = (proc (s: PAsyncSocket, arg: PObject) = nil) + +proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP, + userArg: PObject = nil): PAsyncSocket = + result = newAsyncSocket(userArg) + result.socket = socket(domain, typ, protocol) + if result.socket == InvalidSocket: OSError() + result.socket.setBlocking(false) + +proc toDelegate(sock: PAsyncSocket): PDelegate = + result = newDelegate() + result.deleVal = sock + result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] = + return (PAsyncSocket(h).info, PAsyncSocket(h).socket)) + + result.handleConnect = (proc (h: PObject) = + PAsyncSocket(h).info = SockConnected + PAsyncSocket(h).handleConnect(PAsyncSocket(h), + PAsyncSocket(h).userArg)) + result.handleRead = (proc (h: PObject) = + PAsyncSocket(h).handleRead(PAsyncSocket(h), + PAsyncSocket(h).userArg)) + result.handleAccept = (proc (h: PObject) = + PAsyncSocket(h).handleAccept(PAsyncSocket(h), + PAsyncSocket(h).userArg)) + +proc connect*(sock: PAsyncSocket, name: string, port = TPort(0), + af: TDomain = AF_INET) = + ## Begins connecting ``sock`` to ``name``:``port``. + sock.socket.connectAsync(name, port, af) + sock.info = SockConnecting + +proc close*(sock: PAsyncSocket) = + ## Closes ``sock``. Terminates any current connections. + sock.info = SockClosed + sock.socket.close() + +proc bindAddr*(sock: PAsyncSocket, port = TPort(0), address = "") = + ## Equivalent to ``sockets.bindAddr``. + sock.socket.bindAddr(port, address) + +proc listen*(sock: PAsyncSocket) = + ## Equivalent to ``sockets.listen``. + sock.socket.listen() + sock.info = SockListening + +proc acceptAddr*(server: PAsyncSocket): tuple[sock: PAsyncSocket, + address: string] = + ## Equivalent to ``sockets.acceptAddr``. + var (client, a) = server.socket.acceptAddr() + if client == InvalidSocket: OSError() + client.setBlocking(false) # TODO: Needs to be tested. + + var aSock: PAsyncSocket = newAsyncSocket() + aSock.socket = client + aSock.info = SockConnected + + return (aSock, a) + +proc accept*(server: PAsyncSocket): PAsyncSocket = + ## Equivalent to ``sockets.accept``. + var (client, a) = server.acceptAddr() + return client + +proc newDispatcher*(): PDispatcher = + new(result) + result.delegates = @[] + +proc register*(d: PDispatcher, deleg: PDelegate) = + ## Registers delegate ``deleg`` with dispatcher ``d``. + d.delegates.add(deleg) + +proc register*(d: PDispatcher, sock: PAsyncSocket): PDelegate {.discardable.} = + ## Registers async socket ``sock`` with dispatcher ``d``. + result = sock.toDelegate() + d.register(result) + +proc unregister*(d: PDispatcher, deleg: PDelegate) = + ## Unregisters deleg ``deleg`` from dispatcher ``d``. + for i in 0..len(d.delegates)-1: + if d.delegates[i] == deleg: + d.delegates.del(i) + return + raise newException(EInvalidIndex, "Could not find delegate.") + +proc isWriteable*(s: PAsyncSocket): bool = + ## Determines whether socket ``s`` is ready to be written to. + var writeSock = @[s.socket] + return selectWrite(writeSock, 1) != 0 and s.socket notin writeSock + +proc `userArg=`*(s: PAsyncSocket, val: PObject) = + s.userArg = val + +converter getSocket*(s: PAsyncSocket): TSocket = + return s.socket + +proc isConnected*(s: PAsyncSocket): bool = + ## Determines whether ``s`` is connected. + return s.info == SockConnected +proc isListening*(s: PAsyncSocket): bool = + ## Determines whether ``s`` is listening for incoming connections. + return s.info == SockListening +proc isConnecting*(s: PAsyncSocket): bool = + ## Determines whether ``s`` is connecting. + return s.info == SockConnecting + +proc poll*(d: PDispatcher, timeout: int = 500): bool = + ## This function checks for events on all the sockets in the `PDispatcher`. + ## It then proceeds to call the correct event handler. + ## + ## **Note:** There is no event which signifes when you have been disconnected, + ## it is your job to check whether what you get from ``recv`` is ``""``. + ## If you have been disconnected, `d`'s ``getSocket`` function should report + ## this appropriately. + ## + ## This function returns ``True`` if there are sockets that are still + ## connected (or connecting), otherwise ``False``. Sockets that have been + ## closed are immediately removed from the dispatcher automatically. + ## + ## **Note:** Each delegate has a task associated with it. This gets called + ## after each select() call, if you make timeout ``-1`` the tasks will + ## only be executed after one or more sockets becomes readable or writeable. + + result = true + var readSocks, writeSocks: seq[TSocket] = @[] + + var L = d.delegates.len + var dc = 0 + while dc < L: + template deleg: expr = d.delegates[dc] + let aSock = deleg.getSocket(deleg.deleVal) + if (deleg.mode != MWriteable and aSock.info == SockConnected) or + aSock.info == SockListening: + readSocks.add(aSock.sock) + if aSock.info == SockConnecting or + (aSock.info == SockConnected and deleg.mode != MReadable): + writeSocks.add(aSock.sock) + if aSock.info == SockClosed: + # Socket has been closed remove it from the dispatcher. + d.delegates[dc] = d.delegates[L-1] + + dec L + else: inc dc + d.delegates.setLen(L) + + if readSocks.len() == 0 and writeSocks.len() == 0: + return False + + if select(readSocks, writeSocks, timeout) != 0: + for i in 0..len(d.delegates)-1: + if i > len(d.delegates)-1: break # One delegate might've been removed. + let deleg = d.delegates[i] + let sock = deleg.getSocket(deleg.deleVal) + if sock.info == SockConnected: + if deleg.mode != MWriteable and sock.sock notin readSocks: + if not (sock.info == SockConnecting): + assert(not (sock.info == SockListening)) + deleg.handleRead(deleg.deleVal) + else: + assert(false) + if deleg.mode != MReadable and sock.sock notin writeSocks: + deleg.handleWrite(deleg.deleVal) + + if sock.info == SockListening: + if sock.sock notin readSocks: + # This is a server socket, that had listen() called on it. + # This socket should have a client waiting now. + deleg.handleAccept(deleg.deleVal) + + if sock.info == SockConnecting: + # Checking whether the socket has connected this way should work on + # Windows and Posix. I've checked. + if sock.sock notin writeSocks: + deleg.handleConnect(deleg.deleVal) + + # Execute tasks + for i in items(d.delegates): + i.task(i.deleVal) + +when isMainModule: + type + PIntType = ref TIntType + TIntType = object of TObject + val: int + + PMyArg = ref TMyArg + TMyArg = object of TObject + dispatcher: PDispatcher + val: int + + proc testConnect(s: PAsyncSocket, arg: PObject) = + echo("Connected! " & $PIntType(arg).val) + + proc testRead(s: PAsyncSocket, arg: PObject) = + echo("Reading! " & $PIntType(arg).val) + var data = s.getSocket.recv() + if data == "": + echo("Closing connection. " & $PIntType(arg).val) + s.close() + echo(data) + echo("Finished reading! " & $PIntType(arg).val) + + proc testAccept(s: PAsyncSocket, arg: PObject) = + echo("Accepting client! " & $PMyArg(arg).val) + var (client, address) = s.acceptAddr() + echo("Accepted ", address) + client.handleRead = testRead + var userArg: PIntType + new(userArg) + userArg.val = 78 + client.userArg = userArg + PMyArg(arg).dispatcher.register(client) + + var d = newDispatcher() + + var userArg: PIntType + new(userArg) + userArg.val = 0 + var s = AsyncSocket(userArg = userArg) + s.connect("amber.tenthbit.net", TPort(6667)) + s.handleConnect = testConnect + s.handleRead = testRead + d.register(s) + + var userArg1: PMyArg + new(userArg1) + userArg1.val = 1 + userArg1.dispatcher = d + var server = AsyncSocket(userArg = userArg1) + server.handleAccept = testAccept + server.bindAddr(TPort(5555)) + server.listen() + d.register(server) + + while d.poll(-1): nil + diff --git a/lib/pure/ftpclient.nim b/lib/pure/ftpclient.nim index 776df7c42..6d207b98f 100644 --- a/lib/pure/ftpclient.nim +++ b/lib/pure/ftpclient.nim @@ -6,34 +6,26 @@ # distribution, for details about the copyright. # -import sockets, strutils, parseutils, times, os +import sockets, strutils, parseutils, times, os, asyncio ## This module **partially** implements an FTP client as specified ## by `RFC 959 <http://tools.ietf.org/html/rfc959>`_. -## Functions which require file transfers have an ``async`` parameter, when -## this parameter is set to ``true``, it is your job to call the ``poll`` -## function periodically to progress the transfer. +## +## This module provides both a synchronous and asynchronous implementation. +## The asynchronous implementation requires you to use the ``AsyncFTPClient`` +## function. You are then required to register the ``PAsyncFTPClient`` with a +## asyncio dispatcher using the ``register`` function. Take a look at the +## asyncio module documentation for more information. ## ## Here is some example usage of this module: ## ## .. code-block:: Nimrod ## var ftp = FTPClient("example.org", user = "user", pass = "pass") ## ftp.connect() -## ftp.retrFile("file.ext", "file.ext", async = true) -## while True: -## var event: TFTPEvent -## if ftp.poll(event): -## case event.typ -## of EvRetr: -## echo("Download finished!") -## break -## of EvTransferProgress: -## echo(event.speed div 1000, " kb/s") -## else: assert(false) - +## ftp.retrFile("file.ext", "file.ext") type - TFTPClient* = object + TFTPClient* = object of TObject csock: TSocket # Command connection socket dsock: TSocket # Data connection socket user, pass: string @@ -43,24 +35,35 @@ type jobInProgress: bool job: ref TFTPJob + isAsync: bool + + dsockStatus: TInfo + FTPJobType = enum - JListCmd, JRetrText, JRetr, JStore + JRetrText, JRetr, JStore TFTPJob = object - prc: proc (ftp: var TFTPClient, timeout: int): bool + prc: proc (ftp: var TFTPClient, async: bool): bool case typ*: FTPJobType - of JListCmd, JRetrText: + of JRetrText: lines: string of JRetr, JStore: - dsockClosed: bool file: TFile filename: string total: biggestInt # In bytes. progress: biggestInt # In bytes. oneSecond: biggestInt # Bytes transferred in one second. lastProgressReport: float # Time + toStore: string # Data left to upload (Only used with async) else: nil + PAsyncFTPClient* = ref TAsyncFTPClient ## Async alternative to TFTPClient. + TAsyncFTPClient* = object of TFTPClient + handleEvent*: proc (ftp: var TAsyncFTPClient, ev: TFTPEvent, + userArg: PObject) + dele: PDelegate + userArg: PObject + FTPEventType* = enum EvTransferProgress, EvLines, EvRetr, EvStore @@ -86,6 +89,9 @@ proc FTPClient*(address: string, port = TPort(21), result.address = address result.port = port + result.isAsync = false + result.dsockStatus = SockIdle + proc expectReply(ftp: var TFTPClient): TaintedString = result = TaintedString"" if not ftp.csock.recvLine(result): setLen(result.string, 0) @@ -110,7 +116,7 @@ proc assertReply(received: TaintedString, expected: openarray[string]) = [expected.join("' or '"), received.string]) proc createJob(ftp: var TFTPClient, - prc: proc (ftp: var TFTPClient, timeout: int): bool, + prc: proc (ftp: var TFTPClient, async: bool): bool, cmd: FTPJobType) = if ftp.jobInProgress: raise newException(EFTP, "Unable to do two jobs at once.") @@ -119,22 +125,24 @@ proc createJob(ftp: var TFTPClient, ftp.job.prc = prc ftp.job.typ = cmd case cmd - of JListCmd, JRetrText: + of JRetrText: ftp.job.lines = "" of JRetr, JStore: - ftp.job.dsockClosed = false + ftp.job.toStore = "" proc deleteJob(ftp: var TFTPClient) = assert ftp.jobInProgress ftp.jobInProgress = false case ftp.job.typ - of JListCmd, JRetrText: + of JRetrText: ftp.job.lines = "" of JRetr, JStore: ftp.job.file.close() proc pasv(ftp: var TFTPClient) = ## Negotiate a data connection. + ftp.dsock = socket() + if ftp.isAsync: ftp.dsock.setBlocking(false) var pasvMsg = ftp.send("PASV").string.strip.TaintedString assertReply(pasvMsg, "227") var betweenParens = captureBetween(pasvMsg.string, '(', ')') @@ -142,8 +150,17 @@ proc pasv(ftp: var TFTPClient) = var ip = nums[0.. -3] var port = nums[-2.. -1] var properPort = port[0].parseInt()*256+port[1].parseInt() - ftp.dsock = socket() - ftp.dsock.connect(ip.join("."), TPort(properPort.toU16)) + if ftp.isAsync: + # connectAsync should work well even if socket is blocking. But we need + # isAsync anyway... :\ + ftp.dsock.connectAsync(ip.join("."), TPort(properPort.toU16)) + ftp.dsockStatus = SockConnecting + else: + ftp.dsock.connect(ip.join("."), TPort(properPort.toU16)) + ftp.dsockStatus = SockConnected + +proc normalizePathSep(path: string): string = + return replace(path, '\\', '/') proc connect*(ftp: var TFTPClient) = ## Connect to the FTP server specified by ``ftp``. @@ -167,22 +184,28 @@ proc pwd*(ftp: var TFTPClient): string = proc cd*(ftp: var TFTPClient, dir: string) = ## Changes the current directory on the remote FTP server to ``dir``. - assertReply ftp.send("CWD " & dir), "250" + assertReply ftp.send("CWD " & dir.normalizePathSep), "250" proc cdup*(ftp: var TFTPClient) = ## Changes the current directory to the parent of the current directory. assertReply ftp.send("CDUP"), "200" -proc asyncLines(ftp: var TFTPClient, timeout: int): bool = - ## Downloads text data in ASCII mode, Asynchronously. +proc getLines(ftp: var TFTPClient, async: bool = false): bool = + ## Downloads text data in ASCII mode ## Returns true if the download is complete. - var readSocks: seq[TSocket] = @[ftp.dsock, ftp.csock] - if readSocks.select(timeout) != 0: - if ftp.dsock notin readSocks: - var r = TaintedString"" - if ftp.dsock.recvLine(r): + ## It doesn't if `async` is true, because it doesn't check for 226 then. + if ftp.dsockStatus == SockConnected: + var r = TaintedString"" + if ftp.dsock.recvLine(r): + if r.string != "": ftp.job.lines.add(r.string & "\n") - if ftp.csock notin readSocks: + else: + ftp.dsockStatus = SockClosed + + if not async: + var readSocks: seq[TSocket] = @[ftp.csock] + # This is only needed here. Asyncio gets this socket... + if readSocks.select(1) != 0 and ftp.csock notin readSocks: assertReply ftp.expectReply(), "226" return true @@ -191,15 +214,15 @@ proc listDirs*(ftp: var TFTPClient, dir: string = "", ## Returns a list of filenames in the given directory. If ``dir`` is "", ## the current directory is used. If ``async`` is true, this ## function will return immediately and it will be your job to - ## call ``poll`` to progress this operation. + ## use asyncio's ``poll`` to progress this operation. - ftp.createJob(asyncLines, JRetrText) + ftp.createJob(getLines, JRetrText) ftp.pasv() - assertReply ftp.send("NLST " & dir), ["125", "150"] + assertReply ftp.send("NLST " & dir.normalizePathSep), ["125", "150"] if not async: - while not ftp.job.prc(ftp, 500): nil + while not ftp.job.prc(ftp, false): nil result = splitLines(ftp.job.lines) ftp.deleteJob() else: return @[] @@ -211,7 +234,7 @@ proc fileExists*(ftp: var TFTPClient, file: string): bool = ## files, because a full list of file names must be retrieved. var files = ftp.listDirs() for f in items(files): - if f == file: return true + if f.normalizePathSep == file.normalizePathSep: return true proc createDir*(ftp: var TFTPClient, dir: string, recursive: bool = false) = ## Creates a directory ``dir``. If ``recursive`` is true, the topmost @@ -219,7 +242,7 @@ proc createDir*(ftp: var TFTPClient, dir: string, recursive: bool = false) = ## etc. this allows you to give a full path as the ``dir`` without worrying ## about subdirectories not existing. if not recursive: - assertReply ftp.send("MKD " & dir), "257" + assertReply ftp.send("MKD " & dir.normalizePathSep), "257" else: var reply = TaintedString"" var previousDirs = "" @@ -249,20 +272,21 @@ proc chmod*(ftp: var TFTPClient, path: string, of fpOthersRead: otherOctal.inc(4) var perm = $userOctal & $groupOctal & $otherOctal - assertReply ftp.send("SITE CHMOD " & perm & " " & path), "200" + assertReply ftp.send("SITE CHMOD " & perm & + " " & path.normalizePathSep), "200" proc list*(ftp: var TFTPClient, dir: string = "", async = false): string = ## Lists all files in ``dir``. If ``dir`` is ``""``, uses the current ## working directory. If ``async`` is true, this function will return - ## immediately and it will be your job to call ``poll`` to progress this - ## operation. - ftp.createJob(asyncLines, JRetrText) + ## immediately and it will be your job to call asyncio's + ## ``poll`` to progress this operation. + ftp.createJob(getLines, JRetrText) ftp.pasv() - assertReply(ftp.send("LIST" & " " & dir), ["125", "150"]) + assertReply(ftp.send("LIST" & " " & dir.normalizePathSep), ["125", "150"]) if not async: - while not ftp.job.prc(ftp, 500): nil + while not ftp.job.prc(ftp, false): nil result = ftp.job.lines ftp.deleteJob() else: @@ -272,28 +296,36 @@ proc retrText*(ftp: var TFTPClient, file: string, async = false): string = ## Retrieves ``file``. File must be ASCII text. ## If ``async`` is true, this function will return immediately and ## it will be your job to call ``poll`` to progress this operation. - ftp.createJob(asyncLines, JRetrText) + ftp.createJob(getLines, JRetrText) ftp.pasv() - assertReply ftp.send("RETR " & file), ["125", "150"] + assertReply ftp.send("RETR " & file.normalizePathSep), ["125", "150"] if not async: - while not ftp.job.prc(ftp, 500): nil + while not ftp.job.prc(ftp, false): nil result = ftp.job.lines ftp.deleteJob() else: return "" -proc asyncFile(ftp: var TFTPClient, timeout: int): bool = - var readSocks: seq[TSocket] = @[ftp.dsock, ftp.csock] - if readSocks.select(timeout) != 0: - if ftp.dsock notin readSocks: - var r = ftp.dsock.recv().string - if r != "": - ftp.job.progress.inc(r.len) - ftp.job.oneSecond.inc(r.len) - ftp.job.file.write(r) - - if ftp.csock notin readSocks: +proc getFile(ftp: var TFTPClient, async = false): bool = + if ftp.dsockStatus == SockConnected: + var r = "".TaintedString + var returned = false + if async: returned = ftp.dsock.recvAsync(r) + else: + r = ftp.dsock.recv() + returned = true + let r2 = r.string + if r2 != "": + ftp.job.progress.inc(r2.len) + ftp.job.oneSecond.inc(r2.len) + ftp.job.file.write(r2) + elif returned and r2 == "": + ftp.dsockStatus = SockClosed + + if not async: + var readSocks: seq[TSocket] = @[ftp.csock] + if readSocks.select(1) != 0 and ftp.csock notin readSocks: assertReply ftp.expectReply(), "226" return true @@ -302,10 +334,10 @@ proc retrFile*(ftp: var TFTPClient, file, dest: string, async = false) = ## asynchronously is recommended to view the progress of the download. ## The ``EvRetr`` event is given by ``poll`` when the download is finished, ## and the ``filename`` field will be equal to ``file``. - ftp.createJob(asyncFile, JRetr) + ftp.createJob(getFile, JRetr) ftp.job.file = open(dest, mode = fmWrite) ftp.pasv() - var reply = ftp.send("RETR " & file) + var reply = ftp.send("RETR " & file.normalizePathSep) assertReply reply, ["125", "150"] if {'(', ')'} notin reply.string: raise newException(EInvalidReply, "Reply has no file size.") @@ -315,37 +347,43 @@ proc retrFile*(ftp: var TFTPClient, file, dest: string, async = false) = ftp.job.total = fileSize ftp.job.lastProgressReport = epochTime() - ftp.job.filename = file + ftp.job.filename = file.normalizePathSep if not async: - while not ftp.job.prc(ftp, 500): nil + while not ftp.job.prc(ftp, false): nil ftp.deleteJob() -proc asyncUpload(ftp: var TFTPClient, timeout: int): bool = - var writeSocks: seq[TSocket] = @[ftp.dsock] - var readSocks: seq[TSocket] = @[ftp.csock] - - if select(readSocks, writeSocks, timeout) != 0: - if ftp.dsock notin writeSocks and not ftp.job.dsockClosed: - var buffer: array[0..1023, byte] - var len = ftp.job.file.readBytes(buffer, 0, 1024) +proc doUpload(ftp: var TFTPClient, async = false): bool = + if ftp.dsockStatus == SockConnected: + if ftp.job.toStore.len() > 0: + assert(async) + if ftp.dsock.sendAsync(ftp.job.toStore): + ftp.job.toStore = "" + ftp.job.progress.inc(ftp.job.toStore.len) + ftp.job.oneSecond.inc(ftp.job.toStore.len) + + else: + var s = newStringOfCap(4000) + var len = ftp.job.file.readBuffer(addr(s[0]), 4000) + setLen(s, len) if len == 0: # File finished uploading. ftp.dsock.close() - ftp.job.dsockClosed = true - return - - if ftp.dsock.send(addr(buffer), len) != len: - raise newException(EIO, "could not 'send' all data.") + ftp.dsockStatus = SockClosed + + if not async: + assertReply ftp.expectReply(), "226" + return true + return false + + if not async: + ftp.dsock.send(s) + else: + if not ftp.dsock.sendAsync(s): + ftp.job.toStore = s ftp.job.progress.inc(len) ftp.job.oneSecond.inc(len) - - if ftp.csock notin readSocks: - # TODO: Why does this block? Why does select - # think that the socket is readable? - assertReply ftp.expectReply(), "226" - return true proc store*(ftp: var TFTPClient, file, dest: string, async = false) = ## Uploads ``file`` to ``dest`` on the remote FTP server. Usage of this @@ -353,44 +391,32 @@ proc store*(ftp: var TFTPClient, file, dest: string, async = false) = ## the download. ## The ``EvStore`` event is given by ``poll`` when the upload is finished, ## and the ``filename`` field will be equal to ``file``. - ftp.createJob(asyncUpload, JStore) + ftp.createJob(doUpload, JStore) ftp.job.file = open(file) ftp.job.total = ftp.job.file.getFileSize() ftp.job.lastProgressReport = epochTime() ftp.job.filename = file ftp.pasv() - assertReply ftp.send("STOR " & dest), ["125", "150"] + assertReply ftp.send("STOR " & dest.normalizePathSep), ["125", "150"] if not async: - while not ftp.job.prc(ftp, 500): nil + while not ftp.job.prc(ftp, false): nil ftp.deleteJob() -proc poll*(ftp: var TFTPClient, r: var TFTPEvent, timeout = 500): bool = - ## Progresses an async job(if available). Returns true if ``r`` has been set. +proc close*(ftp: var TFTPClient) = + ## Terminates the connection to the server. + assertReply ftp.send("QUIT"), "221" + if ftp.jobInProgress: ftp.deleteJob() + ftp.csock.close() + ftp.dsock.close() + +proc handleTask(h: PObject) = + var ftp = PAsyncFTPClient(h) if ftp.jobInProgress: - if ftp.job.prc(ftp, timeout): - result = true - case ftp.job.typ - of JListCmd, JRetrText: - r.typ = EvLines - r.lines = ftp.job.lines - of JRetr: - r.typ = EvRetr - r.filename = ftp.job.filename - if ftp.job.progress != ftp.job.total: - raise newException(EFTP, "Didn't download full file.") - of JStore: - r.typ = EvStore - r.filename = ftp.job.filename - if ftp.job.progress != ftp.job.total: - raise newException(EFTP, "Didn't upload full file.") - ftp.deleteJob() - return - if ftp.job.typ in {JRetr, JStore}: if epochTime() - ftp.job.lastProgressReport >= 1.0: - result = true + var r: TFTPEvent ftp.job.lastProgressReport = epochTime() r.typ = EvTransferProgress r.bytesTotal = ftp.job.total @@ -398,21 +424,113 @@ proc poll*(ftp: var TFTPClient, r: var TFTPEvent, timeout = 500): bool = r.speed = ftp.job.oneSecond r.filename = ftp.job.filename ftp.job.oneSecond = 0 + ftp.handleEvent(ftp[], r, ftp.userArg) -proc close*(ftp: var TFTPClient) = - ## Terminates the connection to the server. - assertReply ftp.send("QUIT"), "221" - if ftp.jobInProgress: ftp.deleteJob() - ftp.csock.close() - ftp.dsock.close() +proc getSocket(h: PObject): tuple[info: TInfo, sock: TSocket] = + result = (SockIdle, InvalidSocket) + var ftp = PAsyncFTPClient(h) + if ftp.jobInProgress: + case ftp.job.typ + of JRetrText, JRetr, JStore: + if ftp.dsockStatus == SockConnecting or ftp.dsockStatus == SockConnected: + result = (ftp.dsockStatus, ftp.dsock) + else: result = (SockIdle, ftp.dsock) + +proc handleConnect(h: PObject) = + var ftp = PAsyncFTPClient(h) + ftp.dsockStatus = SockConnected + assert(ftp.jobInProgress) + if ftp.job.typ == JStore: + ftp.dele.mode = MWriteable + else: + ftp.dele.mode = MReadable + +proc handleRead(h: PObject) = + var ftp = PAsyncFTPClient(h) + assert(ftp.jobInProgress) + assert(ftp.job.typ != JStore) + # This can never return true, because it shouldn't check for code + # 226 from csock. + assert(not ftp.job.prc(ftp[], true)) + +proc handleWrite(h: PObject) = + var ftp = PAsyncFTPClient(h) + if ftp.jobInProgress: + if ftp.job.typ == JStore: + assert (not ftp.job.prc(ftp[], true)) + +proc csockGetSocket(h: PObject): tuple[info: TInfo, sock: TSocket] = + # This only returns the csock if a job is in progress. Otherwise handle read + # would capture data which is not for it to capture. + result = (SockIdle, InvalidSocket) + var ftp = PAsyncFTPClient(h) + if ftp.jobInProgress: + result = (SockConnected, ftp.csock) + +proc csockHandleRead(h: PObject) = + var ftp = PAsyncFTPClient(h) + assert(ftp.jobInProgress) + assertReply ftp[].expectReply(), "226" # Make sure the transfer completed. + var r: TFTPEvent + case ftp.job.typ + of JRetrText: + r.typ = EvLines + r.lines = ftp.job.lines + of JRetr: + r.typ = EvRetr + r.filename = ftp.job.filename + if ftp.job.progress != ftp.job.total: + raise newException(EFTP, "Didn't download full file.") + of JStore: + r.typ = EvStore + r.filename = ftp.job.filename + if ftp.job.progress != ftp.job.total: + raise newException(EFTP, "Didn't upload full file.") + ftp[].deleteJob() + ftp.handleEvent(ftp[], r, ftp.userArg) + +proc AsyncFTPClient*(address: string, port = TPort(21), + user, pass = "", userArg: PObject = nil): PAsyncFTPClient = + ## Create a ``PAsyncFTPClient`` object. + ## + ## Use this if you want to use asyncio's dispatcher. + new(result) + result.user = user + result.pass = pass + result.address = address + result.port = port + result.isAsync = true + result.dsockStatus = SockIdle + result.userArg = userArg + result.handleEvent = (proc (ftp: var TAsyncFTPClient, ev: TFTPEvent, + userArg: PObject) = nil) + +proc register*(d: PDispatcher, ftp: PAsyncFTPClient) = + ## Registers ``ftp`` with dispatcher ``d``. + ftp.dele = newDelegate() + ftp.dele.deleVal = ftp + ftp.dele.getSocket = getSocket + ftp.dele.task = handleTask + ftp.dele.handleConnect = handleConnect + ftp.dele.handleRead = handleRead + ftp.dele.handleWrite = handleWrite + d.register(ftp.dele) + + # Add csock into the dispatcher (to check for 226). + var cDele = newDelegate() + cDele.deleVal = ftp + cDele.getSocket = csockGetSocket + cDele.handleRead = csockHandleRead + d.register(cDele) when isMainModule: - var ftp = FTPClient("ex.org", user = "user", pass = "p") + var ftp = FTPClient("picheta.me", user = "blah", pass = "sd") ftp.connect() echo ftp.pwd() echo ftp.list() - - ftp.store("payload.avi", "payload.avi", async = true) + echo("uploading") + ftp.store("payload.avi", "payload.avi", async = false) + discard """ while True: var event: TFTPEvent if ftp.poll(event): @@ -429,8 +547,10 @@ when isMainModule: " - ", time, " seconds") else: assert(false) - - ftp.retrFile("payload.avi", "payload2.avi", async = true) + """ + echo("Upload complete") + ftp.retrFile("payload.avi", "payload2.avi", async = false) + discard """ while True: var event: TFTPEvent if ftp.poll(event): @@ -441,7 +561,8 @@ when isMainModule: of EvTransferProgress: echo(event.speed div 1000, " kb/s") else: assert(false) - + """ + echo("Download complete") sleep(5000) ftp.close() sleep(200) diff --git a/lib/pure/irc.nim b/lib/pure/irc.nim index 3e1fa1c2c..09e85f234 100644 --- a/lib/pure/irc.nim +++ b/lib/pure/irc.nim @@ -26,15 +26,15 @@ ## of EvMsg: ## # Where all the magic happens. -import sockets, strutils, parseutils, times +import sockets, strutils, parseutils, times, asyncio type - TIRC* = object + TIRC* = object of TObject address: string port: TPort nick, user, realname, serverPass: string sock: TSocket - connected: bool + status: TInfo lastPing: float lastPong: float lag: float @@ -42,6 +42,11 @@ type msgLimit: bool messageBuffer: seq[tuple[timeToSend: float, m: string]] + PAsyncIRC* = ref TAsyncIRC + TAsyncIRC* = object of TIRC + userArg: PObject + handleEvent: proc (irc: var TAsyncIRC, ev: TIRCEvent, userArg: PObject) + TIRCMType* = enum MUnknown, MNumeric, @@ -89,7 +94,7 @@ proc send*(irc: var TIRC, message: string, sendImmediately = false) = except EOS: # Assuming disconnection of every EOS could be bad, # but I can't exactly check for EBrokenPipe. - irc.connected = false + irc.status = SockClosed proc privmsg*(irc: var TIRC, target, message: string) = ## Sends ``message`` to ``target``. ``Target`` can be a channel, or a user. @@ -188,7 +193,7 @@ proc connect*(irc: var TIRC) = irc.sock = socket() irc.sock.connect(irc.address, irc.port) - irc.connected = true + irc.status = SockConnected # Greet the server :) if irc.serverPass != "": irc.send("PASS " & irc.serverPass, true) @@ -201,7 +206,7 @@ proc irc*(address: string, port: TPort = 6667.TPort, realname = "NimrodBot", serverPass = "", joinChans: seq[string] = @[], msgLimit: bool = true): TIRC = - ## This function calls `connect`, so you don't need to. + ## Creates a ``TIRC`` object. result.address = address result.port = port result.nick = nick @@ -214,45 +219,33 @@ proc irc*(address: string, port: TPort = 6667.TPort, result.channelsToJoin = joinChans result.msgLimit = msgLimit result.messageBuffer = @[] + result.status = SockIdle - result.connect() - -proc poll*(irc: var TIRC, ev: var TIRCEvent, - timeout: int = 500): bool = - ## This function parses a single message from the IRC server and returns - ## a TIRCEvent. - ## - ## This function should be called often as it also handles pinging - ## the server. - if not irc.connected: ev.typ = EvDisconnected - var line = TaintedString"" - var socks = @[irc.sock] - var ret = socks.select(timeout) - if socks.len() == 0 and ret == 1: - if irc.sock.recvLine(line): - if line.string.len == 0: - ev.typ = EvDisconnected - else: - ev = parseMessage(line.string) - # Get the origin - ev.origin = ev.params[0] - if ev.origin == irc.nick: ev.origin = ev.nick - - if ev.cmd == MError: - ev.typ = EvDisconnected - return - - if ev.cmd == MPing: - irc.send("PONG " & ev.params[0]) - if ev.cmd == MPong: - irc.lag = epochTime() - parseFloat(ev.params[ev.params.high]) - irc.lastPong = epochTime() - if ev.cmd == MNumeric: - if ev.numeric == "001": - for chan in items(irc.channelsToJoin): - irc.join(chan) - result = true +proc processLine(irc: var TIRC, line: string): TIRCEvent = + if line.len == 0: + result.typ = EvDisconnected + else: + result = parseMessage(line) + # Get the origin + result.origin = result.params[0] + if result.origin == irc.nick: result.origin = result.nick + + if result.cmd == MError: + result.typ = EvDisconnected + return + if result.cmd == MPing: + irc.send("PONG " & result.params[0]) + if result.cmd == MPong: + irc.lag = epochTime() - parseFloat(result.params[result.params.high]) + irc.lastPong = epochTime() + if result.cmd == MNumeric: + if result.numeric == "001": + for chan in items(irc.channelsToJoin): + irc.join(chan) + +proc processOther(irc: var TIRC, ev: var TIRCEvent): bool = + result = false if epochTime() - irc.lastPing >= 20.0: irc.lastPing = epochTime() irc.send("PING :" & formatFloat(irc.lastPing), true) @@ -260,7 +253,6 @@ proc poll*(irc: var TIRC, ev: var TIRCEvent, if epochTime() - irc.lastPong >= 120.0 and irc.lastPong != -1.0: ev.typ = EvDisconnected # TODO: EvTimeout? return true - for i in 0..irc.messageBuffer.len-1: if epochTime() >= irc.messageBuffer[0][0]: @@ -270,6 +262,30 @@ proc poll*(irc: var TIRC, ev: var TIRCEvent, break # messageBuffer is guaranteed to be from the quickest to the # later-est. +proc poll*(irc: var TIRC, ev: var TIRCEvent, + timeout: int = 500): bool = + ## This function parses a single message from the IRC server and returns + ## a TIRCEvent. + ## + ## This function should be called often as it also handles pinging + ## the server. + ## + ## This function provides a somewhat asynchronous IRC implementation, although + ## it should only be used for simple things for example an IRC bot which does + ## not need to be running many time critical tasks in the background. If you + ## require this, use the asyncio implementation. + + if not (irc.status == SockConnected): ev.typ = EvDisconnected + var line = TaintedString"" + var socks = @[irc.sock] + var ret = socks.select(timeout) + if socks.len() == 0 and ret != 0: + if irc.sock.recvLine(line): + ev = irc.processLine(line) + result = true + + if processOther(irc, ev): result = true + proc getLag*(irc: var TIRC): float = ## Returns the latency between this client and the IRC server in seconds. ## @@ -278,8 +294,88 @@ proc getLag*(irc: var TIRC): float = proc isConnected*(irc: var TIRC): bool = ## Returns whether this IRC client is connected to an IRC server. - return irc.connected + return irc.status == SockConnected + +# -- Asyncio dispatcher + +proc connect*(irc: PAsyncIRC) = + ## Equivalent of connect for ``TIRC`` but specifically created for asyncio. + assert(irc.address != "") + assert(irc.port != TPort(0)) + + irc.sock = socket() + irc.sock.setBlocking(false) + irc.sock.connectAsync(irc.address, irc.port) + irc.status = SockConnecting + +proc handleConnect(h: PObject) = + var irc = PAsyncIRC(h) + + # Greet the server :) + if irc.serverPass != "": irc[].send("PASS " & irc.serverPass, true) + irc[].send("NICK " & irc.nick, true) + irc[].send("USER $1 * 0 :$2" % [irc.user, irc.realname], true) + irc.status = SockConnected + +proc handleRead(h: PObject) = + var irc = PAsyncIRC(h) + var line = "" + if irc.sock.recvLine(line): + var ev = irc[].processLine(line) + irc.handleEvent(irc[], ev, irc.userArg) + +proc handleTask(h: PObject) = + var irc = PAsyncIRC(h) + var ev: TIRCEvent + if PAsyncIRC(h)[].processOther(ev): + irc.handleEvent(irc[], ev, irc.userArg) + +proc asyncIRC*(address: string, port: TPort = 6667.TPort, + nick = "NimrodBot", + user = "NimrodBot", + realname = "NimrodBot", serverPass = "", + joinChans: seq[string] = @[], + msgLimit: bool = true, + ircEvent: proc (irc: var TAsyncIRC, ev: TIRCEvent, + userArg: PObject), + userArg: PObject = nil): PAsyncIRC = + ## Use this function if you want to use asyncio's dispatcher. + ## + ## **Note:** Do **NOT** use this if you're writing a simple IRC bot which only + ## requires one task to be run, i.e. this should not be used if you want a + ## synchronous IRC client implementation, use ``irc`` for that. + + new(result) + result.address = address + result.port = port + result.nick = nick + result.user = user + result.realname = realname + result.serverPass = serverPass + result.lastPing = epochTime() + result.lastPong = -1.0 + result.lag = -1.0 + result.channelsToJoin = joinChans + result.msgLimit = msgLimit + result.messageBuffer = @[] + result.handleEvent = ircEvent + result.userArg = userArg + +proc register*(d: PDispatcher, irc: PAsyncIRC) = + ## Registers ``irc`` with dispatcher ``d``. + var dele = newDelegate() + dele.deleVal = irc + dele.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] = + if PAsyncIRC(h).status == SockConnecting or + PAsyncIRC(h).status == SockConnected: + return (PAsyncIRC(h).status, PAsyncIRC(h).sock) + else: return (SockIdle, PAsyncIRC(h).sock)) + dele.handleConnect = handleConnect + dele.handleRead = handleRead + dele.task = handleTask + d.register(dele) + when isMainModule: #var m = parseMessage("ERROR :Closing Link: dom96.co.cc (Ping timeout: 252 seconds)") #echo(repr(m)) @@ -288,6 +384,7 @@ when isMainModule: var client = irc("amber.tenthbit.net", nick="TestBot1234", joinChans = @["#flood"]) + client.connect() while True: var event: TIRCEvent if client.poll(event): @@ -305,3 +402,4 @@ when isMainModule: #echo( repr(event) ) #echo("Lag: ", formatFloat(client.getLag())) #""" + diff --git a/lib/pure/os.nim b/lib/pure/os.nim index 44811589a..6b8176dde 100755 --- a/lib/pure/os.nim +++ b/lib/pure/os.nim @@ -151,6 +151,10 @@ else: # UNIX-like operating system ScriptExt* = "" DynlibFormat* = "lib$1.so" +when defined(macosx): + var + pathMax {.importc: "PATH_MAX", header: "<stdlib.h>".}: cint + const ExtSep* = '.' ## The character which separates the base filename from the extension; @@ -505,6 +509,12 @@ proc expandFilename*(filename: string): string {.rtl, extern: "nos$1".} = var L = GetFullPathNameA(filename, 3072'i32, result, unused) if L <= 0'i32 or L >= 3072'i32: OSError() setLen(result, L) + elif defined(macosx): + # On Mac OS X 10.5, realpath does not allocate the buffer on its own + var pathBuffer: cstring = newString(pathMax) + var resultBuffer = realpath(filename, pathBuffer) + if resultBuffer == nil: OSError() + result = $resultBuffer else: var res = realpath(filename, nil) if res == nil: OSError() diff --git a/lib/pure/osproc.nim b/lib/pure/osproc.nim index 7ee43c565..2ed435b07 100755 --- a/lib/pure/osproc.nim +++ b/lib/pure/osproc.nim @@ -487,13 +487,6 @@ elif not defined(useNimRtl): result[i] = cast[cstring](alloc(x.len+1)) copyMem(result[i], addr(x[0]), x.len+1) inc(i) - - proc deallocCStringArray(a: cstringArray) = - var i = 0 - while a[i] != nil: - dealloc(a[i]) - inc(i) - dealloc(a) proc startProcess(command: string, workingDir: string = "", diff --git a/lib/pure/scgi.nim b/lib/pure/scgi.nim index 546afb2c0..f8a957d87 100755 --- a/lib/pure/scgi.nim +++ b/lib/pure/scgi.nim @@ -7,7 +7,7 @@ # distribution, for details about the copyright. # -## This module implements helper procs for SCGI applictions. Example: +## This module implements helper procs for SCGI applications. Example: ## ## .. code-block:: Nimrod ## @@ -24,7 +24,7 @@ ## run(handleRequest) ## -import sockets, strutils, os, strtabs +import sockets, strutils, os, strtabs, asyncio type EScgi* = object of EIO ## the exception that is raised, if a SCGI error occurs @@ -58,12 +58,18 @@ proc recvChar(s: TSocket): char = result = c type - TScgiState* {.final.} = object ## SCGI state object + TScgiState* = object of TObject ## SCGI state object server: TSocket bufLen: int client*: TSocket ## the client socket to send data to headers*: PStringTable ## the parsed headers input*: string ## the input buffer + + TAsyncScgiState* = object of TScgiState + handleRequest: proc (server: var TAsyncScgiState, client: TSocket, + input: string, headers: PStringTable,userArg: PObject) + userArg: PObject + PAsyncScgiState* = ref TAsyncScgiState proc recvBuffer(s: var TScgiState, L: int) = if L > s.bufLen: @@ -131,6 +137,48 @@ proc run*(handleRequest: proc (client: TSocket, input: string, s.client.close() s.close() +proc open*(handleRequest: proc (server: var TAsyncScgiState, client: TSocket, + input: string, headers: PStringTable, + userArg: PObject), + port = TPort(4000), address = "127.0.0.1", + userArg: PObject = nil): PAsyncScgiState = + ## Alternative of ``open`` for asyncio compatible SCGI. + new(result) + open(result[], port, address) + result.handleRequest = handleRequest + result.userArg = userArg + +proc getSocket(h: PObject): tuple[info: TInfo, sock: TSocket] = + var s = PAsyncScgiState(h) + return (SockListening, s.server) + +proc handleAccept(h: PObject) = + var s = PAsyncScgiState(h) + + s.client = accept(s.server) + var L = 0 + while true: + var d = s.client.recvChar() + if d notin strutils.digits: + if d != ':': scgiError("':' after length expected") + break + L = L * 10 + ord(d) - ord('0') + recvBuffer(s[], L+1) + s.headers = parseHeaders(s.input, L) + if s.headers["SCGI"] != "1": scgiError("SCGI Version 1 expected") + L = parseInt(s.headers["CONTENT_LENGTH"]) + recvBuffer(s[], L) + + s.handleRequest(s[], s.client, s.input, s.headers, s.userArg) + +proc register*(d: PDispatcher, s: PAsyncScgiState) = + ## Registers ``s`` with dispatcher ``d``. + var dele = newDelegate() + dele.deleVal = s + dele.getSocket = getSocket + dele.handleAccept = handleAccept + d.register(dele) + when false: var counter = 0 proc handleRequest(client: TSocket, input: string, diff --git a/lib/pure/sockets.nim b/lib/pure/sockets.nim index c00dcc80b..564b76343 100755 --- a/lib/pure/sockets.nim +++ b/lib/pure/sockets.nim @@ -142,9 +142,11 @@ proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, else: result = TSocket(posix.socket(ToInt(domain), ToInt(typ), ToInt(protocol))) -proc listen*(socket: TSocket, attempts = 5) = - ## listens to socket. - if listen(cint(socket), cint(attempts)) < 0'i32: OSError() +proc listen*(socket: TSocket, backlog = SOMAXCONN) = + ## Marks ``socket`` as accepting connections. + ## ``Backlog`` specifies the maximum length of the + ## queue of pending connections. + if listen(cint(socket), cint(backlog)) < 0'i32: OSError() proc invalidIp4(s: string) {.noreturn, noinline.} = raise newException(EInvalidValue, "invalid ip4 address: " & s) @@ -239,22 +241,35 @@ proc getSockName*(socket: TSocket): TPort = OSError() result = TPort(sockets.ntohs(name.sin_port)) -proc accept*(server: TSocket): TSocket = - ## waits for a client and returns its socket. ``InvalidSocket`` is returned - ## if an error occurs, or if ``server`` is non-blocking and there are no - ## clients connecting. - var client: Tsockaddr_in - var clientLen: cint = sizeof(client) - result = TSocket(accept(cint(server), cast[ptr TSockAddr](addr(client)), - addr(clientLen))) - proc acceptAddr*(server: TSocket): tuple[sock: TSocket, address: string] = - ## waits for a client and returns its socket and IP address + ## Blocks until a connection is being made from a client. When a connection + ## is made returns the client socket and address of the connecting client. + ## If ``server`` is non-blocking then this function returns immediately, and + ## if there are no connections queued the returned socket will be + ## ``InvalidSocket``. + ## This function will raise EOS if an error occurs. var address: Tsockaddr_in var addrLen: cint = sizeof(address) - var sock = TSocket(accept(cint(server), cast[ptr TSockAddr](addr(address)), - addr(addrLen))) - return (sock, $inet_ntoa(address.sin_addr)) + var sock = accept(cint(server), cast[ptr TSockAddr](addr(address)), + addr(addrLen)) + if sock < 0: + # TODO: Test on Windows. + when defined(windows): + var err = WSAGetLastError() + if err == WSAEINPROGRESS: + return (InvalidSocket, "") + else: OSError() + else: + if errno == EAGAIN or errno == EWOULDBLOCK: + return (InvalidSocket, "") + else: OSError() + else: return (TSocket(sock), $inet_ntoa(address.sin_addr)) + +proc accept*(server: TSocket): TSocket = + ## Equivalent to ``acceptAddr`` but doesn't return the address, only the + ## socket. + var (client, a) = acceptAddr(server) + return client proc close*(socket: TSocket) = ## closes a socket. @@ -428,6 +443,11 @@ proc connectAsync*(socket: TSocket, name: string, port = TPort(0), if not success: OSError() +proc timeValFromMilliseconds(timeout = 500): TTimeVal = + if timeout != -1: + var seconds = timeout div 1000 + result.tv_sec = seconds + result.tv_usec = (timeout - seconds * 1000) * 1000 #proc recvfrom*(s: TWinSocket, buf: cstring, len, flags: cint, # fromm: ptr TSockAddr, fromlen: ptr cint): cint @@ -460,9 +480,7 @@ proc select*(readfds, writefds, exceptfds: var seq[TSocket], ## You can determine whether a socket is ready by checking if it's still ## in one of the TSocket sequences. - var tv: TTimeVal - tv.tv_sec = 0 - tv.tv_usec = timeout * 1000 + var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout) var rd, wr, ex: TFdSet var m = 0 @@ -480,10 +498,8 @@ proc select*(readfds, writefds, exceptfds: var seq[TSocket], pruneSocketSet(exceptfds, (ex)) proc select*(readfds, writefds: var seq[TSocket], - timeout = 500): int = - var tv: TTimeVal - tv.tv_sec = 0 - tv.tv_usec = timeout * 1000 + timeout = 500): int = + var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout) var rd, wr: TFdSet var m = 0 @@ -499,10 +515,8 @@ proc select*(readfds, writefds: var seq[TSocket], pruneSocketSet(writefds, (wr)) proc selectWrite*(writefds: var seq[TSocket], - timeout = 500): int = - var tv: TTimeVal - tv.tv_sec = 0 - tv.tv_usec = timeout * 1000 + timeout = 500): int = + var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout) var wr: TFdSet var m = 0 @@ -515,11 +529,8 @@ proc selectWrite*(writefds: var seq[TSocket], pruneSocketSet(writefds, (wr)) - -proc select*(readfds: var seq[TSocket], timeout = 500): int = - var tv: TTimeVal - tv.tv_sec = 0 - tv.tv_usec = timeout * 1000 +proc select*(readfds: var seq[TSocket], timeout = 500): int = + var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout) var rd: TFdSet var m = 0 @@ -537,11 +548,14 @@ proc recvLine*(socket: TSocket, line: var TaintedString): bool = ## returns false if no further data is available. `Line` must be initialized ## and not nil! This does not throw an EOS exception, therefore ## it can be used in both blocking and non-blocking sockets. + ## If ``socket`` is disconnected, ``true`` will be returned and line will be + ## set to ``""``. setLen(line.string, 0) while true: var c: char var n = recv(cint(socket), addr(c), 1, 0'i32) - if n <= 0: return + if n < 0: return + elif n == 0: return true if c == '\r': n = recv(cint(socket), addr(c), 1, MSG_PEEK) if n > 0 and c == '\L': @@ -625,7 +639,7 @@ proc skip*(socket: TSocket) = proc send*(socket: TSocket, data: pointer, size: int): int = ## sends data to a socket. - when defined(windows): + when defined(windows) or defined(macosx): result = send(cint(socket), data, size, 0'i32) else: result = send(cint(socket), data, size, int32(MSG_NOSIGNAL)) @@ -634,19 +648,20 @@ proc send*(socket: TSocket, data: string) = ## sends data to a socket. if send(socket, cstring(data), data.len) != data.len: OSError() -proc sendAsync*(socket: TSocket, data: string) = - ## sends data to a non-blocking socket. +proc sendAsync*(socket: TSocket, data: string): bool = + ## sends data to a non-blocking socket. Returns whether ``data`` was sent. + result = true var bytesSent = send(socket, cstring(data), data.len) if bytesSent == -1: when defined(windows): var err = WSAGetLastError() # TODO: Test on windows. if err == WSAEINPROGRESS: - return + return false else: OSError() else: if errno == EAGAIN or errno == EWOULDBLOCK: - return + return false else: OSError() when defined(Windows): |