summary refs log tree commit diff stats
path: root/lib/pure/asyncfile.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncfile.nim')
-rw-r--r--lib/pure/asyncfile.nim354
1 files changed, 283 insertions, 71 deletions
diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim
index c7b9fac18..0f6504342 100644
--- a/lib/pure/asyncfile.nim
+++ b/lib/pure/asyncfile.nim
@@ -9,29 +9,37 @@
 
 ## This module implements asynchronous file reading and writing.
 ##
-## .. code-block:: Nim
-##    import asyncfile, asyncdispatch, os
+##   ```Nim
+##   import std/[asyncfile, asyncdispatch, os]
 ##
-##    proc main() {.async.} =
-##      var file = openAsync(getTempDir() / "foobar.txt", fmReadWrite)
-##      await file.write("test")
-##      file.setFilePos(0)
-##      let data = await file.readAll()
-##      doAssert data == "test"
-##      file.close()
+##   proc main() {.async.} =
+##     var file = openAsync(getTempDir() / "foobar.txt", fmReadWrite)
+##     await file.write("test")
+##     file.setFilePos(0)
+##     let data = await file.readAll()
+##     doAssert data == "test"
+##     file.close()
 ##
-##    waitFor main()
+##   waitFor main()
+##   ```
 
-import asyncdispatch, os
+import std/[asyncdispatch, os]
+
+when defined(nimPreviewSlimSystem):
+  import std/[assertions, syncio]
+  when defined(windows) or defined(nimdoc):
+    import std/widestrs
+
+# TODO: Fix duplication introduced by PR #4683.
 
 when defined(windows) or defined(nimdoc):
-  import winlean
+  import std/winlean
 else:
-  import posix
+  import std/posix
 
 type
   AsyncFile* = ref object
-    fd: AsyncFd
+    fd: AsyncFD
     offset: int64
 
 when defined(windows) or defined(nimdoc):
@@ -48,56 +56,60 @@ when defined(windows) or defined(nimdoc):
     case mode
     of fmRead, fmReadWriteExisting:
       OPEN_EXISTING
-    of fmAppend, fmReadWrite, fmWrite:
-      if fileExists(filename):
-        OPEN_EXISTING
-      else:
-        CREATE_NEW
+    of fmReadWrite, fmWrite:
+      CREATE_ALWAYS
+    of fmAppend:
+      OPEN_ALWAYS
 else:
   proc getPosixFlags(mode: FileMode): cint =
     case mode
     of fmRead:
       result = O_RDONLY
     of fmWrite:
-      result = O_WRONLY or O_CREAT
+      result = O_WRONLY or O_CREAT or O_TRUNC
     of fmAppend:
       result = O_WRONLY or O_CREAT or O_APPEND
     of fmReadWrite:
-      result = O_RDWR or O_CREAT
+      result = O_RDWR or O_CREAT or O_TRUNC
     of fmReadWriteExisting:
       result = O_RDWR
     result = result or O_NONBLOCK
 
-proc getFileSize(f: AsyncFile): int64 =
+proc getFileSize*(f: AsyncFile): int64 =
   ## Retrieves the specified file's size.
   when defined(windows) or defined(nimdoc):
-    var high: DWord
+    var high: DWORD
     let low = getFileSize(f.fd.Handle, addr high)
     if low == INVALID_FILE_SIZE:
       raiseOSError(osLastError())
-    return (high shl 32) or low
+    result = (high shl 32) or low
+  else:
+    let curPos = lseek(f.fd.cint, 0, SEEK_CUR)
+    result = lseek(f.fd.cint, 0, SEEK_END)
+    f.offset = lseek(f.fd.cint, curPos, SEEK_SET)
+    assert(f.offset == curPos)
 
-proc openAsync*(filename: string, mode = fmRead): AsyncFile =
-  ## Opens a file specified by the path in ``filename`` using
-  ## the specified ``mode`` asynchronously.
+proc newAsyncFile*(fd: AsyncFD): AsyncFile =
+  ## Creates `AsyncFile` with a previously opened file descriptor `fd`.
   new result
