summary refs log tree commit diff stats
path: root/lib/pure/ftpclient.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/ftpclient.nim')
-rw-r--r--lib/pure/ftpclient.nim365
1 files changed, 243 insertions, 122 deletions
diff --git a/lib/pure/ftpclient.nim b/lib/pure/ftpclient.nim
index 776df7c42..6d207b98f 100644
--- a/lib/pure/ftpclient.nim
+++ b/lib/pure/ftpclient.nim
@@ -6,34 +6,26 @@
 #    distribution, for details about the copyright.
 #
 
-import sockets, strutils, parseutils, times, os
+import sockets, strutils, parseutils, times, os, asyncio
 
 ## This module **partially** implements an FTP client as specified
 ## by `RFC 959 <http://tools.ietf.org/html/rfc959>`_. 
-## Functions which require file transfers have an ``async`` parameter, when
-## this parameter is set to ``true``, it is your job to call the ``poll`` 
-## function periodically to progress the transfer.
+## 
+## This module provides both a synchronous and asynchronous implementation.
+## The asynchronous implementation requires you to use the ``AsyncFTPClient``
+## function. You are then required to register the ``PAsyncFTPClient`` with a
+## asyncio dispatcher using the ``register`` function. Take a look at the
+## asyncio module documentation for more information.
 ##
 ## Here is some example usage of this module:
 ## 
 ## .. code-block:: Nimrod
 ##    var ftp = FTPClient("example.org", user = "user", pass = "pass")
 ##    ftp.connect()
-##    ftp.retrFile("file.ext", "file.ext", async = true)
-##    while True:
-##      var event: TFTPEvent
-##      if ftp.poll(event):
-##        case event.typ
-##        of EvRetr:
-##          echo("Download finished!")
-##          break
-##        of EvTransferProgress:
-##          echo(event.speed div 1000, " kb/s")
-##        else: assert(false)
-
+##    ftp.retrFile("file.ext", "file.ext")
 
 type
-  TFTPClient* = object
+  TFTPClient* = object of TObject
     csock: TSocket # Command connection socket
     dsock: TSocket # Data connection socket
     user, pass: string
@@ -43,24 +35,35 @@ type
     jobInProgress: bool
     job: ref TFTPJob
 
+    isAsync: bool
+
+    dsockStatus: TInfo
+
   FTPJobType = enum
-    JListCmd, JRetrText, JRetr, JStore
+    JRetrText, JRetr, JStore
 
   TFTPJob = object
-    prc: proc (ftp: var TFTPClient, timeout: int): bool
+    prc: proc (ftp: var TFTPClient, async: bool): bool
     case typ*: FTPJobType
-    of JListCmd, JRetrText:
+    of JRetrText:
       lines: string
     of JRetr, JStore:
-      dsockClosed: bool
       file: TFile
       filename: string
       total: biggestInt # In bytes.
       progress: biggestInt # In bytes.
       oneSecond: biggestInt # Bytes transferred in one second.
       lastProgressReport: float # Time
+      toStore: string # Data left to upload (Only used with async)
     else: nil
 
+  PAsyncFTPClient* = ref TAsyncFTPClient ## Async alternative to TFTPClient.
+  TAsyncFTPClient* = object of TFTPClient
+    handleEvent*: proc (ftp: var TAsyncFTPClient, ev: TFTPEvent, 
+                        userArg: PObject)
+    dele: PDelegate
+    userArg: PObject
+
   FTPEventType* = enum
     EvTransferProgress, EvLines, EvRetr, EvStore
 
@@ -86,6 +89,9 @@ proc FTPClient*(address: string, port = TPort(21),
   result.address = address
   result.port = port
 
+  result.isAsync = false
+  result.dsockStatus = SockIdle
+
 proc expectReply(ftp: var TFTPClient): TaintedString =
   result = TaintedString""
   if not ftp.csock.recvLine(result): setLen(result.string, 0)
@@ -110,7 +116,7 @@ proc assertReply(received: TaintedString, expected: openarray[string]) =
                      [expected.join("' or '"), received.string])
 
 proc createJob(ftp: var TFTPClient,
-                 prc: proc (ftp: var TFTPClient, timeout: int): bool,
+                 prc: proc (ftp: var TFTPClient, async: bool): bool,
                  cmd: FTPJobType) =
   if ftp.jobInProgress:
     raise newException(EFTP, "Unable to do two jobs at once.")
