From e4c46e6fba65303c7046db28d996df17ec805e4f Mon Sep 17 00:00:00 2001 From: Anatoly Galiulin Date: Wed, 31 Aug 2016 11:18:36 +0700 Subject: Add async IO operations with buffers on files and sockets --- lib/pure/asyncfile.nim | 141 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) (limited to 'lib/pure/asyncfile.nim') diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim index 5df606ea8..8801c64d0 100644 --- a/lib/pure/asyncfile.nim +++ b/lib/pure/asyncfile.nim @@ -112,6 +112,80 @@ proc openAsync*(filename: string, mode = fmRead): AsyncFile = register(result.fd) +proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] = + ## Read ``size`` bytes from the specified file asynchronously starting at + ## the current position of the file pointer. + ## + ## If the file pointer is past the end of the file then an empty string is + ## returned. + var retFuture = newFuture[int]("asyncfile.readBuffer") + + when defined(windows) or defined(nimdoc): + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = CompletionData(fd: f.fd, cb: + proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert bytesCount > 0 + assert bytesCount <= size + f.offset.inc bytesCount + retFuture.complete(bytesCount) + else: + if errcode.int32 == ERROR_HANDLE_EOF: + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + ) + ol.offset = DWord(f.offset and 0xffffffff) + ol.offsetHigh = DWord(f.offset shr 32) + + # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead. + let ret = readFile(f.fd.Handle, buf, size.int32, nil, + cast[POVERLAPPED](ol)) + if not ret.bool: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + # Request completed immediately. + var bytesRead: DWord + let overlappedRes = getOverlappedResult(f.fd.Handle, + cast[POverlapped](ol), bytesRead, false.WinBool) + if not overlappedRes.bool: + let err = osLastError() + if err.int32 == ERROR_HANDLE_EOF: + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + else: + assert bytesRead > 0 + assert bytesRead <= size + f.offset.inc bytesRead + retFuture.complete(bytesRead) + else: + proc cb(fd: AsyncFD): bool = + result = true + let res = read(fd.cint, cast[cstring](buf), size.cint) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EAGAIN: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + elif res == 0: + # EOF + retFuture.complete(0) + else: + f.offset.inc(res) + retFuture.complete(res) + + if not cb(f.fd): + addRead(f.fd, cb) + + return retFuture + proc read*(f: AsyncFile, size: int): Future[string] = ## Read ``size`` bytes from the specified file asynchronously starting at ## the current position of the file pointer. @@ -238,6 +312,73 @@ proc readAll*(f: AsyncFile): Future[string] {.async.} = return result.add data +proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] = + ## Writes ``size`` bytes from ``buf`` to the file specified asynchronously. + ## + ## The returned Future will complete once all data has been written to the + ## specified file. + var retFuture = newFuture[void]("asyncfile.writeBuffer") + when defined(windows) or defined(nimdoc): + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = CompletionData(fd: f.fd, cb: + proc (fd: AsyncFD, bytesCount: DWord, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert bytesCount == size.int32 + f.offset.inc(size) + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + ) + ol.offset = DWord(f.offset and 0xffffffff) + ol.offsetHigh = DWord(f.offset shr 32) + + # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten. + let ret = writeFile(f.fd.Handle, buf, size.int32, nil, + cast[POVERLAPPED](ol)) + if not ret.bool: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + # Request completed immediately. + var bytesWritten: DWord + let overlappedRes = getOverlappedResult(f.fd.Handle, + cast[POverlapped](ol), bytesWritten, false.WinBool) + if not overlappedRes.bool: + retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + else: + assert bytesWritten == size.int32 + f.offset.inc(size) + retFuture.complete() + else: + var written = 0 + + proc cb(fd: AsyncFD): bool = + result = true + let remainderSize = size-written + var cbuf = cast[cstring](buf) + let res = write(fd.cint, addr cbuf[written], remainderSize.cint) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EAGAIN: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + else: + written.inc res + f.offset.inc res + if res != remainderSize: + result = false # We still have data to write. + else: + retFuture.complete() + + if not cb(f.fd): + addWrite(f.fd, cb) + return retFuture + proc write*(f: AsyncFile, data: string): Future[void] = ## Writes ``data`` to the file specified asynchronously. ## -- cgit 1.4.1-2-gfad0