+  result.fd = fd
+  register(fd)
+
+proc openAsync*(filename: string, mode = fmRead): AsyncFile =
+  ## Opens a file specified by the path in `filename` using
+  ## the specified FileMode `mode` asynchronously.
   when defined(windows) or defined(nimdoc):
     let flags = FILE_FLAG_OVERLAPPED or FILE_ATTRIBUTE_NORMAL
     let desiredAccess = getDesiredAccess(mode)
     let creationDisposition = getCreationDisposition(mode, filename)
-    when useWinUnicode:
-      result.fd = createFileW(newWideCString(filename), desiredAccess,
-          FILE_SHARE_READ,
-          nil, creationDisposition, flags, 0).AsyncFd
-    else:
-      result.fd = createFileA(filename, desiredAccess,
-          FILE_SHARE_READ,
-          nil, creationDisposition, flags, 0).AsyncFd
+    let fd = createFileW(newWideCString(filename), desiredAccess,
+        FILE_SHARE_READ,
+        nil, creationDisposition, flags, 0)
 
-    if result.fd.Handle == INVALID_HANDLE_VALUE:
+    if fd == INVALID_HANDLE_VALUE:
       raiseOSError(osLastError())
 
-    register(result.fd)
+    result = newAsyncFile(fd.AsyncFD)
 
     if mode == fmAppend:
       result.offset = getFileSize(result)
@@ -106,27 +118,104 @@ proc openAsync*(filename: string, mode = fmRead): AsyncFile =
     let flags = getPosixFlags(mode)
     # RW (Owner), RW (Group), R (Other)
     let perm = S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP or S_IROTH
-    result.fd = open(filename, flags, perm).AsyncFD
-    if result.fd.cint == -1:
+    let fd = open(filename, flags, perm)
+    if fd == -1:
       raiseOSError(osLastError())
 
-    register(result.fd)
+    result = newAsyncFile(fd.AsyncFD)
 
-proc read*(f: AsyncFile, size: int): Future[string] =
-  ## Read ``size`` bytes from the specified file asynchronously starting at
+proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] =
+  ## Read `size` bytes from the specified file asynchronously starting at
   ## the current position of the file pointer.
   ##
+  ## If the file pointer is past the end of the file then zero is returned
+  ## and no bytes are read into `buf`
+  var retFuture = newFuture[int]("asyncfile.readBuffer")
+
+  when defined(windows) or defined(nimdoc):
+    var ol = newCustom()
+    ol.data = CompletionData(fd: f.fd, cb:
+      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            assert bytesCount > 0
+            assert bytesCount <= size
+            f.offset.inc bytesCount
+            retFuture.complete(bytesCount)
+          else:
+            if errcode.int32 == ERROR_HANDLE_EOF:
+              retFuture.complete(0)
+            else:
+              retFuture.fail(newOSError(errcode))
+    )
+    ol.offset = DWORD(f.offset and 0xffffffff)
+    ol.offsetHigh = DWORD(f.offset shr 32)
+
+    # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
+    let ret = readFile(f.fd.Handle, buf, size.int32, nil,
+                       cast[POVERLAPPED](ol))
+    if not ret.bool:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        GC_unref(ol)
+        if err.int32 == ERROR_HANDLE_EOF:
+          # This happens in Windows Server 2003
+          retFuture.complete(0)
+        else:
+          retFuture.fail(newOSError(err))
+    else:
+      # Request completed immediately.
+      var bytesRead: DWORD
+      let overlappedRes = getOverlappedResult(f.fd.Handle,
+          cast[POVERLAPPED](ol), bytesRead, false.WINBOOL)
+      if not overlappedRes.bool:
+        let err = osLastError()
+        if err.int32 == ERROR_HANDLE_EOF:
+          retFuture.complete(0)
+        else:
+          retFuture.fail(newOSError(osLastError()))
+      else:
+        assert bytesRead > 0
+        assert bytesRead <= size
+        f.offset.inc bytesRead
+        retFuture.complete(bytesRead)
+  else:
+    proc cb(fd: AsyncFD): bool =
+      result = true
+      let res = read(fd.cint, cast[cstring](buf), size.cint)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 != EAGAIN:
+          retFuture.fail(newOSError(lastError))
+        else:
+          result = false # We still want this callback to be called.
+      elif res == 0:
+        # EOF
+        retFuture.complete(0)
+      else:
+        f.offset.inc(res)
+        retFuture.complete(res)
+
+    if not cb(f.fd):
+      addRead(f.fd, cb)
+
+  return retFuture
+
+proc read*(f: AsyncFile, size: int): Future[string] =
+  ## Read `size` bytes from the specified file asynchronously starting at
+  ## the current position of the file pointer. `size` should be greater than zero.
+  ##
   ## If the file pointer is past the end of the file then an empty string is
   ## returned.