@@ -119,22 +125,24 @@ proc createJob(ftp: var TFTPClient,
   ftp.job.prc = prc
   ftp.job.typ = cmd
   case cmd
-  of JListCmd, JRetrText:
+  of JRetrText:
     ftp.job.lines = ""
   of JRetr, JStore:
-    ftp.job.dsockClosed = false
+    ftp.job.toStore = ""
 
 proc deleteJob(ftp: var TFTPClient) =
   assert ftp.jobInProgress
   ftp.jobInProgress = false
   case ftp.job.typ
-  of JListCmd, JRetrText:
+  of JRetrText:
     ftp.job.lines = ""
   of JRetr, JStore:
     ftp.job.file.close()
 
 proc pasv(ftp: var TFTPClient) =
   ## Negotiate a data connection.
+  ftp.dsock = socket()
+  if ftp.isAsync: ftp.dsock.setBlocking(false)
   var pasvMsg = ftp.send("PASV").string.strip.TaintedString
   assertReply(pasvMsg, "227")
   var betweenParens = captureBetween(pasvMsg.string, '(', ')')
@@ -142,8 +150,17 @@ proc pasv(ftp: var TFTPClient) =
   var ip = nums[0.. -3]
   var port = nums[-2.. -1]
   var properPort = port[0].parseInt()*256+port[1].parseInt()
-  ftp.dsock = socket()
-  ftp.dsock.connect(ip.join("."), TPort(properPort.toU16))
+  if ftp.isAsync:
+    # connectAsync should work well even if socket is blocking. But we need
+    # isAsync anyway... :\
+    ftp.dsock.connectAsync(ip.join("."), TPort(properPort.toU16))
+    ftp.dsockStatus = SockConnecting
+  else:
+    ftp.dsock.connect(ip.join("."), TPort(properPort.toU16))
+    ftp.dsockStatus = SockConnected
+
+proc normalizePathSep(path: string): string =
+  return replace(path, '\\', '/')
 
 proc connect*(ftp: var TFTPClient) =
   ## Connect to the FTP server specified by ``ftp``.
@@ -167,22 +184,28 @@ proc pwd*(ftp: var TFTPClient): string =
 
 proc cd*(ftp: var TFTPClient, dir: string) =
   ## Changes the current directory on the remote FTP server to ``dir``.
-  assertReply ftp.send("CWD " & dir), "250"
+  assertReply ftp.send("CWD " & dir.normalizePathSep), "250"
 
 proc cdup*(ftp: var TFTPClient) =
   ## Changes the current directory to the parent of the current directory.
   assertReply ftp.send("CDUP"), "200"
 
-proc asyncLines(ftp: var TFTPClient, timeout: int): bool =
-  ## Downloads text data in ASCII mode, Asynchronously.
+proc getLines(ftp: var TFTPClient, async: bool = false): bool =
+  ## Downloads text data in ASCII mode
   ## Returns true if the download is complete.
-  var readSocks: seq[TSocket] = @[ftp.dsock, ftp.csock]
-  if readSocks.select(timeout) != 0:
-    if ftp.dsock notin readSocks:
-      var r = TaintedString""
-      if ftp.dsock.recvLine(r):
+  ## It doesn't if `async` is true, because it doesn't check for 226 then.
+  if ftp.dsockStatus == SockConnected:
+    var r = TaintedString""
+    if ftp.dsock.recvLine(r):
+      if r.string != "":
         ftp.job.lines.add(r.string & "\n")
-    if ftp.csock notin readSocks:
+      else:
+        ftp.dsockStatus = SockClosed
+  
+  if not async:
+    var readSocks: seq[TSocket] = @[ftp.csock]
+    # This is only needed here. Asyncio gets this socket...
+    if readSocks.select(1) != 0 and ftp.csock notin readSocks:
       assertReply ftp.expectReply(), "226"
       return true
 
@@ -191,15 +214,15 @@ proc listDirs*(ftp: var TFTPClient, dir: string = "",
   ## Returns a list of filenames in the given directory. If ``dir`` is "",
   ## the current directory is used. If ``async`` is true, this
   ## function will return immediately and it will be your job to
-  ## call ``poll`` to progress this operation.
+  ## use asyncio's ``poll`` to progress this operation.
 
-  ftp.createJob(asyncLines, JRetrText)
+  ftp.createJob(getLines, JRetrText)
   ftp.pasv()
 
-  assertReply ftp.send("NLST " & dir), ["125", "150"]
+  assertReply ftp.send("NLST " & dir.normalizePathSep), ["125", "150"]
 
   if not async:
-    while not ftp.job.prc(ftp, 500): nil
+    while not ftp.job.prc(ftp, false): nil
     result = splitLines(ftp.job.lines)
     ftp.deleteJob()
   else: return @[]
@@ -211,7 +234,7 @@ proc fileExists*(ftp: var TFTPClient, file: string): bool =
   ## files, because a full list of file names must be retrieved.
   var files = ftp.listDirs()
   for f in items(files):
-    if f == file: return true
+    if f.normalizePathSep == file.normalizePathSep: return true
 
 proc createDir*(ftp: var TFTPClient, dir: string, recursive: bool = false) =
   ## Creates a directory ``dir``. If ``recursive`` is true, the topmost
@@ -219,7 +242,7 @@ proc createDir*(ftp: var TFTPClient, dir: string, recursive: bool = false) =
   ## etc. this allows you to give a full path as the ``dir`` without worrying
   ## about subdirectories not existing.
   if not recursive:
-    assertReply ftp.send("MKD " & dir), "257"
+    assertReply ftp.send("MKD " & dir.normalizePathSep), "257"
   else:
     var reply = TaintedString""
     var previousDirs = ""
@@ -249,20 +272,21 @@ proc chmod*(ftp: var TFTPClient, path: string,
     of fpOthersRead: otherOctal.inc(4)
 
   var perm = $userOctal & $groupOctal & $otherOctal
-  assertReply ftp.send("SITE CHMOD " & perm & " " & path), "200"
+  assertReply ftp.send("SITE CHMOD " & perm &
+                       " " & path.normalizePathSep), "200"
 
 proc list*(ftp: var TFTPClient, dir: string = "", async = false): string =
   ## Lists all files in ``dir``. If ``dir`` is ``""``, uses the current
   ## working directory. If ``async`` is true, this function will return
-  ## immediately and it will be your job to call ``poll`` to progress this
-  ## operation.
-  ftp.createJob(asyncLines, JRetrText)
+  ## immediately and it will be your job to call asyncio's 
+  ## ``poll`` to progress this operation.
+  ftp.createJob(getLines, JRetrText)
   ftp.pasv()
 
-  assertReply(ftp.send("LIST" & " " & dir), ["125", "150"])
+  assertReply(ftp.send("LIST" & " " & dir.normalizePathSep), ["125", "150"])
 
   if not async:
-    while not ftp.job.prc(ftp, 500): nil
+    while not ftp.job.prc(ftp, false): nil
     result = ftp.job.lines
     ftp.deleteJob()
   else:
@@ -272,28 +296,36 @@ proc retrText*(ftp: var TFTPClient, file: string, async = false): string =
   ## Retrieves ``file``. File must be ASCII text.
   ## If ``async`` is true, this function will return immediately and
   ## it will be your job to call ``poll`` to progress this operation.
-  ftp.createJob(asyncLines, JRetrText)
+  ftp.createJob(getLines, JRetrText)
   ftp.pasv()
-  assertReply ftp.send("RETR " & file), ["125", "150"]
+  assertReply ftp.send("RETR " & file.normalizePathSep), ["125", "150"]
   
   if not async:
-    while not ftp.job.prc(ftp, 500): nil
+    while not ftp.job.prc(ftp, false): nil
     result = ftp.job.lines
     ftp.deleteJob()
   else:
     return ""
 
-proc asyncFile(ftp: var TFTPClient, timeout: int): bool =
-  var readSocks: seq[TSocket] = @[ftp.dsock, ftp.csock]
-  if readSocks.select(timeout) != 0:
-    if ftp.dsock notin readSocks:
-      var r = ftp.dsock.recv().string
-      if r != "":
-        ftp.job.progress.inc(r.len)
-        ftp.job.oneSecond.inc(r.len)
-        ftp.job.file.write(r)
-      
-    if ftp.csock notin readSocks:
+proc getFile(ftp: var TFTPClient, async = false): bool =
+  if ftp.dsockStatus == SockConnected:
+    var r = "".TaintedString
+    var returned = false
+    if async: returned = ftp.dsock.recvAsync(r)
+    else: 
+      r = ftp.dsock.recv()
+      returned = true
+    let r2 = r.string
+    if r2 != "":
+      ftp.job.progress.inc(r2.len)
+      ftp.job.oneSecond.inc(r2.len)
+      ftp.job.file.write(r2)
+    elif returned and r2 == "":
+      ftp.dsockStatus = SockClosed
+  
+  if not async:
+    var readSocks: seq[TSocket] = @[ftp.csock]
+    if readSocks.select(1) != 0 and ftp.csock notin readSocks:
       assertReply ftp.expectReply(), "226"
       return true
 
@@ -302,10 +334,10 @@ proc retrFile*(ftp: var TFTPClient, file, dest: string, async = false) =
   ## asynchronously is recommended to view the progress of the download.
   ## The ``EvRetr`` event is given by ``poll`` when the download is finished,
   ## and the ``filename`` field will be equal to ``file``.
-  ftp.createJob(asyncFile, JRetr)
+  ftp.createJob(getFile, JRetr)
   ftp.job.file = open(dest, mode = fmWrite)
   ftp.pasv()
-  var reply = ftp.send("RETR " & file)
+  var reply = ftp.send("RETR " & file.normalizePathSep)
   assertReply reply, ["125", "150"]
   if {'(', ')'} notin reply.string:
     raise newException(EInvalidReply, "Reply has no file size.")
@@ -315,37 +347,43 @@ proc retrFile*(ftp: var TFTPClient, file, dest: string, async = false) =
     
   ftp.job.total = fileSize
   ftp.job.lastProgressReport = epochTime()
-  ftp.job.filename = file
+  ftp.job.filename = file.normalizePathSep
 
   if not async:
-    while not ftp.job.prc(ftp, 500): nil
+    while not ftp.job.prc(ftp, false): nil
     ftp.deleteJob()
 
-proc asyncUpload(ftp: var TFTPClient, timeout: int): bool =
-  var writeSocks: seq[TSocket] = @[ftp.dsock]
-  var readSocks: seq[TSocket] = @[ftp.csock]
-
-  if select(readSocks, writeSocks, timeout) != 0:
-    if ftp.dsock notin writeSocks and not ftp.job.dsockClosed:
-      var buffer: array[0..1023, byte]
-      var len = ftp.job.file.readBytes(buffer, 0, 1024)
+proc doUpload(ftp: var TFTPClient, async = false): bool =
+  if ftp.dsockStatus == SockConnected:
+    if ftp.job.toStore.len() > 0:
+      assert(async)
+      if ftp.dsock.sendAsync(ftp.job.toStore):
+        ftp.job.toStore = ""
+        ftp.job.progress.inc(ftp.job.toStore.len)
+        ftp.job.oneSecond.inc(ftp.job.toStore.len)
+      
+    else:
+      var s = newStringOfCap(4000)
+      var len = ftp.job.file.readBuffer(addr(s[0]), 4000)
+      setLen(s, len)
       if len == 0:
         # File finished uploading.
         ftp.dsock.close()
-        ftp.job.dsockClosed = true
-        return
-
-      if ftp.dsock.send(addr(buffer), len) != len:
-        raise newException(EIO, "could not 'send' all data.")
+        ftp.dsockStatus = SockClosed
+  
+        if not async:
+          assertReply ftp.expectReply(), "226"
+          return true
+        return false
+    
+      if not async:
+        ftp.dsock.send(s)
+      else:
+        if not ftp.dsock.sendAsync(s):
+          ftp.job.toStore = s
       
       ftp.job.progress.inc(len)
       ftp.job.oneSecond.inc(len)
-  
-    if ftp.csock notin readSocks:
-      # TODO: Why does this block? Why does select 
-      # think that the socket is readable?
-      assertReply ftp.expectReply(), "226"
-      return true
 
 proc store*(ftp: var TFTPClient, file, dest: string, async = false) =
   ## Uploads ``file`` to ``dest`` on the remote FTP server. Usage of this
@@ -353,44 +391,32 @@ proc store*(ftp: var TFTPClient, file, dest: string, async = false) =
   ## the download.
   ## The ``EvStore`` event is given by ``poll`` when the upload is finished,
   ## and the ``filename`` field will be equal to ``file``.
-  ftp.createJob(asyncUpload, JStore)
+  ftp.createJob(doUpload, JStore)
   ftp.job.file = open(file)
   ftp.job.total = ftp.job.file.getFileSize()
   ftp.job.lastProgressReport = epochTime()
   ftp.job.filename = file
   ftp.pasv()
   
-  assertReply ftp.send("STOR " & dest), ["125", "150"]
+  assertReply ftp.send("STOR " & dest.normalizePathSep), ["125", "150"]
 
   if not async:
-    while not ftp.job.prc(ftp, 500): nil
+    while not ftp.job.prc(ftp, false): nil
     ftp.deleteJob()
 
-proc poll*(ftp: var TFTPClient, r: var TFTPEvent, timeout = 500): bool =
-  ## Progresses an async job(if available). Returns true if ``r`` has been set.
+proc close*(ftp: var TFTPClient) =
+  ## Terminates the connection to the server.
+  assertReply ftp.send("QUIT"), "221"
+  if ftp.jobInProgress: ftp.deleteJob()
+  ftp.csock.close()
+  ftp.dsock.close()
+
+proc handleTask(h: PObject) =
+  var ftp = PAsyncFTPClient(h)
   if ftp.jobInProgress:
-    if ftp.job.prc(ftp, timeout):
-      result = true
-      case ftp.job.typ
-      of JListCmd, JRetrText:
-        r.typ = EvLines
-        r.lines = ftp.job.lines
-      of JRetr:
-        r.typ = EvRetr
-        r.filename = ftp.job.filename
-        if ftp.job.progress != ftp.job.total:
-          raise newException(EFTP, "Didn't download full file.")
-      of JStore:
-        r.typ = EvStore
-        r.filename = ftp.job.filename
-        if ftp.job.progress != ftp.job.total:
-          raise newException(EFTP, "Didn't upload full file.")
-      ftp.deleteJob()
-      return
-    
     if ftp.job.typ in {JRetr, JStore}:
       if epochTime() - ftp.job.lastProgressReport >= 1.0:
-        result = true
+        var r: TFTPEvent
         ftp.job.lastProgressReport = epochTime()
         r.typ = EvTransferProgress
         r.bytesTotal = ftp.job.total
@@ -398,21 +424,113 @@ proc poll*(ftp: var TFTPClient, r: var TFTPEvent, timeout = 500): bool =
         r.speed = ftp.job.oneSecond
         r.filename = ftp.job.filename
         ftp.job.oneSecond = 0
+        ftp.handleEvent(ftp[], r, ftp.userArg)
 
-proc close*(ftp: var TFTPClient) =
-  ## Terminates the connection to the server.
-  assertReply ftp.send("QUIT"), "221"
-  if ftp.jobInProgress: ftp.deleteJob()
-  ftp.csock.close()
-  ftp.dsock.close()
+proc getSocket(h: PObject): tuple[info: TInfo, sock: TSocket] =
+  result = (SockIdle, InvalidSocket)
+  var ftp = PAsyncFTPClient(h)
+  if ftp.jobInProgress:
+    case ftp.job.typ
+    of JRetrText, JRetr, JStore:
+      if ftp.dsockStatus == SockConnecting or ftp.dsockStatus == SockConnected:
+        result = (ftp.dsockStatus, ftp.dsock)
+      else: result = (SockIdle, ftp.dsock)
+
+proc handleConnect(h: PObject) =
+  var ftp = PAsyncFTPClient(h)
+  ftp.dsockStatus = SockConnected
+  assert(ftp.jobInProgress)
+  if ftp.job.typ == JStore:
+    ftp.dele.mode = MWriteable
+  else: 
+    ftp.dele.mode = MReadable
+
+proc handleRead(h: PObject) =
+  var ftp = PAsyncFTPClient(h)
+  assert(ftp.jobInProgress)
+  assert(ftp.job.typ != JStore)
+  # This can never return true, because it shouldn't check for code 
+  # 226 from csock.
+  assert(not ftp.job.prc(ftp[], true))
+
+proc handleWrite(h: PObject) =
+  var ftp = PAsyncFTPClient(h)
+  if ftp.jobInProgress:
+    if ftp.job.typ == JStore:
+      assert (not ftp.job.prc(ftp[], true))
+
+proc csockGetSocket(h: PObject): tuple[info: TInfo, sock: TSocket] =
+  # This only returns the csock if a job is in progress. Otherwise handle read
+  # would capture data which is not for it to capture.
+  result = (SockIdle, InvalidSocket)
+  var ftp = PAsyncFTPClient(h)
+  if ftp.jobInProgress:
+    result = (SockConnected, ftp.csock)
+
+proc csockHandleRead(h: PObject) =
+  var ftp = PAsyncFTPClient(h)
+  assert(ftp.jobInProgress)
+  assertReply ftp[].expectReply(), "226" # Make sure the transfer completed.
+  var r: TFTPEvent
+  case ftp.job.typ
+  of JRetrText:
+    r.typ = EvLines
+    r.lines = ftp.job.lines
+  of JRetr:
+    r.typ = EvRetr
+    r.filename = ftp.job.filename
+    if ftp.job.progress != ftp.job.total:
+      raise newException(EFTP, "Didn't download full file.")
+  of JStore:
+    r.typ = EvStore
+    r.filename = ftp.job.filename
+    if ftp.job.progress != ftp.job.total:
+      raise newException(EFTP, "Didn't upload full file.")
+  ftp[].deleteJob()
+  ftp.handleEvent(ftp[], r, ftp.userArg)
+
+proc AsyncFTPClient*(address: string, port = TPort(21),
+                     user, pass = "", userArg: PObject = nil): PAsyncFTPClient =
+  ## Create a ``PAsyncFTPClient`` object.
+  ##
+  ## Use this if you want to use asyncio's dispatcher.
+  new(result)
+  result.user = user
+  result.pass = pass
+  result.address = address
+  result.port = port
+  result.isAsync = true
+  result.dsockStatus = SockIdle
+  result.userArg = userArg
+  result.handleEvent = (proc (ftp: var TAsyncFTPClient, ev: TFTPEvent,
+                               userArg: PObject) = nil)
+
+proc register*(d: PDispatcher, ftp: PAsyncFTPClient) =
+  ## Registers ``ftp`` with dispatcher ``d``.
+  ftp.dele = newDelegate()
+  ftp.dele.deleVal = ftp
+  ftp.dele.getSocket = getSocket
+  ftp.dele.task = handleTask
+  ftp.dele.handleConnect = handleConnect
+  ftp.dele.handleRead = handleRead
+  ftp.dele.handleWrite = handleWrite
+  d.register(ftp.dele)
+
+  # Add csock into the dispatcher (to check for 226).
+  var cDele = newDelegate()
+  cDele.deleVal = ftp
+  cDele.getSocket = csockGetSocket
+  cDele.handleRead = csockHandleRead
+  d.register(cDele)
 
 when isMainModule:
-  var ftp = FTPClient("ex.org", user = "user", pass = "p")
+  var ftp = FTPClient("picheta.me", user = "blah", pass = "sd")
   ftp.connect()
   echo ftp.pwd()
   echo ftp.list()
-
-  ftp.store("payload.avi", "payload.avi", async = true)
+  echo("uploading")
+  ftp.store("payload.avi", "payload.avi", async = false)
+  discard """
   while True:
     var event: TFTPEvent
     if ftp.poll(event):
@@ -429,8 +547,10 @@ when isMainModule:
              " - ", time, " seconds")
 
       else: assert(false)
-
-  ftp.retrFile("payload.avi", "payload2.avi", async = true)
+  """
+  echo("Upload complete")
+  ftp.retrFile("payload.avi", "payload2.avi", async = false)
+  discard """
   while True:
     var event: TFTPEvent
     if ftp.poll(event):
@@ -441,7 +561,8 @@ when isMainModule:
       of EvTransferProgress:
         echo(event.speed div 1000, " kb/s")
       else: assert(false)
-
+  """
+  echo("Download complete")
   sleep(5000)
   ftp.close()
   sleep(200)