diff options
Diffstat (limited to 'lib/pure/asyncfile.nim')
-rw-r--r-- | lib/pure/asyncfile.nim | 354 |
1 files changed, 283 insertions, 71 deletions
diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim index c7b9fac18..0f6504342 100644 --- a/lib/pure/asyncfile.nim +++ b/lib/pure/asyncfile.nim @@ -9,29 +9,37 @@ ## This module implements asynchronous file reading and writing. ## -## .. code-block:: Nim -## import asyncfile, asyncdispatch, os +## ```Nim +## import std/[asyncfile, asyncdispatch, os] ## -## proc main() {.async.} = -## var file = openAsync(getTempDir() / "foobar.txt", fmReadWrite) -## await file.write("test") -## file.setFilePos(0) -## let data = await file.readAll() -## doAssert data == "test" -## file.close() +## proc main() {.async.} = +## var file = openAsync(getTempDir() / "foobar.txt", fmReadWrite) +## await file.write("test") +## file.setFilePos(0) +## let data = await file.readAll() +## doAssert data == "test" +## file.close() ## -## waitFor main() +## waitFor main() +## ``` -import asyncdispatch, os +import std/[asyncdispatch, os] + +when defined(nimPreviewSlimSystem): + import std/[assertions, syncio] + when defined(windows) or defined(nimdoc): + import std/widestrs + +# TODO: Fix duplication introduced by PR #4683. when defined(windows) or defined(nimdoc): - import winlean + import std/winlean else: - import posix + import std/posix type AsyncFile* = ref object - fd: AsyncFd + fd: AsyncFD offset: int64 when defined(windows) or defined(nimdoc): @@ -48,56 +56,60 @@ when defined(windows) or defined(nimdoc): case mode of fmRead, fmReadWriteExisting: OPEN_EXISTING - of fmAppend, fmReadWrite, fmWrite: - if fileExists(filename): - OPEN_EXISTING - else: - CREATE_NEW + of fmReadWrite, fmWrite: + CREATE_ALWAYS + of fmAppend: + OPEN_ALWAYS else: proc getPosixFlags(mode: FileMode): cint = case mode of fmRead: result = O_RDONLY of fmWrite: - result = O_WRONLY or O_CREAT + result = O_WRONLY or O_CREAT or O_TRUNC of fmAppend: result = O_WRONLY or O_CREAT or O_APPEND of fmReadWrite: - result = O_RDWR or O_CREAT + result = O_RDWR or O_CREAT or O_TRUNC of fmReadWriteExisting: result = O_RDWR result = result or O_NONBLOCK -proc getFileSize(f: AsyncFile): int64 = +proc getFileSize*(f: AsyncFile): int64 = ## Retrieves the specified file's size. when defined(windows) or defined(nimdoc): - var high: DWord + var high: DWORD let low = getFileSize(f.fd.Handle, addr high) if low == INVALID_FILE_SIZE: raiseOSError(osLastError()) - return (high shl 32) or low + result = (high shl 32) or low + else: + let curPos = lseek(f.fd.cint, 0, SEEK_CUR) + result = lseek(f.fd.cint, 0, SEEK_END) + f.offset = lseek(f.fd.cint, curPos, SEEK_SET) + assert(f.offset == curPos) -proc openAsync*(filename: string, mode = fmRead): AsyncFile = - ## Opens a file specified by the path in ``filename`` using - ## the specified ``mode`` asynchronously. +proc newAsyncFile*(fd: AsyncFD): AsyncFile = + ## Creates `AsyncFile` with a previously opened file descriptor `fd`. new result + result.fd = fd + register(fd) + +proc openAsync*(filename: string, mode = fmRead): AsyncFile = + ## Opens a file specified by the path in `filename` using + ## the specified FileMode `mode` asynchronously. when defined(windows) or defined(nimdoc): let flags = FILE_FLAG_OVERLAPPED or FILE_ATTRIBUTE_NORMAL let desiredAccess = getDesiredAccess(mode) let creationDisposition = getCreationDisposition(mode, filename) - when useWinUnicode: - result.fd = createFileW(newWideCString(filename), desiredAccess, - FILE_SHARE_READ, - nil, creationDisposition, flags, 0).AsyncFd - else: - result.fd = createFileA(filename, desiredAccess, - FILE_SHARE_READ, - nil, creationDisposition, flags, 0).AsyncFd + let fd = createFileW(newWideCString(filename), desiredAccess, + FILE_SHARE_READ, + nil, creationDisposition, flags, 0) - if result.fd.Handle == INVALID_HANDLE_VALUE: + if fd == INVALID_HANDLE_VALUE: raiseOSError(osLastError()) - register(result.fd) + result = newAsyncFile(fd.AsyncFD) if mode == fmAppend: result.offset = getFileSize(result) @@ -106,27 +118,104 @@ proc openAsync*(filename: string, mode = fmRead): AsyncFile = let flags = getPosixFlags(mode) # RW (Owner), RW (Group), R (Other) let perm = S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP or S_IROTH - result.fd = open(filename, flags, perm).AsyncFD - if result.fd.cint == -1: + let fd = open(filename, flags, perm) + if fd == -1: raiseOSError(osLastError()) - register(result.fd) + result = newAsyncFile(fd.AsyncFD) -proc read*(f: AsyncFile, size: int): Future[string] = - ## Read ``size`` bytes from the specified file asynchronously starting at +proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] = + ## Read `size` bytes from the specified file asynchronously starting at ## the current position of the file pointer. ## + ## If the file pointer is past the end of the file then zero is returned + ## and no bytes are read into `buf` + var retFuture = newFuture[int]("asyncfile.readBuffer") + + when defined(windows) or defined(nimdoc): + var ol = newCustom() + ol.data = CompletionData(fd: f.fd, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert bytesCount > 0 + assert bytesCount <= size + f.offset.inc bytesCount + retFuture.complete(bytesCount) + else: + if errcode.int32 == ERROR_HANDLE_EOF: + retFuture.complete(0) + else: + retFuture.fail(newOSError(errcode)) + ) + ol.offset = DWORD(f.offset and 0xffffffff) + ol.offsetHigh = DWORD(f.offset shr 32) + + # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead. + let ret = readFile(f.fd.Handle, buf, size.int32, nil, + cast[POVERLAPPED](ol)) + if not ret.bool: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + if err.int32 == ERROR_HANDLE_EOF: + # This happens in Windows Server 2003 + retFuture.complete(0) + else: + retFuture.fail(newOSError(err)) + else: + # Request completed immediately. + var bytesRead: DWORD + let overlappedRes = getOverlappedResult(f.fd.Handle, + cast[POVERLAPPED](ol), bytesRead, false.WINBOOL) + if not overlappedRes.bool: + let err = osLastError() + if err.int32 == ERROR_HANDLE_EOF: + retFuture.complete(0) + else: + retFuture.fail(newOSError(osLastError())) + else: + assert bytesRead > 0 + assert bytesRead <= size + f.offset.inc bytesRead + retFuture.complete(bytesRead) + else: + proc cb(fd: AsyncFD): bool = + result = true + let res = read(fd.cint, cast[cstring](buf), size.cint) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EAGAIN: + retFuture.fail(newOSError(lastError)) + else: + result = false # We still want this callback to be called. + elif res == 0: + # EOF + retFuture.complete(0) + else: + f.offset.inc(res) + retFuture.complete(res) + + if not cb(f.fd): + addRead(f.fd, cb) + + return retFuture + +proc read*(f: AsyncFile, size: int): Future[string] = + ## Read `size` bytes from the specified file asynchronously starting at + ## the current position of the file pointer. `size` should be greater than zero. + ## ## If the file pointer is past the end of the file then an empty string is ## returned. + assert size > 0 var retFuture = newFuture[string]("asyncfile.read") when defined(windows) or defined(nimdoc): var buffer = alloc0(size) - var ol = PCustomOverlapped() - GC_ref(ol) + var ol = newCustom() ol.data = CompletionData(fd: f.fd, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): assert bytesCount > 0 @@ -139,13 +228,13 @@ proc read*(f: AsyncFile, size: int): Future[string] = if errcode.int32 == ERROR_HANDLE_EOF: retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) if buffer != nil: dealloc buffer buffer = nil ) - ol.offset = DWord(f.offset and 0xffffffff) - ol.offsetHigh = DWord(f.offset shr 32) + ol.offset = DWORD(f.offset and 0xffffffff) + ol.offsetHigh = DWORD(f.offset shr 32) # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead. let ret = readFile(f.fd.Handle, buffer, size.int32, nil, @@ -157,18 +246,23 @@ proc read*(f: AsyncFile, size: int): Future[string] = dealloc buffer buffer = nil GC_unref(ol) - retFuture.fail(newException(OSError, osErrorMsg(err))) + + if err.int32 == ERROR_HANDLE_EOF: + # This happens in Windows Server 2003 + retFuture.complete("") + else: + retFuture.fail(newOSError(err)) else: # Request completed immediately. - var bytesRead: DWord + var bytesRead: DWORD let overlappedRes = getOverlappedResult(f.fd.Handle, - cast[POverlapped](ol)[], bytesRead, false.WinBool) + cast[POVERLAPPED](ol), bytesRead, false.WINBOOL) if not overlappedRes.bool: let err = osLastError() if err.int32 == ERROR_HANDLE_EOF: retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + retFuture.fail(newOSError(osLastError())) else: assert bytesRead > 0 assert bytesRead <= size @@ -185,11 +279,12 @@ proc read*(f: AsyncFile, size: int): Future[string] = if res < 0: let lastError = osLastError() if lastError.int32 != EAGAIN: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. elif res == 0: # EOF + f.offset = lseek(fd.cint, 0, SEEK_CUR) retFuture.complete("") else: readBuffer.setLen(res) @@ -206,6 +301,8 @@ proc readLine*(f: AsyncFile): Future[string] {.async.} = result = "" while true: var c = await read(f, 1) + if c.len == 0: + break if c[0] == '\c': c = await read(f, 1) break @@ -225,7 +322,7 @@ proc setFilePos*(f: AsyncFile, pos: int64) = ## operations. The file's first byte has the index zero. f.offset = pos when not defined(windows) and not defined(nimdoc): - let ret = lseek(f.fd.cint, pos, SEEK_SET) + let ret = lseek(f.fd.cint, pos.Off, SEEK_SET) if ret == -1: raiseOSError(osLastError()) @@ -238,8 +335,77 @@ proc readAll*(f: AsyncFile): Future[string] {.async.} = return result.add data +proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] = + ## Writes `size` bytes from `buf` to the file specified asynchronously. + ## + ## The returned Future will complete once all data has been written to the + ## specified file. + var retFuture = newFuture[void]("asyncfile.writeBuffer") + when defined(windows) or defined(nimdoc): + var ol = newCustom() + ol.data = CompletionData(fd: f.fd, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert bytesCount == size.int32 + retFuture.complete() + else: + retFuture.fail(newOSError(errcode)) + ) + # passing -1 here should work according to MSDN, but doesn't. For more + # information see + # http://stackoverflow.com/questions/33650899/does-asynchronous-file- + # appending-in-windows-preserve-order + ol.offset = DWORD(f.offset and 0xffffffff) + ol.offsetHigh = DWORD(f.offset shr 32) + f.offset.inc(size) + + # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten. + let ret = writeFile(f.fd.Handle, buf, size.int32, nil, + cast[POVERLAPPED](ol)) + if not ret.bool: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newOSError(err)) + else: + # Request completed immediately. + var bytesWritten: DWORD + let overlappedRes = getOverlappedResult(f.fd.Handle, + cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL) + if not overlappedRes.bool: + retFuture.fail(newOSError(osLastError())) + else: + assert bytesWritten == size.int32 + retFuture.complete() + else: + var written = 0 + + proc cb(fd: AsyncFD): bool = + result = true + let remainderSize = size - written + var cbuf = cast[cstring](buf) + let res = write(fd.cint, addr cbuf[written], remainderSize.cint) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EAGAIN: + retFuture.fail(newOSError(lastError)) + else: + result = false # We still want this callback to be called. + else: + written.inc res + f.offset.inc res + if res != remainderSize: + result = false # We still have data to write. + else: + retFuture.complete() + + if not cb(f.fd): + addWrite(f.fd, cb) + return retFuture + proc write*(f: AsyncFile, data: string): Future[void] = - ## Writes ``data`` to the file specified asynchronously. + ## Writes `data` to the file specified asynchronously. ## ## The returned Future will complete once all data has been written to the ## specified file. @@ -247,25 +413,24 @@ proc write*(f: AsyncFile, data: string): Future[void] = var copy = data when defined(windows) or defined(nimdoc): var buffer = alloc0(data.len) - copyMem(buffer, addr copy[0], data.len) + copyMem(buffer, copy.cstring, data.len) - var ol = PCustomOverlapped() - GC_ref(ol) + var ol = newCustom() ol.data = CompletionData(fd: f.fd, cb: - proc (fd: AsyncFD, bytesCount: DWord, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): assert bytesCount == data.len.int32 - f.offset.inc(data.len) retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) if buffer != nil: dealloc buffer buffer = nil ) - ol.offset = DWord(f.offset and 0xffffffff) - ol.offsetHigh = DWord(f.offset shr 32) + ol.offset = DWORD(f.offset and 0xffffffff) + ol.offsetHigh = DWORD(f.offset shr 32) + f.offset.inc(data.len) # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten. let ret = writeFile(f.fd.Handle, buffer, data.len.int32, nil, @@ -277,29 +442,35 @@ proc write*(f: AsyncFile, data: string): Future[void] = dealloc buffer buffer = nil GC_unref(ol) - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) else: # Request completed immediately. - var bytesWritten: DWord + var bytesWritten: DWORD let overlappedRes = getOverlappedResult(f.fd.Handle, - cast[POverlapped](ol)[], bytesWritten, false.WinBool) + cast[POVERLAPPED](ol), bytesWritten, false.WINBOOL) if not overlappedRes.bool: - retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + retFuture.fail(newOSError(osLastError())) else: assert bytesWritten == data.len.int32 - f.offset.inc(data.len) retFuture.complete() else: var written = 0 proc cb(fd: AsyncFD): bool = result = true - let remainderSize = data.len-written - let res = write(fd.cint, addr copy[written], remainderSize.cint) + + let remainderSize = data.len - written + + let res = + if data.len == 0: + write(fd.cint, copy.cstring, 0) + else: + write(fd.cint, addr copy[written], remainderSize.cint) + if res < 0: let lastError = osLastError() if lastError.int32 != EAGAIN: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. else: @@ -314,8 +485,26 @@ proc write*(f: AsyncFile, data: string): Future[void] = addWrite(f.fd, cb) return retFuture +proc setFileSize*(f: AsyncFile, length: int64) = + ## Set a file length. + when defined(windows) or defined(nimdoc): + var + high = (length shr 32).DWORD + let + low = (length and 0xffffffff).DWORD + status = setFilePointer(f.fd.Handle, low, addr high, 0) + lastErr = osLastError() + if (status == INVALID_SET_FILE_POINTER and lastErr.int32 != NO_ERROR) or + (setEndOfFile(f.fd.Handle) == 0): + raiseOSError(osLastError()) + else: + # will truncate if Off is a 32-bit type! + if ftruncate(f.fd.cint, length.Off) == -1: + raiseOSError(osLastError()) + proc close*(f: AsyncFile) = ## Closes the file specified. + unregister(f.fd) when defined(windows) or defined(nimdoc): if not closeHandle(f.fd.Handle).bool: raiseOSError(osLastError()) @@ -323,3 +512,26 @@ proc close*(f: AsyncFile) = if close(f.fd.cint) == -1: raiseOSError(osLastError()) +proc writeFromStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} = + ## Reads data from the specified future stream until it is completed. + ## The data which is read is written to the file immediately and + ## freed from memory. + ## + ## This procedure is perfect for saving streamed data to a file without + ## wasting memory. + while true: + let (hasValue, value) = await fs.read() + if hasValue: + await f.write(value) + else: + break + +proc readToStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} = + ## Writes data to the specified future stream as the file is read. + while true: + let data = await read(f, 4000) + if data.len == 0: + break + await fs.write(data) + + fs.complete() |