+  assert size > 0
   var retFuture = newFuture[string]("asyncfile.read")
 
   when defined(windows) or defined(nimdoc):
     var buffer = alloc0(size)
 
-    var ol = PCustomOverlapped()
-    GC_ref(ol)
+    var ol = newCustom()
     ol.data = CompletionData(fd: f.fd, cb:
-      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
             assert bytesCount > 0
@@ -139,13 +228,13 @@ proc read*(f: AsyncFile, size: int): Future[string] =
             if errcode.int32 == ERROR_HANDLE_EOF:
               retFuture.complete("")
             else:
-              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+              retFuture.fail(newOSError(errcode))
         if buffer != nil:
           dealloc buffer
           buffer = nil
     )
-    ol.offset = DWord(f.offset and 0xffffffff)
-    ol.offsetHigh = DWord(f.offset shr 32)
+    ol.offset = DWORD(f.offset and 0xffffffff)
+    ol.offsetHigh = DWORD(f.offset shr 32)
 
     # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
     let ret = readFile(f.fd.Handle, buffer, size.int32, nil,
@@ -157,18 +246,23 @@ proc read*(f: AsyncFile, size: int): Future[string] =
           dealloc buffer
           buffer = nil
         GC_unref(ol)
-        retFuture.fail(newException(OSError, osErrorMsg(err)))
+
+        if err.int32 == ERROR_HANDLE_EOF:
+          # This happens in Windows Server 2003
+          retFuture.complete("")
+        else:
+          retFuture.fail(newOSError(err))
     else:
       # Request completed immediately.
-      var bytesRead: DWord
+      var bytesRead: DWORD
       let overlappedRes = getOverlappedResult(f.fd.Handle,
-          cast[POverlapped](ol)[], bytesRead, false.WinBool)
+          cast[POVERLAPPED](ol), bytesRead, false.WINBOOL)
       if not overlappedRes.bool:
         let err = osLastError()
         if err.int32 == ERROR_HANDLE_EOF:
           retFuture.complete("")
         else:
-          retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
+          retFuture.fail(newOSError(osLastError()))
       else:
         assert bytesRead > 0
         assert bytesRead <= size
@@ -185,11 +279,12 @@ proc read*(f: AsyncFile, size: int): Future[string] =
       if res < 0:
         let lastError = osLastError()
         if lastError.int32 != EAGAIN:
-          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+          retFuture.fail(newOSError(lastError))
         else:
           result = false # We still want this callback to be called.
       elif res == 0:
         # EOF
+        f.offset = lseek(fd.cint, 0, SEEK_CUR)
         retFuture.complete("")
       else:
         readBuffer.setLen(res)
@@ -206,6 +301,8 @@ proc readLine*(f: AsyncFile): Future[string] {.async.} =
   result = ""
   while true:
     var c = await read(f, 1)
