diff options
author | Araq <rumpf_a@web.de> | 2014-09-06 00:54:28 +0200 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2014-09-06 00:54:28 +0200 |
commit | ef001573df9405dff94a763fc3dac6f3e1943738 (patch) | |
tree | 3b7c7b8ed75e8dc84c778c9ebf5fd400c4417aa6 /lib | |
parent | 7f7b13a45f73c6d9dcca3ce8388833189d77426c (diff) | |
parent | 52c16a1a79063d5dfe03cc3ecbcc6685fc15f8e7 (diff) | |
download | Nim-ef001573df9405dff94a763fc3dac6f3e1943738.tar.gz |
Merge branch 'bigbreak' of https://github.com/Araq/Nimrod into bigbreak
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 24 | ||||
-rw-r--r-- | lib/pure/asyncfile.nim | 312 | ||||
-rw-r--r-- | lib/windows/winlean.nim | 24 |
3 files changed, 340 insertions, 20 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 14b56d1ed..052de6f3a 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -143,6 +143,7 @@ proc echoOriginalStackTrace[T](future: Future[T]) = echo(future.errorStackTrace) else: echo("Empty or nil stack trace.") + echo("Continuing...") proc read*[T](future: Future[T]): T = ## Retrieves the value of ``future``. Future must be finished otherwise @@ -226,8 +227,8 @@ when defined(windows) or defined(nimdoc): TCompletionKey = Dword TCompletionData* = object - sock: TAsyncFD - cb: proc (sock: TAsyncFD, bytesTransferred: Dword, + sock*: TAsyncFD # TODO: Rename this. + cb*: proc (sock: TAsyncFD, bytesTransferred: Dword, errcode: OSErrorCode) {.closure,gcsafe.} PDispatcher* = ref object of PDispatcherBase @@ -237,7 +238,7 @@ when defined(windows) or defined(nimdoc): TCustomOverlapped = object of TOVERLAPPED data*: TCompletionData - PCustomOverlapped = ref TCustomOverlapped + PCustomOverlapped* = ref TCustomOverlapped TAsyncFD* = distinct int @@ -247,7 +248,7 @@ when defined(windows) or defined(nimdoc): proc newDispatcher*(): PDispatcher = ## Creates a new Dispatcher instance. new result - result.ioPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) + result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) result.handles = initSet[TAsyncFD]() result.timers = @[] @@ -260,7 +261,7 @@ when defined(windows) or defined(nimdoc): proc register*(sock: TAsyncFD) = ## Registers ``sock`` with the dispatcher. let p = getGlobalDispatcher() - if CreateIoCompletionPort(sock.THandle, p.ioPort, + if createIoCompletionPort(sock.THandle, p.ioPort, cast[TCompletionKey](sock), 1) == 0: raiseOSError(osLastError()) p.handles.incl(sock) @@ -286,7 +287,7 @@ when defined(windows) or defined(nimdoc): var lpNumberOfBytesTransferred: Dword var lpCompletionKey: ULONG var customOverlapped: PCustomOverlapped - let res = GetQueuedCompletionStatus(p.ioPort, + let res = getQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred, addr lpCompletionKey, cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool @@ -723,7 +724,7 @@ else: assert sock.SocketHandle in p.selector discard p.selector.update(sock.SocketHandle, events) - proc register(sock: TAsyncFD) = + proc register*(sock: TAsyncFD) = let p = getGlobalDispatcher() var data = PData(sock: sock, readCBs: @[], writeCBs: @[]) p.selector.register(sock.SocketHandle, {}, data.PObject) @@ -743,14 +744,14 @@ else: proc unregister*(fd: TAsyncFD) = getGlobalDispatcher().selector.unregister(fd.SocketHandle) - proc addRead(sock: TAsyncFD, cb: TCallback) = + proc addRead*(sock: TAsyncFD, cb: TCallback) = let p = getGlobalDispatcher() if sock.SocketHandle notin p.selector: raise newException(EInvalidValue, "File descriptor not registered.") p.selector[sock.SocketHandle].data.PData.readCBs.add(cb) update(sock, p.selector[sock.SocketHandle].events + {EvRead}) - proc addWrite(sock: TAsyncFD, cb: TCallback) = + proc addWrite*(sock: TAsyncFD, cb: TCallback) = let p = getGlobalDispatcher() if sock.SocketHandle notin p.selector: raise newException(EInvalidValue, "File descriptor not registered.") @@ -1231,10 +1232,9 @@ proc runForever*() = while true: poll() -proc waitFor*[T](fut: PFuture[T]) = +proc waitFor*[T](fut: PFuture[T]): T = ## **Blocks** the current thread until the specified future completes. while not fut.finished: poll() - if fut.failed: - raise fut.error + fut.read diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim new file mode 100644 index 000000000..009485ed9 --- /dev/null +++ b/lib/pure/asyncfile.nim @@ -0,0 +1,312 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2014 Dominik Picheta +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module implements asynchronous file handling. +## +## .. code-block:: Nim +## import asyncfile, asyncdispatch, os +## +## proc main() {.async.} = +## var file = openAsync(getTempDir() / "foobar.txt", fmReadWrite) +## await file.write("test") +## file.setFilePos(0) +## let data = await file.readAll() +## doAssert data == "test" +## file.close() +## +## waitFor main() + +import asyncdispatch, os + +when defined(windows): + import winlean +else: + import posix + +type + AsyncFile = ref object + fd: TAsyncFd + offset: int64 + +when defined(windows): + proc getDesiredAccess(mode: TFileMode): int32 = + case mode + of fmRead: + result = GENERIC_READ + of fmWrite, fmAppend: + result = GENERIC_WRITE + of fmReadWrite, fmReadWriteExisting: + result = GENERIC_READ or GENERIC_WRITE + + proc getCreationDisposition(mode: TFileMode, filename: string): int32 = + case mode + of fmRead, fmReadWriteExisting: + OPEN_EXISTING + of fmAppend, fmReadWrite, fmWrite: + if fileExists(filename): + OPEN_EXISTING + else: + CREATE_NEW +else: + proc getPosixFlags(mode: TFileMode): cint = + case mode + of fmRead: + result = O_RDONLY + of fmWrite: + result = O_WRONLY or O_CREAT + of fmAppend: + result = O_WRONLY or O_CREAT or O_APPEND + of fmReadWrite: + result = O_RDWR or O_CREAT + of fmReadWriteExisting: + result = O_RDWR + result = result or O_NONBLOCK + +proc getFileSize*(f: AsyncFile): int64 = + ## Retrieves the specified file's size. + when defined(windows): + var high: DWord + let low = getFileSize(f.fd.THandle, addr high) + if low == INVALID_FILE_SIZE: + raiseOSError() + return (high shl 32) or low + +proc openAsync*(filename: string, mode = fmRead): AsyncFile = + ## Opens a file specified by the path in ``filename`` using + ## the specified ``mode`` asynchronously. + new result + when defined(windows): + let flags = FILE_FLAG_OVERLAPPED or FILE_ATTRIBUTE_NORMAL + let desiredAccess = getDesiredAccess(mode) + let creationDisposition = getCreationDisposition(mode, filename) + when useWinUnicode: + result.fd = createFileW(newWideCString(filename), desiredAccess, + FILE_SHARE_READ, + nil, creationDisposition, flags, 0).TAsyncFd + else: + result.fd = createFileA(filename, desiredAccess, + FILE_SHARE_READ, + nil, creationDisposition, flags, 0).TAsyncFd + + if result.fd.THandle == INVALID_HANDLE_VALUE: + raiseOSError() + + register(result.fd) + + if mode == fmAppend: + result.offset = getFileSize(result) + + else: + let flags = getPosixFlags(mode) + # RW (Owner), RW (Group), R (Other) + let perm = S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP or S_IROTH + result.fd = open(filename, flags, perm).TAsyncFD + if result.fd.cint == -1: + raiseOSError() + + register(result.fd) + +proc read*(f: AsyncFile, size: int): Future[string] = + ## Read ``size`` bytes from the specified file asynchronously starting at + ## the current position of the file pointer. + ## + ## If the file pointer is past the end of the file then an empty string is + ## returned. + var retFuture = newFuture[string]("asyncfile.read") + + when defined(windows): + var buffer = alloc0(size) + + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = TCompletionData(sock: f.fd, cb: + proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert bytesCount > 0 + assert bytesCount <= size + var data = newString(bytesCount) + copyMem(addr data[0], buffer, bytesCount) + f.offset.inc bytesCount + retFuture.complete($data) + else: + if errcode.int32 == ERROR_HANDLE_EOF: + retFuture.complete("") + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + if buffer != nil: + dealloc buffer + buffer = nil + ) + ol.offset = DWord(f.offset and 0xffffffff) + ol.offsetHigh = DWord(f.offset shr 32) + + # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead. + let ret = readFile(f.fd.THandle, buffer, size.int32, nil, + cast[POVERLAPPED](ol)) + if not ret.bool: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + if buffer != nil: + dealloc buffer + buffer = nil + GC_unref(ol) + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + # Request completed immediately. + var bytesRead: DWord + let overlappedRes = getOverlappedResult(f.fd.THandle, + cast[POverlapped](ol)[], bytesRead, false.WinBool) + if not overlappedRes.bool: + let err = osLastError() + if err.int32 == ERROR_HANDLE_EOF: + retFuture.complete("") + else: + retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + else: + assert bytesRead > 0 + assert bytesRead <= size + var data = newString(bytesRead) + copyMem(addr data[0], buffer, bytesRead) + f.offset.inc bytesRead + retFuture.complete($data) + else: + var readBuffer = newString(size) + + proc cb(fd: TAsyncFD): bool = + result = true + let res = read(fd.cint, addr readBuffer[0], size.cint) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EAGAIN: + retFuture.fail(newException(EOS, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + elif res == 0: + # EOF + retFuture.complete("") + else: + readBuffer.setLen(res) + f.offset.inc(res) + retFuture.complete(readBuffer) + + if not cb(f.fd): + addRead(f.fd, cb) + + return retFuture + +proc getFilePos*(f: AsyncFile): int64 = + ## Retrieves the current position of the file pointer that is + ## used to read from the specified file. The file's first byte has the + ## index zero. + f.offset + +proc setFilePos*(f: AsyncFile, pos: int64) = + ## Sets the position of the file pointer that is used for read/write + ## operations. The file's first byte has the index zero. + f.offset = pos + when not defined(windows): + let ret = lseek(f.fd.cint, pos, SEEK_SET) + if ret == -1: + raiseOSError() + +proc readAll*(f: AsyncFile): Future[string] {.async.} = + ## Reads all data from the specified file. + result = "" + while true: + let data = await read(f, 4000) + if data.len == 0: + return + result.add data + +proc write*(f: AsyncFile, data: string): Future[void] = + ## Writes ``data`` to the file specified asynchronously. + ## + ## The returned Future will complete once all data has been written to the + ## specified file. + var retFuture = newFuture[void]("asyncfile.write") + var copy = data + when defined(windows): + var buffer = alloc0(data.len) + copyMem(buffer, addr copy[0], data.len) + + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = TCompletionData(sock: f.fd, cb: + proc (fd: TAsyncFD, bytesCount: DWord, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert bytesCount == data.len.int32 + f.offset.inc(data.len) + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + if buffer != nil: + dealloc buffer + buffer = nil + ) + ol.offset = DWord(f.offset and 0xffffffff) + ol.offsetHigh = DWord(f.offset shr 32) + + # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten. + let ret = writeFile(f.fd.THandle, buffer, data.len.int32, nil, + cast[POVERLAPPED](ol)) + if not ret.bool: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + if buffer != nil: + dealloc buffer + buffer = nil + GC_unref(ol) + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + # Request completed immediately. + var bytesWritten: DWord + let overlappedRes = getOverlappedResult(f.fd.THandle, + cast[POverlapped](ol)[], bytesWritten, false.WinBool) + if not overlappedRes.bool: + retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + else: + assert bytesWritten == data.len.int32 + f.offset.inc(data.len) + retFuture.complete() + else: + var written = 0 + + proc cb(fd: TAsyncFD): bool = + result = true + let remainderSize = data.len-written + let res = write(fd.cint, addr copy[written], remainderSize.cint) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EAGAIN: + retFuture.fail(newException(EOS, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + else: + written.inc res + f.offset.inc res + if res != remainderSize: + result = false # We still have data to write. + else: + retFuture.complete() + + if not cb(f.fd): + addWrite(f.fd, cb) + return retFuture + +proc close*(f: AsyncFile) = + ## Closes the file specified. + when defined(windows): + if not closeHandle(f.fd.THandle).bool: + raiseOSError() + else: + if close(f.fd.cint) == -1: + raiseOSError() + diff --git a/lib/windows/winlean.nim b/lib/windows/winlean.nim index ecf80fcd8..014215066 100644 --- a/lib/windows/winlean.nim +++ b/lib/windows/winlean.nim @@ -104,11 +104,11 @@ proc closeHandle*(hObject: THandle): WINBOOL {.stdcall, dynlib: "kernel32", importc: "CloseHandle".} proc readFile*(hFile: THandle, Buffer: pointer, nNumberOfBytesToRead: int32, - lpNumberOfBytesRead: var int32, lpOverlapped: pointer): WINBOOL{. + lpNumberOfBytesRead: ptr int32, lpOverlapped: pointer): WINBOOL{. stdcall, dynlib: "kernel32", importc: "ReadFile".} proc writeFile*(hFile: THandle, Buffer: pointer, nNumberOfBytesToWrite: int32, - lpNumberOfBytesWritten: var int32, + lpNumberOfBytesWritten: ptr int32, lpOverlapped: pointer): WINBOOL{. stdcall, dynlib: "kernel32", importc: "WriteFile".} @@ -573,12 +573,14 @@ proc waitForMultipleObjects*(nCount: DWORD, lpHandles: PWOHandleArray, const GENERIC_READ* = 0x80000000'i32 + GENERIC_WRITE* = 0x40000000'i32 GENERIC_ALL* = 0x10000000'i32 FILE_SHARE_READ* = 1'i32 FILE_SHARE_DELETE* = 4'i32 FILE_SHARE_WRITE* = 2'i32 CREATE_ALWAYS* = 2'i32 + CREATE_NEW* = 1'i32 OPEN_EXISTING* = 3'i32 FILE_BEGIN* = 0'i32 INVALID_SET_FILE_POINTER* = -1'i32 @@ -595,6 +597,7 @@ const # Error Constants const ERROR_ACCESS_DENIED* = 5 + ERROR_HANDLE_EOF* = 38 when useWinUnicode: proc createFileW*(lpFileName: WideCString, dwDesiredAccess, dwShareMode: DWORD, @@ -649,10 +652,10 @@ proc unmapViewOfFile*(lpBaseAddress: pointer): WINBOOL {.stdcall, type TOVERLAPPED* {.pure, inheritable.} = object - Internal*: PULONG - InternalHigh*: PULONG - Offset*: DWORD - OffsetHigh*: DWORD + internal*: PULONG + internalHigh*: PULONG + offset*: DWORD + offsetHigh*: DWORD hEvent*: THandle POVERLAPPED* = ptr TOVERLAPPED @@ -668,6 +671,7 @@ type const ERROR_IO_PENDING* = 997 # a.k.a WSA_IO_PENDING + FILE_FLAG_OVERLAPPED* = 1073741824 WSAECONNABORTED* = 10053 WSAECONNRESET* = 10054 WSAEDISCON* = 10101 @@ -675,17 +679,21 @@ const WSAETIMEDOUT* = 10060 ERROR_NETNAME_DELETED* = 64 -proc CreateIoCompletionPort*(FileHandle: THandle, ExistingCompletionPort: THandle, +proc createIoCompletionPort*(FileHandle: THandle, ExistingCompletionPort: THandle, CompletionKey: DWORD, NumberOfConcurrentThreads: DWORD): THandle{.stdcall, dynlib: "kernel32", importc: "CreateIoCompletionPort".} -proc GetQueuedCompletionStatus*(CompletionPort: THandle, +proc getQueuedCompletionStatus*(CompletionPort: THandle, lpNumberOfBytesTransferred: PDWORD, lpCompletionKey: PULONG, lpOverlapped: ptr POVERLAPPED, dwMilliseconds: DWORD): WINBOOL{.stdcall, dynlib: "kernel32", importc: "GetQueuedCompletionStatus".} +proc getOverlappedResult*(hFile: THandle, lpOverlapped: TOverlapped, + lpNumberOfBytesTransferred: var DWORD, bWait: WINBOOL): WINBOOL{. + stdcall, dynlib: "kernel32", importc: "GetOverlappedResult".} + const IOC_OUT* = 0x40000000 IOC_IN* = 0x80000000 |