diff options
Diffstat (limited to 'lib/pure/ftpclient.nim')
-rw-r--r-- | lib/pure/ftpclient.nim | 365 |
1 files changed, 243 insertions, 122 deletions
diff --git a/lib/pure/ftpclient.nim b/lib/pure/ftpclient.nim index 776df7c42..6d207b98f 100644 --- a/lib/pure/ftpclient.nim +++ b/lib/pure/ftpclient.nim @@ -6,34 +6,26 @@ # distribution, for details about the copyright. # -import sockets, strutils, parseutils, times, os +import sockets, strutils, parseutils, times, os, asyncio ## This module **partially** implements an FTP client as specified ## by `RFC 959 <http://tools.ietf.org/html/rfc959>`_. -## Functions which require file transfers have an ``async`` parameter, when -## this parameter is set to ``true``, it is your job to call the ``poll`` -## function periodically to progress the transfer. +## +## This module provides both a synchronous and asynchronous implementation. +## The asynchronous implementation requires you to use the ``AsyncFTPClient`` +## function. You are then required to register the ``PAsyncFTPClient`` with a +## asyncio dispatcher using the ``register`` function. Take a look at the +## asyncio module documentation for more information. ## ## Here is some example usage of this module: ## ## .. code-block:: Nimrod ## var ftp = FTPClient("example.org", user = "user", pass = "pass") ## ftp.connect() -## ftp.retrFile("file.ext", "file.ext", async = true) -## while True: -## var event: TFTPEvent -## if ftp.poll(event): -## case event.typ -## of EvRetr: -## echo("Download finished!") -## break -## of EvTransferProgress: -## echo(event.speed div 1000, " kb/s") -## else: assert(false) - +## ftp.retrFile("file.ext", "file.ext") type - TFTPClient* = object + TFTPClient* = object of TObject csock: TSocket # Command connection socket dsock: TSocket # Data connection socket user, pass: string @@ -43,24 +35,35 @@ type jobInProgress: bool job: ref TFTPJob + isAsync: bool + + dsockStatus: TInfo + FTPJobType = enum - JListCmd, JRetrText, JRetr, JStore + JRetrText, JRetr, JStore TFTPJob = object - prc: proc (ftp: var TFTPClient, timeout: int): bool + prc: proc (ftp: var TFTPClient, async: bool): bool case typ*: FTPJobType - of JListCmd, JRetrText: + of JRetrText: lines: string of JRetr, JStore: - dsockClosed: bool file: TFile filename: string total: biggestInt # In bytes. progress: biggestInt # In bytes. oneSecond: biggestInt # Bytes transferred in one second. lastProgressReport: float # Time + toStore: string # Data left to upload (Only used with async) else: nil + PAsyncFTPClient* = ref TAsyncFTPClient ## Async alternative to TFTPClient. + TAsyncFTPClient* = object of TFTPClient + handleEvent*: proc (ftp: var TAsyncFTPClient, ev: TFTPEvent, + userArg: PObject) + dele: PDelegate + userArg: PObject + FTPEventType* = enum EvTransferProgress, EvLines, EvRetr, EvStore @@ -86,6 +89,9 @@ proc FTPClient*(address: string, port = TPort(21), result.address = address result.port = port + result.isAsync = false + result.dsockStatus = SockIdle + proc expectReply(ftp: var TFTPClient): TaintedString = result = TaintedString"" if not ftp.csock.recvLine(result): setLen(result.string, 0) @@ -110,7 +116,7 @@ proc assertReply(received: TaintedString, expected: openarray[string]) = [expected.join("' or '"), received.string]) proc createJob(ftp: var TFTPClient, - prc: proc (ftp: var TFTPClient, timeout: int): bool, + prc: proc (ftp: var TFTPClient, async: bool): bool, cmd: FTPJobType) = if ftp.jobInProgress: raise newException(EFTP, "Unable to do two jobs at once.") @@ -119,22 +125,24 @@ proc createJob(ftp: var TFTPClient, ftp.job.prc = prc ftp.job.typ = cmd case cmd - of JListCmd, JRetrText: + of JRetrText: ftp.job.lines = "" of JRetr, JStore: - ftp.job.dsockClosed = false + ftp.job.toStore = "" proc deleteJob(ftp: var TFTPClient) = assert ftp.jobInProgress ftp.jobInProgress = false case ftp.job.typ - of JListCmd, JRetrText: + of JRetrText: ftp.job.lines = "" of JRetr, JStore: ftp.job.file.close() proc pasv(ftp: var TFTPClient) = ## Negotiate a data connection. + ftp.dsock = socket() + if ftp.isAsync: ftp.dsock.setBlocking(false) var pasvMsg = ftp.send("PASV").string.strip.TaintedString assertReply(pasvMsg, "227") var betweenParens = captureBetween(pasvMsg.string, '(', ')') @@ -142,8 +150,17 @@ proc pasv(ftp: var TFTPClient) = var ip = nums[0.. -3] var port = nums[-2.. -1] var properPort = port[0].parseInt()*256+port[1].parseInt() - ftp.dsock = socket() - ftp.dsock.connect(ip.join("."), TPort(properPort.toU16)) + if ftp.isAsync: + # connectAsync should work well even if socket is blocking. But we need + # isAsync anyway... :\ + ftp.dsock.connectAsync(ip.join("."), TPort(properPort.toU16)) + ftp.dsockStatus = SockConnecting + else: + ftp.dsock.connect(ip.join("."), TPort(properPort.toU16)) + ftp.dsockStatus = SockConnected + +proc normalizePathSep(path: string): string = + return replace(path, '\\', '/') proc connect*(ftp: var TFTPClient) = ## Connect to the FTP server specified by ``ftp``. @@ -167,22 +184,28 @@ proc pwd*(ftp: var TFTPClient): string = proc cd*(ftp: var TFTPClient, dir: string) = ## Changes the current directory on the remote FTP server to ``dir``. - assertReply ftp.send("CWD " & dir), "250" + assertReply ftp.send("CWD " & dir.normalizePathSep), "250" proc cdup*(ftp: var TFTPClient) = ## Changes the current directory to the parent of the current directory. assertReply ftp.send("CDUP"), "200" -proc asyncLines(ftp: var TFTPClient, timeout: int): bool = - ## Downloads text data in ASCII mode, Asynchronously. +proc getLines(ftp: var TFTPClient, async: bool = false): bool = + ## Downloads text data in ASCII mode ## Returns true if the download is complete. - var readSocks: seq[TSocket] = @[ftp.dsock, ftp.csock] - if readSocks.select(timeout) != 0: - if ftp.dsock notin readSocks: - var r = TaintedString"" - if ftp.dsock.recvLine(r): + ## It doesn't if `async` is true, because it doesn't check for 226 then. + if ftp.dsockStatus == SockConnected: + var r = TaintedString"" + if ftp.dsock.recvLine(r): + if r.string != "": ftp.job.lines.add(r.string & "\n") - if ftp.csock notin readSocks: + else: + ftp.dsockStatus = SockClosed + + if not async: + var readSocks: seq[TSocket] = @[ftp.csock] + # This is only needed here. Asyncio gets this socket... + if readSocks.select(1) != 0 and ftp.csock notin readSocks: assertReply ftp.expectReply(), "226" return true @@ -191,15 +214,15 @@ proc listDirs*(ftp: var TFTPClient, dir: string = "", ## Returns a list of filenames in the given directory. If ``dir`` is "", ## the current directory is used. If ``async`` is true, this ## function will return immediately and it will be your job to - ## call ``poll`` to progress this operation. + ## use asyncio's ``poll`` to progress this operation. - ftp.createJob(asyncLines, JRetrText) + ftp.createJob(getLines, JRetrText) ftp.pasv() - assertReply ftp.send("NLST " & dir), ["125", "150"] + assertReply ftp.send("NLST " & dir.normalizePathSep), ["125", "150"] if not async: - while not ftp.job.prc(ftp, 500): nil + while not ftp.job.prc(ftp, false): nil result = splitLines(ftp.job.lines) ftp.deleteJob() else: return @[] @@ -211,7 +234,7 @@ proc fileExists*(ftp: var TFTPClient, file: string): bool = ## files, because a full list of file names must be retrieved. var files = ftp.listDirs() for f in items(files): - if f == file: return true + if f.normalizePathSep == file.normalizePathSep: return true proc createDir*(ftp: var TFTPClient, dir: string, recursive: bool = false) = ## Creates a directory ``dir``. If ``recursive`` is true, the topmost @@ -219,7 +242,7 @@ proc createDir*(ftp: var TFTPClient, dir: string, recursive: bool = false) = ## etc. this allows you to give a full path as the ``dir`` without worrying ## about subdirectories not existing. if not recursive: - assertReply ftp.send("MKD " & dir), "257" + assertReply ftp.send("MKD " & dir.normalizePathSep), "257" else: var reply = TaintedString"" var previousDirs = "" @@ -249,20 +272,21 @@ proc chmod*(ftp: var TFTPClient, path: string, of fpOthersRead: otherOctal.inc(4) var perm = $userOctal & $groupOctal & $otherOctal - assertReply ftp.send("SITE CHMOD " & perm & " " & path), "200" + assertReply ftp.send("SITE CHMOD " & perm & + " " & path.normalizePathSep), "200" proc list*(ftp: var TFTPClient, dir: string = "", async = false): string = ## Lists all files in ``dir``. If ``dir`` is ``""``, uses the current ## working directory. If ``async`` is true, this function will return - ## immediately and it will be your job to call ``poll`` to progress this - ## operation. - ftp.createJob(asyncLines, JRetrText) + ## immediately and it will be your job to call asyncio's + ## ``poll`` to progress this operation. + ftp.createJob(getLines, JRetrText) ftp.pasv() - assertReply(ftp.send("LIST" & " " & dir), ["125", "150"]) + assertReply(ftp.send("LIST" & " " & dir.normalizePathSep), ["125", "150"]) if not async: - while not ftp.job.prc(ftp, 500): nil + while not ftp.job.prc(ftp, false): nil result = ftp.job.lines ftp.deleteJob() else: @@ -272,28 +296,36 @@ proc retrText*(ftp: var TFTPClient, file: string, async = false): string = ## Retrieves ``file``. File must be ASCII text. ## If ``async`` is true, this function will return immediately and ## it will be your job to call ``poll`` to progress this operation. - ftp.createJob(asyncLines, JRetrText) + ftp.createJob(getLines, JRetrText) ftp.pasv() - assertReply ftp.send("RETR " & file), ["125", "150"] + assertReply ftp.send("RETR " & file.normalizePathSep), ["125", "150"] if not async: - while not ftp.job.prc(ftp, 500): nil + while not ftp.job.prc(ftp, false): nil result = ftp.job.lines ftp.deleteJob() else: return "" -proc asyncFile(ftp: var TFTPClient, timeout: int): bool = - var readSocks: seq[TSocket] = @[ftp.dsock, ftp.csock] - if readSocks.select(timeout) != 0: - if ftp.dsock notin readSocks: - var r = ftp.dsock.recv().string - if r != "": - ftp.job.progress.inc(r.len) - ftp.job.oneSecond.inc(r.len) - ftp.job.file.write(r) - - if ftp.csock notin readSocks: +proc getFile(ftp: var TFTPClient, async = false): bool = + if ftp.dsockStatus == SockConnected: + var r = "".TaintedString + var returned = false + if async: returned = ftp.dsock.recvAsync(r) + else: + r = ftp.dsock.recv() + returned = true + let r2 = r.string + if r2 != "": + ftp.job.progress.inc(r2.len) + ftp.job.oneSecond.inc(r2.len) + ftp.job.file.write(r2) + elif returned and r2 == "": + ftp.dsockStatus = SockClosed + + if not async: + var readSocks: seq[TSocket] = @[ftp.csock] + if readSocks.select(1) != 0 and ftp.csock notin readSocks: assertReply ftp.expectReply(), "226" return true @@ -302,10 +334,10 @@ proc retrFile*(ftp: var TFTPClient, file, dest: string, async = false) = ## asynchronously is recommended to view the progress of the download. ## The ``EvRetr`` event is given by ``poll`` when the download is finished, ## and the ``filename`` field will be equal to ``file``. - ftp.createJob(asyncFile, JRetr) + ftp.createJob(getFile, JRetr) ftp.job.file = open(dest, mode = fmWrite) ftp.pasv() - var reply = ftp.send("RETR " & file) + var reply = ftp.send("RETR " & file.normalizePathSep) assertReply reply, ["125", "150"] if {'(', ')'} notin reply.string: raise newException(EInvalidReply, "Reply has no file size.") @@ -315,37 +347,43 @@ proc retrFile*(ftp: var TFTPClient, file, dest: string, async = false) = ftp.job.total = fileSize ftp.job.lastProgressReport = epochTime() - ftp.job.filename = file + ftp.job.filename = file.normalizePathSep if not async: - while not ftp.job.prc(ftp, 500): nil + while not ftp.job.prc(ftp, false): nil ftp.deleteJob() -proc asyncUpload(ftp: var TFTPClient, timeout: int): bool = - var writeSocks: seq[TSocket] = @[ftp.dsock] - var readSocks: seq[TSocket] = @[ftp.csock] - - if select(readSocks, writeSocks, timeout) != 0: - if ftp.dsock notin writeSocks and not ftp.job.dsockClosed: - var buffer: array[0..1023, byte] - var len = ftp.job.file.readBytes(buffer, 0, 1024) +proc doUpload(ftp: var TFTPClient, async = false): bool = + if ftp.dsockStatus == SockConnected: + if ftp.job.toStore.len() > 0: + assert(async) + if ftp.dsock.sendAsync(ftp.job.toStore): + ftp.job.toStore = "" + ftp.job.progress.inc(ftp.job.toStore.len) + ftp.job.oneSecond.inc(ftp.job.toStore.len) + + else: + var s = newStringOfCap(4000) + var len = ftp.job.file.readBuffer(addr(s[0]), 4000) + setLen(s, len) if len == 0: # File finished uploading. ftp.dsock.close() - ftp.job.dsockClosed = true - return - - if ftp.dsock.send(addr(buffer), len) != len: - raise newException(EIO, "could not 'send' all data.") + ftp.dsockStatus = SockClosed + + if not async: + assertReply ftp.expectReply(), "226" + return true + return false + + if not async: + ftp.dsock.send(s) + else: + if not ftp.dsock.sendAsync(s): + ftp.job.toStore = s ftp.job.progress.inc(len) ftp.job.oneSecond.inc(len) - - if ftp.csock notin readSocks: - # TODO: Why does this block? Why does select - # think that the socket is readable? - assertReply ftp.expectReply(), "226" - return true proc store*(ftp: var TFTPClient, file, dest: string, async = false) = ## Uploads ``file`` to ``dest`` on the remote FTP server. Usage of this @@ -353,44 +391,32 @@ proc store*(ftp: var TFTPClient, file, dest: string, async = false) = ## the download. ## The ``EvStore`` event is given by ``poll`` when the upload is finished, ## and the ``filename`` field will be equal to ``file``. - ftp.createJob(asyncUpload, JStore) + ftp.createJob(doUpload, JStore) ftp.job.file = open(file) ftp.job.total = ftp.job.file.getFileSize() ftp.job.lastProgressReport = epochTime() ftp.job.filename = file ftp.pasv() - assertReply ftp.send("STOR " & dest), ["125", "150"] + assertReply ftp.send("STOR " & dest.normalizePathSep), ["125", "150"] if not async: - while not ftp.job.prc(ftp, 500): nil + while not ftp.job.prc(ftp, false): nil ftp.deleteJob() -proc poll*(ftp: var TFTPClient, r: var TFTPEvent, timeout = 500): bool = - ## Progresses an async job(if available). Returns true if ``r`` has been set. +proc close*(ftp: var TFTPClient) = + ## Terminates the connection to the server. + assertReply ftp.send("QUIT"), "221" + if ftp.jobInProgress: ftp.deleteJob() + ftp.csock.close() + ftp.dsock.close() + +proc handleTask(h: PObject) = + var ftp = PAsyncFTPClient(h) if ftp.jobInProgress: - if ftp.job.prc(ftp, timeout): - result = true - case ftp.job.typ - of JListCmd, JRetrText: - r.typ = EvLines - r.lines = ftp.job.lines - of JRetr: - r.typ = EvRetr - r.filename = ftp.job.filename - if ftp.job.progress != ftp.job.total: - raise newException(EFTP, "Didn't download full file.") - of JStore: - r.typ = EvStore - r.filename = ftp.job.filename - if ftp.job.progress != ftp.job.total: - raise newException(EFTP, "Didn't upload full file.") - ftp.deleteJob() - return - if ftp.job.typ in {JRetr, JStore}: if epochTime() - ftp.job.lastProgressReport >= 1.0: - result = true + var r: TFTPEvent ftp.job.lastProgressReport = epochTime() r.typ = EvTransferProgress r.bytesTotal = ftp.job.total @@ -398,21 +424,113 @@ proc poll*(ftp: var TFTPClient, r: var TFTPEvent, timeout = 500): bool = r.speed = ftp.job.oneSecond r.filename = ftp.job.filename ftp.job.oneSecond = 0 + ftp.handleEvent(ftp[], r, ftp.userArg) -proc close*(ftp: var TFTPClient) = - ## Terminates the connection to the server. - assertReply ftp.send("QUIT"), "221" - if ftp.jobInProgress: ftp.deleteJob() - ftp.csock.close() - ftp.dsock.close() +proc getSocket(h: PObject): tuple[info: TInfo, sock: TSocket] = + result = (SockIdle, InvalidSocket) + var ftp = PAsyncFTPClient(h) + if ftp.jobInProgress: + case ftp.job.typ + of JRetrText, JRetr, JStore: + if ftp.dsockStatus == SockConnecting or ftp.dsockStatus == SockConnected: + result = (ftp.dsockStatus, ftp.dsock) + else: result = (SockIdle, ftp.dsock) + +proc handleConnect(h: PObject) = + var ftp = PAsyncFTPClient(h) + ftp.dsockStatus = SockConnected + assert(ftp.jobInProgress) + if ftp.job.typ == JStore: + ftp.dele.mode = MWriteable + else: + ftp.dele.mode = MReadable + +proc handleRead(h: PObject) = + var ftp = PAsyncFTPClient(h) + assert(ftp.jobInProgress) + assert(ftp.job.typ != JStore) + # This can never return true, because it shouldn't check for code + # 226 from csock. + assert(not ftp.job.prc(ftp[], true)) + +proc handleWrite(h: PObject) = + var ftp = PAsyncFTPClient(h) + if ftp.jobInProgress: + if ftp.job.typ == JStore: + assert (not ftp.job.prc(ftp[], true)) + +proc csockGetSocket(h: PObject): tuple[info: TInfo, sock: TSocket] = + # This only returns the csock if a job is in progress. Otherwise handle read + # would capture data which is not for it to capture. + result = (SockIdle, InvalidSocket) + var ftp = PAsyncFTPClient(h) + if ftp.jobInProgress: + result = (SockConnected, ftp.csock) + +proc csockHandleRead(h: PObject) = + var ftp = PAsyncFTPClient(h) + assert(ftp.jobInProgress) + assertReply ftp[].expectReply(), "226" # Make sure the transfer completed. + var r: TFTPEvent + case ftp.job.typ + of JRetrText: + r.typ = EvLines + r.lines = ftp.job.lines + of JRetr: + r.typ = EvRetr + r.filename = ftp.job.filename + if ftp.job.progress != ftp.job.total: + raise newException(EFTP, "Didn't download full file.") + of JStore: + r.typ = EvStore + r.filename = ftp.job.filename + if ftp.job.progress != ftp.job.total: + raise newException(EFTP, "Didn't upload full file.") + ftp[].deleteJob() + ftp.handleEvent(ftp[], r, ftp.userArg) + +proc AsyncFTPClient*(address: string, port = TPort(21), + user, pass = "", userArg: PObject = nil): PAsyncFTPClient = + ## Create a ``PAsyncFTPClient`` object. + ## + ## Use this if you want to use asyncio's dispatcher. + new(result) + result.user = user + result.pass = pass + result.address = address + result.port = port + result.isAsync = true + result.dsockStatus = SockIdle + result.userArg = userArg + result.handleEvent = (proc (ftp: var TAsyncFTPClient, ev: TFTPEvent, + userArg: PObject) = nil) + +proc register*(d: PDispatcher, ftp: PAsyncFTPClient) = + ## Registers ``ftp`` with dispatcher ``d``. + ftp.dele = newDelegate() + ftp.dele.deleVal = ftp + ftp.dele.getSocket = getSocket + ftp.dele.task = handleTask + ftp.dele.handleConnect = handleConnect + ftp.dele.handleRead = handleRead + ftp.dele.handleWrite = handleWrite + d.register(ftp.dele) + + # Add csock into the dispatcher (to check for 226). + var cDele = newDelegate() + cDele.deleVal = ftp + cDele.getSocket = csockGetSocket + cDele.handleRead = csockHandleRead + d.register(cDele) when isMainModule: - var ftp = FTPClient("ex.org", user = "user", pass = "p") + var ftp = FTPClient("picheta.me", user = "blah", pass = "sd") ftp.connect() echo ftp.pwd() echo ftp.list() - - ftp.store("payload.avi", "payload.avi", async = true) + echo("uploading") + ftp.store("payload.avi", "payload.avi", async = false) + discard """ while True: var event: TFTPEvent if ftp.poll(event): @@ -429,8 +547,10 @@ when isMainModule: " - ", time, " seconds") else: assert(false) - - ftp.retrFile("payload.avi", "payload2.avi", async = true) + """ + echo("Upload complete") + ftp.retrFile("payload.avi", "payload2.avi", async = false) + discard """ while True: var event: TFTPEvent if ftp.poll(event): @@ -441,7 +561,8 @@ when isMainModule: of EvTransferProgress: echo(event.speed div 1000, " kb/s") else: assert(false) - + """ + echo("Download complete") sleep(5000) ftp.close() sleep(200) |