+    if c.len == 0:
+      break
     if c[0] == '\c':
       c = await read(f, 1)
       break
@@ -225,7 +322,7 @@ proc setFilePos*(f: AsyncFile, pos: int64) =
   ## operations. The file's first byte has the index zero.
   f.offset = pos
   when not defined(windows) and not defined(nimdoc):
-    let ret = lseek(f.fd.cint, pos, SEEK_SET)
+    let ret = lseek(f.fd.cint, pos.Off, SEEK_SET)
     if ret == -1:
       raiseOSError(osLastError())
 
@@ -238,8 +335,77 @@ proc readAll*(f: AsyncFile): Future[string] {.async.} =
       return
     result.add data
 
+proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] =
+  ## Writes `size` bytes from `buf` to the file specified asynchronously.
+  ##
+  ## The returned Future will complete once all data has been written to the
+  ## specified file.
+  var retFuture = newFuture[void]("asyncfile.writeBuffer")
+  when defined(windows) or defined(nimdoc):
+    var ol = newCustom()
+    ol.data = CompletionData(fd: f.fd, cb:
+      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            assert bytesCount == size.int32
+            retFuture.complete()
+          else:
+            retFuture.fail(newOSError(errcode))
+    )
+    # passing -1 here should work according to MSDN, but doesn't. For more
+    # information see
+    # http://stackoverflow.com/questions/33650899/does-asynchronous-file-
+    #   appending-in-windows-preserve-order
+    ol.offset = DWORD(f.offset and 0xffffffff)
+    ol.offsetHigh = DWORD(f.offset shr 32)
+    f.offset.inc(size)
+
+    # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
+    let ret = writeFile(f.fd.Handle, buf, size.int32, nil,
+                       cast[POVERLAPPED](ol))
+    if not ret.bool:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        GC_unref(ol)
+        retFuture.fail(newOSError(err))
+    else:
+      # Request completed immediately.
+      var bytesWritten: DWORD
+      let overlappedRes = getOverlappedResult(f.fd.Handle,
+          cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL)
+      if not overlappedRes.bool:
+        retFuture.fail(newOSError(osLastError()))
+      else:
+        assert bytesWritten == size.int32
+        retFuture.complete()
+  else:
+    var written = 0
+
+    proc cb(fd: AsyncFD): bool =
+      result = true
+      let remainderSize = size - written
+      var cbuf = cast[cstring](buf)
+      let res = write(fd.cint, addr cbuf[written], remainderSize.cint)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 != EAGAIN:
+          retFuture.fail(newOSError(lastError))
+        else:
+          result = false # We still want this callback to be called.
+      else:
+        written.inc res
+        f.offset.inc res
+        if res != remainderSize:
+          result = false # We still have data to write.
+        else:
+          retFuture.complete()
+
+    if not cb(f.fd):
+      addWrite(f.fd, cb)
+  return retFuture
+
 proc write*(f: AsyncFile, data: string): Future[void] =
-  ## Writes ``data`` to the file specified asynchronously.
+  ## Writes `data` to the file specified asynchronously.
   ##
   ## The returned Future will complete once all data has been written to the
   ## specified file.
@@ -247,25 +413,24 @@ proc write*(f: AsyncFile, data: string): Future[void] =
   var copy = data
   when defined(windows) or defined(nimdoc):
     var buffer = alloc0(data.len)
-    copyMem(buffer, addr copy[0], data.len)
+    copyMem(buffer, copy.cstring, data.len)
 
-    var ol = PCustomOverlapped()
-    GC_ref(ol)
+    var ol = newCustom()
     ol.data = CompletionData(fd: f.fd, cb:
-      proc (fd: AsyncFD, bytesCount: DWord, errcode: OSErrorCode) =
+      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
             assert bytesCount == data.len.int32
-            f.offset.inc(data.len)
             retFuture.complete()
           else:
-            retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+            retFuture.fail(newOSError(errcode))
         if buffer != nil:
           dealloc buffer
           buffer = nil
     )
-    ol.offset = DWord(f.offset and 0xffffffff)
-    ol.offsetHigh = DWord(f.offset shr 32)
+    ol.offset = DWORD(f.offset and 0xffffffff)
+    ol.offsetHigh = DWORD(f.offset shr 32)
+    f.offset.inc(data.len)
 
     # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
     let ret = writeFile(f.fd.Handle, buffer, data.len.int32, nil,
@@ -277,29 +442,35 @@ proc write*(f: AsyncFile, data: string): Future[void] =
           dealloc buffer
           buffer = nil
         GC_unref(ol)
-        retFuture.fail(newException(OSError, osErrorMsg(err)))
+        retFuture.fail(newOSError(err))
     else:
       # Request completed immediately.
-      var bytesWritten: DWord
+      var bytesWritten: DWORD
       let overlappedRes = getOverlappedResult(f.fd.Handle,
-          cast[POverlapped](ol)[], bytesWritten, false.WinBool)
+          cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL)
       if not overlappedRes.bool:
-        retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
+        retFuture.fail(newOSError(osLastError()))
       else:
         assert bytesWritten == data.len.int32
-        f.offset.inc(data.len)
         retFuture.complete()
   else:
     var written = 0
 
     proc cb(fd: AsyncFD): bool =
       result = true
-      let remainderSize = data.len-written
-      let res = write(fd.cint, addr copy[written], remainderSize.cint)
+
+      let remainderSize = data.len - written
+
+      let res =
+        if data.len == 0:
+          write(fd.cint, copy.cstring, 0)
+        else:
+          write(fd.cint, addr copy[written], remainderSize.cint)
+
       if res < 0:
         let lastError = osLastError()
         if lastError.int32 != EAGAIN:
-          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+          retFuture.fail(newOSError(lastError))
         else:
           result = false # We still want this callback to be called.
       else:
@@ -314,8 +485,26 @@ proc write*(f: AsyncFile, data: string): Future[void] =
       addWrite(f.fd, cb)
   return retFuture
 
+proc setFileSize*(f: AsyncFile, length: int64) =
+  ## Set a file length.
+  when defined(windows) or defined(nimdoc):
+    var
+      high = (length shr 32).DWORD
+    let
+      low = (length and 0xffffffff).DWORD
+      status = setFilePointer(f.fd.Handle, low, addr high, 0)
+      lastErr = osLastError()
+    if (status == INVALID_SET_FILE_POINTER and lastErr.int32 != NO_ERROR) or
+        (setEndOfFile(f.fd.Handle) == 0):
+      raiseOSError(osLastError())
+  else:
+    # will truncate if Off is a 32-bit type!
+    if ftruncate(f.fd.cint, length.Off) == -1:
+      raiseOSError(osLastError())
+
 proc close*(f: AsyncFile) =
   ## Closes the file specified.
+  unregister(f.fd)
   when defined(windows) or defined(nimdoc):
     if not closeHandle(f.fd.Handle).bool:
       raiseOSError(osLastError())
@@ -323,3 +512,26 @@ proc close*(f: AsyncFile) =
     if close(f.fd.cint) == -1:
       raiseOSError(osLastError())
 
+proc writeFromStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
+  ## Reads data from the specified future stream until it is completed.
+  ## The data which is read is written to the file immediately and
+  ## freed from memory.
+  ##
+  ## This procedure is perfect for saving streamed data to a file without
+  ## wasting memory.
+  while true:
+    let (hasValue, value) = await fs.read()
+    if hasValue:
+      await f.write(value)
+    else:
+      break
+
+proc readToStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
+  ## Writes data to the specified future stream as the file is read.
+  while true:
+    let data = await read(f, 4000)
+    if data.len == 0:
+      break
+    await fs.write(data)
+
+  fs.complete()