From 192e11e7b72470e27bc6bccf1fedbfefc9c4ebd8 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sat, 22 Mar 2014 22:33:02 +0000 Subject: Many renames. Created high level asyncnet module. --- lib/pure/asyncdispatch.nim | 1013 ++++++++++++++++++++++++++++++++++++++++++++ lib/pure/asyncio2.nim | 997 ------------------------------------------- lib/pure/asyncnet.nim | 147 +++++++ lib/pure/net.nim | 4 +- lib/pure/rawsockets.nim | 420 ++++++++++++++++++ lib/pure/sockets2.nim | 420 ------------------ 6 files changed, 1582 insertions(+), 1419 deletions(-) create mode 100644 lib/pure/asyncdispatch.nim delete mode 100644 lib/pure/asyncio2.nim create mode 100644 lib/pure/asyncnet.nim create mode 100644 lib/pure/rawsockets.nim delete mode 100644 lib/pure/sockets2.nim (limited to 'lib/pure') diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim new file mode 100644 index 000000000..67361e46c --- /dev/null +++ b/lib/pure/asyncdispatch.nim @@ -0,0 +1,1013 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2014 Dominik Picheta +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +import os, oids, tables, strutils, macros + +import rawsockets + +## AsyncDispatch +## -------- +## +## This module implements a brand new dispatcher based on Futures. +## On Windows IOCP is used and on other operating systems the selectors module +## is used instead. + +# -- Futures + +type + PFutureBase* = ref object of PObject + cb: proc () {.closure.} + finished: bool + + PFuture*[T] = ref object of PFutureBase + value: T + error*: ref EBase # TODO: This shouldn't be necessary, generics bug? + +proc newFuture*[T](): PFuture[T] = + ## Creates a new future. + new(result) + result.finished = false + +proc complete*[T](future: PFuture[T], val: T) = + ## Completes ``future`` with value ``val``. + assert(not future.finished, "Future already finished, cannot finish twice.") + assert(future.error == nil) + future.value = val + future.finished = true + if future.cb != nil: + future.cb() + +proc complete*(future: PFuture[void]) = + ## Completes a void ``future``. + assert(not future.finished, "Future already finished, cannot finish twice.") + assert(future.error == nil) + future.finished = true + if future.cb != nil: + future.cb() + +proc fail*[T](future: PFuture[T], error: ref EBase) = + ## Completes ``future`` with ``error``. + assert(not future.finished, "Future already finished, cannot finish twice.") + future.finished = true + future.error = error + if future.cb != nil: + future.cb() + +proc `callback=`*(future: PFutureBase, cb: proc () {.closure.}) = + ## Sets the callback proc to be called when the future completes. + ## + ## If future has already completed then ``cb`` will be called immediately. + ## + ## **Note**: You most likely want the other ``callback`` setter which + ## passes ``future`` as a param to the callback. + future.cb = cb + if future.finished: + future.cb() + +proc `callback=`*[T](future: PFuture[T], + cb: proc (future: PFuture[T]) {.closure.}) = + ## Sets the callback proc to be called when the future completes. + ## + ## If future has already completed then ``cb`` will be called immediately. + future.callback = proc () = cb(future) + +proc read*[T](future: PFuture[T]): T = + ## Retrieves the value of ``future``. Future must be finished otherwise + ## this function will fail with a ``EInvalidValue`` exception. + ## + ## If the result of the future is an error then that error will be raised. + if future.finished: + if future.error != nil: raise future.error + when T isnot void: + return future.value + else: + # TODO: Make a custom exception type for this? + raise newException(EInvalidValue, "Future still in progress.") + +proc readError*[T](future: PFuture[T]): ref EBase = + if future.error != nil: return future.error + else: + raise newException(EInvalidValue, "No error in future.") + +proc finished*[T](future: PFuture[T]): bool = + ## Determines whether ``future`` has completed. + ## + ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. + future.finished + +proc failed*[T](future: PFuture[T]): bool = + ## Determines whether ``future`` completed with an error. + future.error != nil + +when defined(windows) or defined(nimdoc): + import winlean, sets, hashes + type + TCompletionKey = dword + + TCompletionData* = object + sock: TSocketHandle + cb: proc (sock: TSocketHandle, bytesTransferred: DWORD, + errcode: TOSErrorCode) {.closure.} + + PDispatcher* = ref object + ioPort: THandle + handles: TSet[TSocketHandle] + + TCustomOverlapped = object + Internal*: DWORD + InternalHigh*: DWORD + Offset*: DWORD + OffsetHigh*: DWORD + hEvent*: THANDLE + data*: TCompletionData + + PCustomOverlapped = ptr TCustomOverlapped + + proc hash(x: TSocketHandle): THash {.borrow.} + + proc newDispatcher*(): PDispatcher = + ## Creates a new Dispatcher instance. + new result + result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) + result.handles = initSet[TSocketHandle]() + + proc register*(p: PDispatcher, sock: TSocketHandle) = + ## Registers ``sock`` with the dispatcher ``p``. + if CreateIOCompletionPort(sock.THandle, p.ioPort, + cast[TCompletionKey](sock), 1) == 0: + OSError(OSLastError()) + p.handles.incl(sock) + + proc verifyPresence(p: PDispatcher, sock: TSocketHandle) = + ## Ensures that socket has been registered with the dispatcher. + if sock notin p.handles: + raise newException(EInvalidValue, + "Operation performed on a socket which has not been registered with" & + " the dispatcher yet.") + + proc poll*(p: PDispatcher, timeout = 500) = + ## Waits for completion events and processes them. + if p.handles.len == 0: + raise newException(EInvalidValue, "No handles registered in dispatcher.") + + let llTimeout = + if timeout == -1: winlean.INFINITE + else: timeout.int32 + var lpNumberOfBytesTransferred: DWORD + var lpCompletionKey: ULONG + var lpOverlapped: POverlapped + let res = GetQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred, + addr lpCompletionKey, addr lpOverlapped, llTimeout).bool + + # http://stackoverflow.com/a/12277264/492186 + # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html + var customOverlapped = cast[PCustomOverlapped](lpOverlapped) + if res: + # This is useful for ensuring the reliability of the overlapped struct. + assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle + + customOverlapped.data.cb(customOverlapped.data.sock, + lpNumberOfBytesTransferred, TOSErrorCode(-1)) + dealloc(customOverlapped) + else: + let errCode = OSLastError() + if lpOverlapped != nil: + assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle + customOverlapped.data.cb(customOverlapped.data.sock, + lpNumberOfBytesTransferred, errCode) + dealloc(customOverlapped) + else: + if errCode.int32 == WAIT_TIMEOUT: + # Timed out + discard + else: OSError(errCode) + + var connectExPtr: pointer = nil + var acceptExPtr: pointer = nil + var getAcceptExSockAddrsPtr: pointer = nil + + proc initPointer(s: TSocketHandle, func: var pointer, guid: var TGUID): bool = + # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c + var bytesRet: DWord + func = nil + result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, + sizeof(TGUID).dword, addr func, sizeof(pointer).DWORD, + addr bytesRet, nil, nil) == 0 + + proc initAll() = + let dummySock = socket() + if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX): + OSError(OSLastError()) + if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX): + OSError(OSLastError()) + if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS): + OSError(OSLastError()) + + proc connectEx(s: TSocketHandle, name: ptr TSockAddr, namelen: cint, + lpSendBuffer: pointer, dwSendDataLength: dword, + lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool = + if connectExPtr.isNil: raise newException(EInvalidValue, "Need to initialise ConnectEx().") + let func = + cast[proc (s: TSocketHandle, name: ptr TSockAddr, namelen: cint, + lpSendBuffer: pointer, dwSendDataLength: dword, + lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool {.stdcall.}](connectExPtr) + + result = func(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, + lpOverlapped) + + proc acceptEx(listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer, + dwReceiveDataLength, dwLocalAddressLength, + dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD, + lpOverlapped: POverlapped): bool = + if acceptExPtr.isNil: raise newException(EInvalidValue, "Need to initialise AcceptEx().") + let func = + cast[proc (listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer, + dwReceiveDataLength, dwLocalAddressLength, + dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD, + lpOverlapped: POverlapped): bool {.stdcall.}](acceptExPtr) + result = func(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength, + dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, + lpOverlapped) + + proc getAcceptExSockaddrs(lpOutputBuffer: pointer, + dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: DWORD, + LocalSockaddr: ptr ptr TSockAddr, LocalSockaddrLength: lpint, + RemoteSockaddr: ptr ptr TSockAddr, RemoteSockaddrLength: lpint) = + if getAcceptExSockAddrsPtr.isNil: + raise newException(EInvalidValue, "Need to initialise getAcceptExSockAddrs().") + + let func = + cast[proc (lpOutputBuffer: pointer, + dwReceiveDataLength, dwLocalAddressLength, + dwRemoteAddressLength: DWORD, LocalSockaddr: ptr ptr TSockAddr, + LocalSockaddrLength: lpint, RemoteSockaddr: ptr ptr TSockAddr, + RemoteSockaddrLength: lpint) {.stdcall.}](getAcceptExSockAddrsPtr) + + func(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, + dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength, + RemoteSockaddr, RemoteSockaddrLength) + + proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, + af = AF_INET): PFuture[void] = + ## Connects ``socket`` to server at ``address:port``. + ## + ## Returns a ``PFuture`` which will complete when the connection succeeds + ## or an error occurs. + verifyPresence(p, socket) + var retFuture = newFuture[void]() + # Apparently ``ConnectEx`` expects the socket to be initially bound: + var saddr: Tsockaddr_in + saddr.sin_family = int16(toInt(af)) + saddr.sin_port = 0 + saddr.sin_addr.s_addr = INADDR_ANY + if bindAddr(socket, cast[ptr TSockAddr](addr(saddr)), + sizeof(saddr).TSockLen) < 0'i32: + OSError(OSLastError()) + + var aiList = getAddrInfo(address, port, af) + var success = false + var lastError: TOSErrorCode + var it = aiList + while it != nil: + # "the OVERLAPPED structure must remain valid until the I/O completes" + # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx + var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) + ol.data = TCompletionData(sock: socket, cb: + proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = + if not retFuture.finished: + if errcode == TOSErrorCode(-1): + retFuture.complete() + else: + retFuture.fail(newException(EOS, osErrorMsg(errcode))) + ) + + var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint, + nil, 0, nil, cast[POverlapped](ol)) + if ret: + # Request to connect completed immediately. + success = true + retFuture.complete() + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. + break + else: + lastError = OSLastError() + if lastError.int32 == ERROR_IO_PENDING: + # In this case ``ol`` will be deallocated in ``poll``. + success = true + break + else: + dealloc(ol) + success = false + it = it.ai_next + + dealloc(aiList) + if not success: + retFuture.fail(newException(EOS, osErrorMsg(lastError))) + return retFuture + + proc recv*(p: PDispatcher, socket: TSocketHandle, size: int, + flags: int = 0): PFuture[string] = + ## Reads ``size`` bytes from ``socket``. Returned future will complete once + ## all of the requested data is read. If socket is disconnected during the + ## recv operation then the future may complete with only a part of the + ## requested data read. If socket is disconnected and no data is available + ## to be read then the future will complete with a value of ``""``. + verifyPresence(p, socket) + var retFuture = newFuture[string]() + + var dataBuf: TWSABuf + dataBuf.buf = newString(size) + dataBuf.len = size + + var bytesReceived: DWord + var flagsio = flags.dword + var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) + ol.data = TCompletionData(sock: socket, cb: + proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = + if not retFuture.finished: + if errcode == TOSErrorCode(-1): + if bytesCount == 0 and dataBuf.buf[0] == '\0': + retFuture.complete("") + else: + var data = newString(size) + copyMem(addr data[0], addr dataBuf.buf[0], size) + retFuture.complete($data) + else: + retFuture.fail(newException(EOS, osErrorMsg(errcode))) + ) + + let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived, + addr flagsio, cast[POverlapped](ol), nil) + if ret == -1: + let err = OSLastError() + if err.int32 != ERROR_IO_PENDING: + retFuture.fail(newException(EOS, osErrorMsg(err))) + dealloc(ol) + elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': + # We have to ensure that the buffer is empty because WSARecv will tell + # us immediatelly when it was disconnected, even when there is still + # data in the buffer. + # We want to give the user as much data as we can. So we only return + # the empty string (which signals a disconnection) when there is + # nothing left to read. + retFuture.complete("") + # TODO: "For message-oriented sockets, where a zero byte message is often + # allowable, a failure with an error code of WSAEDISCON is used to + # indicate graceful closure." + # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx + else: + # Request to read completed immediately. + var data = newString(size) + copyMem(addr data[0], addr dataBuf.buf[0], size) + retFuture.complete($data) + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. + return retFuture + + proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] = + ## Sends ``data`` to ``socket``. The returned future will complete once all + ## data has been sent. + verifyPresence(p, socket) + var retFuture = newFuture[void]() + + var dataBuf: TWSABuf + dataBuf.buf = data + dataBuf.len = data.len + + var bytesReceived, flags: DWord + var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) + ol.data = TCompletionData(sock: socket, cb: + proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = + if not retFuture.finished: + if errcode == TOSErrorCode(-1): + retFuture.complete() + else: + retFuture.fail(newException(EOS, osErrorMsg(errcode))) + ) + + let ret = WSASend(socket, addr dataBuf, 1, addr bytesReceived, + flags, cast[POverlapped](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + retFuture.fail(newException(EOS, osErrorMsg(err))) + dealloc(ol) + else: + retFuture.complete() + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. + return retFuture + + proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): + PFuture[tuple[address: string, client: TSocketHandle]] = + ## Accepts a new connection. Returns a future containing the client socket + ## corresponding to that connection and the remote address of the client. + ## The future will complete when the connection is successfully accepted. + ## + ## The resulting client socket is automatically registered to dispatcher. + verifyPresence(p, socket) + var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() + + var clientSock = socket() + if clientSock == OSInvalidSocket: osError(osLastError()) + + const lpOutputLen = 1024 + var lpOutputBuf = newString(lpOutputLen) + var dwBytesReceived: DWORD + let dwReceiveDataLength = 0.DWORD # We don't want any data to be read. + let dwLocalAddressLength = DWORD(sizeof (TSockaddr_in) + 16) + let dwRemoteAddressLength = DWORD(sizeof(TSockaddr_in) + 16) + + template completeAccept(): stmt {.immediate, dirty.} = + var listenSock = socket + let setoptRet = setsockopt(clientSock, SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, addr listenSock, + sizeof(listenSock).TSockLen) + if setoptRet != 0: osError(osLastError()) + + var LocalSockaddr, RemoteSockaddr: ptr TSockAddr + var localLen, remoteLen: int32 + getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength, + dwLocalAddressLength, dwRemoteAddressLength, + addr LocalSockaddr, addr localLen, + addr RemoteSockaddr, addr remoteLen) + p.register(clientSock) + # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 + retFuture.complete( + (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr), + client: clientSock) + ) + + var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) + ol.data = TCompletionData(sock: socket, cb: + proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = + if not retFuture.finished: + if errcode == TOSErrorCode(-1): + completeAccept() + else: + retFuture.fail(newException(EOS, osErrorMsg(errcode))) + ) + + # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx + let ret = acceptEx(socket, clientSock, addr lpOutputBuf[0], + dwReceiveDataLength, + dwLocalAddressLength, + dwRemoteAddressLength, + addr dwBytesReceived, cast[POverlapped](ol)) + + if not ret: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + retFuture.fail(newException(EOS, osErrorMsg(err))) + dealloc(ol) + else: + completeAccept() + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. + + return retFuture + + proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + ## Creates a new socket and registers it with the dispatcher implicitly. + result = socket(domain, typ, protocol) + result.setBlocking(false) + disp.register(result) + + proc close*(disp: PDispatcher, socket: TSocketHandle) = + ## Closes a socket and ensures that it is unregistered. + socket.close() + disp.handles.excl(socket) + + initAll() +else: + import selectors + from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK + type + TCallback = proc (sock: TSocketHandle): bool {.closure.} + + PData* = ref object of PObject + sock: TSocketHandle + readCBs: seq[TCallback] + writeCBs: seq[TCallback] + + PDispatcher* = ref object + selector: PSelector + + proc newDispatcher*(): PDispatcher = + new result + result.selector = newSelector() + + proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) = + assert sock in p.selector + discard p.selector.update(sock, events) + + proc register(p: PDispatcher, sock: TSocketHandle) = + var data = PData(sock: sock, readCBs: @[], writeCBs: @[]) + p.selector.register(sock, {}, data.PObject) + + proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + result = socket(domain, typ, protocol) + result.setBlocking(false) + disp.register(result) + + proc close*(disp: PDispatcher, sock: TSocketHandle) = + sock.close() + disp.selector.unregister(sock) + + proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = + if sock notin p.selector: + raise newException(EInvalidValue, "File descriptor not registered.") + p.selector[sock].data.PData.readCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvRead}) + + proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = + if sock notin p.selector: + raise newException(EInvalidValue, "File descriptor not registered.") + p.selector[sock].data.PData.writeCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvWrite}) + + proc poll*(p: PDispatcher, timeout = 500) = + for info in p.selector.select(timeout): + let data = PData(info.key.data) + assert data.sock == info.key.fd + #echo("In poll ", data.sock.cint) + if EvRead in info.events: + # Callback may add items to ``data.readCBs`` which causes issues if + # we are iterating over ``data.readCBs`` at the same time. We therefore + # make a copy to iterate over. + let currentCBs = data.readCBs + data.readCBs = @[] + for cb in currentCBs: + if not cb(data.sock): + # Callback wants to be called again. + data.readCBs.add(cb) + + if EvWrite in info.events: + let currentCBs = data.writeCBs + data.writeCBs = @[] + for cb in currentCBs: + if not cb(data.sock): + # Callback wants to be called again. + data.writeCBs.add(cb) + + if info.key in p.selector: + var newEvents: set[TEvent] + if data.readCBs.len != 0: newEvents = {EvRead} + if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} + if newEvents != info.key.events: + echo(info.key.events, " -> ", newEvents) + p.update(data.sock, newEvents) + else: + # FD no longer a part of the selector. Likely been closed + # (e.g. socket disconnected). + + proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, + af = AF_INET): PFuture[void] = + var retFuture = newFuture[void]() + + proc cb(sock: TSocketHandle): bool = + # We have connected. + retFuture.complete() + return true + + var aiList = getAddrInfo(address, port, af) + var success = false + var lastError: TOSErrorCode + var it = aiList + while it != nil: + var ret = connect(socket, it.ai_addr, it.ai_addrlen.TSocklen) + if ret == 0: + # Request to connect completed immediately. + success = true + retFuture.complete() + break + else: + lastError = osLastError() + if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: + success = true + addWrite(p, socket, cb) + break + else: + success = false + it = it.ai_next + + dealloc(aiList) + if not success: + retFuture.fail(newException(EOS, osErrorMsg(lastError))) + return retFuture + + proc recv*(p: PDispatcher, socket: TSocketHandle, size: int, + flags: int = 0): PFuture[string] = + var retFuture = newFuture[string]() + + var readBuffer = newString(size) + var sizeRead = 0 + + proc cb(sock: TSocketHandle): bool = + result = true + let netSize = size - sizeRead + let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint) + #echo("recv cb res: ", res) + if res < 0: + let lastError = osLastError() + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + retFuture.fail(newException(EOS, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + elif res == 0: + #echo("Disconnected recv: ", sizeRead) + # Disconnected + if sizeRead == 0: + retFuture.complete("") + else: + readBuffer.setLen(sizeRead) + retFuture.complete(readBuffer) + else: + sizeRead.inc(res) + if res != netSize: + result = false # We want to read all the data requested. + else: + retFuture.complete(readBuffer) + #echo("Recv cb result: ", result) + + addRead(p, socket, cb) + return retFuture + + proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] = + var retFuture = newFuture[void]() + + var written = 0 + + proc cb(sock: TSocketHandle): bool = + result = true + let netSize = data.len-written + var d = data.cstring + let res = send(sock, addr d[written], netSize, 0.cint) + if res < 0: + let lastError = osLastError() + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + retFuture.fail(newException(EOS, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + else: + written.inc(res) + if res != netSize: + result = false # We still have data to send. + else: + retFuture.complete() + addWrite(p, socket, cb) + return retFuture + + proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): + PFuture[tuple[address: string, client: TSocketHandle]] = + var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() + proc cb(sock: TSocketHandle): bool = + result = true + var sockAddress: Tsockaddr_in + var addrLen = sizeof(sockAddress).TSocklen + var client = accept(sock, cast[ptr TSockAddr](addr(sockAddress)), + addr(addrLen)) + if client == osInvalidSocket: + let lastError = osLastError() + assert lastError.int32 notin {EWOULDBLOCK, EAGAIN} + if lastError.int32 == EINTR: + return false + else: + retFuture.fail(newException(EOS, osErrorMsg(lastError))) + else: + p.register(client) + retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client)) + addRead(p, socket, cb) + return retFuture + +proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] = + ## Accepts a new connection. Returns a future containing the client socket + ## corresponding to that connection. + ## The future will complete when the connection is successfully accepted. + var retFut = newFuture[TSocketHandle]() + var fut = p.acceptAddr(socket) + fut.callback = + proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) = + assert future.finished + if future.failed: + retFut.fail(future.error) + else: + retFut.complete(future.read.client) + return retFut + +# -- Await Macro + +template createCb*(cbName, varNameIterSym, retFutureSym: expr): stmt {.immediate, dirty.} = + proc cbName {.closure.} = + if not varNameIterSym.finished: + var next = varNameIterSym() + if next == nil: + assert retFutureSym.finished, "Async procedure's return Future was not finished." + else: + next.callback = cbName + +template createVar(futSymName: string, asyncProc: PNimrodNode, + valueReceiver: expr) {.immediate, dirty.} = + # TODO: Used template here due to bug #926 + result = newNimNode(nnkStmtList) + var futSym = newIdentNode(futSymName) #genSym(nskVar, "future") + result.add newVarStmt(futSym, asyncProc) # -> var future = y + result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future + valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future.read + +proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} = + result = node + case node.kind + of nnkReturnStmt: + result = newNimNode(nnkStmtList) + result.add newCall(newIdentNode("complete"), retFutureSym, + if node[0].kind == nnkEmpty: newIdentNode("result") else: node[0]) + result.add newNimNode(nnkYieldStmt).add(newNilLit()) + of nnkCommand: + if node[0].ident == !"await": + case node[1].kind + of nnkIdent: + # await x + result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x + of nnkCall: + # await foo(p, x) + var futureValue: PNimrodNode + createVar("future" & $node[1][0].toStrLit, node[1], futureValue) + result.add futureValue + else: + error("Invalid node kind in 'await', got: " & $node[1].kind) + elif node[1].kind == nnkCommand and node[1][0].kind == nnkIdent and + node[1][0].ident == !"await": + # foo await x + var newCommand = node + createVar("future" & $node[0].ident, node[1][0], newCommand[1]) + result.add newCommand + + of nnkVarSection, nnkLetSection: + case node[0][2].kind + of nnkCommand: + if node[0][2][0].ident == !"await": + # var x = await y + var newVarSection = node # TODO: Should this use copyNimNode? + createVar("future" & $node[0][0].ident, node[0][2][1], + newVarSection[0][2]) + result.add newVarSection + else: discard + of nnkAsgn: + case node[1].kind + of nnkCommand: + if node[1][0].ident == !"await": + # x = await y + var newAsgn = node + createVar("future" & $node[0].ident, node[1][1], newAsgn[1]) + result.add newAsgn + else: discard + of nnkDiscardStmt: + # discard await x + if node[0][0].kind == nnkIdent and node[0][0].ident == !"await": + var dummy = newNimNode(nnkStmtList) + createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1], dummy) + else: discard + + for i in 0 .. var retFuture = newFuture[T]() + var retFutureSym = newIdentNode("retFuture") #genSym(nskVar, "retFuture") + outerProcBody.add( + newVarStmt(retFutureSym, + newCall( + newNimNode(nnkBracketExpr).add( + newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`. + newIdentNode(subtypeName))))) # Get type from return type of this proc + echo(treeRepr(outerProcBody)) + # -> iterator nameIter(): PFutureBase {.closure.} = + # -> var result: T + # -> + # -> complete(retFuture, result) + var iteratorNameSym = newIdentNode($prc[0].getName & "Iter") #genSym(nskIterator, $prc[0].ident & "Iter") + var procBody = prc[6].processBody(retFutureSym) + if subtypeName != "void": + procBody.insert(0, newNimNode(nnkVarSection).add( + newIdentDefs(newIdentNode("result"), returnType[1]))) # -> var result: T + procBody.add( + newCall(newIdentNode("complete"), + retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result) + else: + # -> complete(retFuture) + procBody.add(newCall(newIdentNode("complete"), retFutureSym)) + + var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")], + procBody, nnkIteratorDef) + closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure")) + outerProcBody.add(closureIterator) + + # -> var nameIterVar = nameIter + # -> var first = nameIterVar() + var varNameIterSym = newIdentNode($prc[0].getName & "IterVar") #genSym(nskVar, $prc[0].ident & "IterVar") + var varNameIter = newVarStmt(varNameIterSym, iteratorNameSym) + outerProcBody.add varNameIter + var varFirstSym = genSym(nskVar, "first") + var varFirst = newVarStmt(varFirstSym, newCall(varNameIterSym)) + outerProcBody.add varFirst + + # -> createCb(cb, nameIter, retFuture) + var cbName = newIdentNode("cb") + var procCb = newCall("createCb", cbName, varNameIterSym, retFutureSym) + outerProcBody.add procCb + + # -> first.callback = cb + outerProcBody.add newAssignment( + newDotExpr(varFirstSym, newIdentNode("callback")), + cbName) + + # -> return retFuture + outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym) + + result = prc + + # Remove the 'async' pragma. + for i in 0 .. 0 and c == "\L": + discard await p.recv(socket, 1) + addNLIfEmpty() + return + elif c == "\L": + addNLIfEmpty() + return + add(result.string, c) + +var gDisp*{.threadvar.}: PDispatcher ## Global dispatcher +gDisp = newDispatcher() + +proc runForever*() = + ## Begins a never ending global dispatcher poll loop. + while true: + gDisp.poll() + + +when isMainModule: + + var p = newDispatcher() + var sock = p.socket() + sock.setBlocking false + + + when false: + # Await tests + proc main(p: PDispatcher): PFuture[int] {.async.} = + discard await p.connect(sock, "irc.freenode.net", TPort(6667)) + while true: + echo("recvLine") + var line = await p.recvLine(sock) + echo("Line is: ", line.repr) + if line == "": + echo "Disconnected" + break + + proc peekTest(p: PDispatcher): PFuture[int] {.async.} = + discard await p.connect(sock, "localhost", TPort(6667)) + while true: + var line = await p.recv(sock, 1, MSG_PEEK) + var line2 = await p.recv(sock, 1) + echo(line.repr) + echo(line2.repr) + echo("---") + if line2 == "": break + sleep(500) + + var f = main(p) + + + else: + when false: + + var f = p.connect(sock, "irc.poop.nl", TPort(6667)) + f.callback = + proc (future: PFuture[int]) = + echo("Connected in future!") + echo(future.read) + for i in 0 .. 50: + var recvF = p.recv(sock, 10) + recvF.callback = + proc (future: PFuture[string]) = + echo("Read ", future.read.len, ": ", future.read.repr) + + else: + + sock.bindAddr(TPort(6667)) + sock.listen() + proc onAccept(future: PFuture[TSocketHandle]) = + let client = future.read + echo "Accepted ", client.cint + var t = p.send(client, "test\c\L") + t.callback = + proc (future: PFuture[int]) = + echo("Send: ", future.read) + client.close() + + var f = p.accept(sock) + f.callback = onAccept + + var f = p.accept(sock) + f.callback = onAccept + + while true: + p.poll() + + + + + + + + diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim deleted file mode 100644 index fbb02c37c..000000000 --- a/lib/pure/asyncio2.nim +++ /dev/null @@ -1,997 +0,0 @@ -# -# -# Nimrod's Runtime Library -# (c) Copyright 2014 Dominik Picheta -# -# See the file "copying.txt", included in this -# distribution, for details about the copyright. -# - -import os, oids, tables, strutils, macros - -import sockets2 - -## Asyncio2 -## -------- -## -## This module implements a brand new asyncio module based on Futures. -## IOCP is used under the hood on Windows and the selectors module is used for -## other operating systems. - -# -- Futures - -type - PFutureBase* = ref object of PObject - cb: proc () {.closure.} - finished: bool - - PFuture*[T] = ref object of PFutureBase - value: T - error: ref EBase - -proc newFuture*[T](): PFuture[T] = - ## Creates a new future. - new(result) - result.finished = false - -proc complete*[T](future: PFuture[T], val: T) = - ## Completes ``future`` with value ``val``. - assert(not future.finished, "Future already finished, cannot finish twice.") - assert(future.error == nil) - future.value = val - future.finished = true - if future.cb != nil: - future.cb() - -proc complete*(future: PFuture[void]) = - ## Completes a void ``future``. - assert(not future.finished, "Future already finished, cannot finish twice.") - assert(future.error == nil) - future.finished = true - if future.cb != nil: - future.cb() - -proc fail*[T](future: PFuture[T], error: ref EBase) = - ## Completes ``future`` with ``error``. - assert(not future.finished, "Future already finished, cannot finish twice.") - future.finished = true - future.error = error - if future.cb != nil: - future.cb() - -proc `callback=`*(future: PFutureBase, cb: proc () {.closure.}) = - ## Sets the callback proc to be called when the future completes. - ## - ## If future has already completed then ``cb`` will be called immediately. - ## - ## **Note**: You most likely want the other ``callback`` setter which - ## passes ``future`` as a param to the callback. - future.cb = cb - if future.finished: - future.cb() - -proc `callback=`*[T](future: PFuture[T], - cb: proc (future: PFuture[T]) {.closure.}) = - ## Sets the callback proc to be called when the future completes. - ## - ## If future has already completed then ``cb`` will be called immediately. - future.callback = proc () = cb(future) - -proc read*[T](future: PFuture[T]): T = - ## Retrieves the value of ``future``. Future must be finished otherwise - ## this function will fail with a ``EInvalidValue`` exception. - ## - ## If the result of the future is an error then that error will be raised. - if future.finished: - if future.error != nil: raise future.error - when T isnot void: - return future.value - else: - # TODO: Make a custom exception type for this? - raise newException(EInvalidValue, "Future still in progress.") - -proc finished*[T](future: PFuture[T]): bool = - ## Determines whether ``future`` has completed. - ## - ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. - future.finished - -proc failed*[T](future: PFuture[T]): bool = - ## Determines whether ``future`` completed with an error. - future.error != nil - -when defined(windows) or defined(nimdoc): - import winlean, sets, hashes - type - TCompletionKey = dword - - TCompletionData* = object - sock: TSocketHandle - cb: proc (sock: TSocketHandle, bytesTransferred: DWORD, - errcode: TOSErrorCode) {.closure.} - - PDispatcher* = ref object - ioPort: THandle - handles: TSet[TSocketHandle] - - TCustomOverlapped = object - Internal*: DWORD - InternalHigh*: DWORD - Offset*: DWORD - OffsetHigh*: DWORD - hEvent*: THANDLE - data*: TCompletionData - - PCustomOverlapped = ptr TCustomOverlapped - - proc hash(x: TSocketHandle): THash {.borrow.} - - proc newDispatcher*(): PDispatcher = - ## Creates a new Dispatcher instance. - new result - result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) - result.handles = initSet[TSocketHandle]() - - proc register*(p: PDispatcher, sock: TSocketHandle) = - ## Registers ``sock`` with the dispatcher ``p``. - if CreateIOCompletionPort(sock.THandle, p.ioPort, - cast[TCompletionKey](sock), 1) == 0: - OSError(OSLastError()) - p.handles.incl(sock) - - proc verifyPresence(p: PDispatcher, sock: TSocketHandle) = - ## Ensures that socket has been registered with the dispatcher. - if sock notin p.handles: - raise newException(EInvalidValue, - "Operation performed on a socket which has not been registered with" & - " the dispatcher yet.") - - proc poll*(p: PDispatcher, timeout = 500) = - ## Waits for completion events and processes them. - if p.handles.len == 0: - raise newException(EInvalidValue, "No handles registered in dispatcher.") - - let llTimeout = - if timeout == -1: winlean.INFINITE - else: timeout.int32 - var lpNumberOfBytesTransferred: DWORD - var lpCompletionKey: ULONG - var lpOverlapped: POverlapped - let res = GetQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred, - addr lpCompletionKey, addr lpOverlapped, llTimeout).bool - - # http://stackoverflow.com/a/12277264/492186 - # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html - var customOverlapped = cast[PCustomOverlapped](lpOverlapped) - if res: - # This is useful for ensuring the reliability of the overlapped struct. - assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle - - customOverlapped.data.cb(customOverlapped.data.sock, - lpNumberOfBytesTransferred, TOSErrorCode(-1)) - dealloc(customOverlapped) - else: - let errCode = OSLastError() - if lpOverlapped != nil: - assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle - customOverlapped.data.cb(customOverlapped.data.sock, - lpNumberOfBytesTransferred, errCode) - dealloc(customOverlapped) - else: - if errCode.int32 == WAIT_TIMEOUT: - # Timed out - discard - else: OSError(errCode) - - var connectExPtr: pointer = nil - var acceptExPtr: pointer = nil - var getAcceptExSockAddrsPtr: pointer = nil - - proc initPointer(s: TSocketHandle, func: var pointer, guid: var TGUID): bool = - # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c - var bytesRet: DWord - func = nil - result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, - sizeof(TGUID).dword, addr func, sizeof(pointer).DWORD, - addr bytesRet, nil, nil) == 0 - - proc initAll() = - let dummySock = socket() - if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX): - OSError(OSLastError()) - if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX): - OSError(OSLastError()) - if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS): - OSError(OSLastError()) - - proc connectEx(s: TSocketHandle, name: ptr TSockAddr, namelen: cint, - lpSendBuffer: pointer, dwSendDataLength: dword, - lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool = - if connectExPtr.isNil: raise newException(EInvalidValue, "Need to initialise ConnectEx().") - let func = - cast[proc (s: TSocketHandle, name: ptr TSockAddr, namelen: cint, - lpSendBuffer: pointer, dwSendDataLength: dword, - lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool {.stdcall.}](connectExPtr) - - result = func(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, - lpOverlapped) - - proc acceptEx(listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD, - lpOverlapped: POverlapped): bool = - if acceptExPtr.isNil: raise newException(EInvalidValue, "Need to initialise AcceptEx().") - let func = - cast[proc (listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD, - lpOverlapped: POverlapped): bool {.stdcall.}](acceptExPtr) - result = func(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength, - dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, - lpOverlapped) - - proc getAcceptExSockaddrs(lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: DWORD, - LocalSockaddr: ptr ptr TSockAddr, LocalSockaddrLength: lpint, - RemoteSockaddr: ptr ptr TSockAddr, RemoteSockaddrLength: lpint) = - if getAcceptExSockAddrsPtr.isNil: - raise newException(EInvalidValue, "Need to initialise getAcceptExSockAddrs().") - - let func = - cast[proc (lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: DWORD, LocalSockaddr: ptr ptr TSockAddr, - LocalSockaddrLength: lpint, RemoteSockaddr: ptr ptr TSockAddr, - RemoteSockaddrLength: lpint) {.stdcall.}](getAcceptExSockAddrsPtr) - - func(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength, - RemoteSockaddr, RemoteSockaddrLength) - - proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, - af = AF_INET): PFuture[void] = - ## Connects ``socket`` to server at ``address:port``. - ## - ## Returns a ``PFuture`` which will complete when the connection succeeds - ## or an error occurs. - verifyPresence(p, socket) - var retFuture = newFuture[void]() - # Apparently ``ConnectEx`` expects the socket to be initially bound: - var saddr: Tsockaddr_in - saddr.sin_family = int16(toInt(af)) - saddr.sin_port = 0 - saddr.sin_addr.s_addr = INADDR_ANY - if bindAddr(socket, cast[ptr TSockAddr](addr(saddr)), - sizeof(saddr).TSockLen) < 0'i32: - OSError(OSLastError()) - - var aiList = getAddrInfo(address, port, af) - var success = false - var lastError: TOSErrorCode - var it = aiList - while it != nil: - # "the OVERLAPPED structure must remain valid until the I/O completes" - # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx - var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) - ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = - if not retFuture.finished: - if errcode == TOSErrorCode(-1): - retFuture.complete() - else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) - ) - - var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint, - nil, 0, nil, cast[POverlapped](ol)) - if ret: - # Request to connect completed immediately. - success = true - retFuture.complete() - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. - break - else: - lastError = OSLastError() - if lastError.int32 == ERROR_IO_PENDING: - # In this case ``ol`` will be deallocated in ``poll``. - success = true - break - else: - dealloc(ol) - success = false - it = it.ai_next - - dealloc(aiList) - if not success: - retFuture.fail(newException(EOS, osErrorMsg(lastError))) - return retFuture - - proc recv*(p: PDispatcher, socket: TSocketHandle, size: int, - flags: int = 0): PFuture[string] = - ## Reads ``size`` bytes from ``socket``. Returned future will complete once - ## all of the requested data is read. If socket is disconnected during the - ## recv operation then the future may complete with only a part of the - ## requested data read. If socket is disconnected and no data is available - ## to be read then the future will complete with a value of ``""``. - verifyPresence(p, socket) - var retFuture = newFuture[string]() - - var dataBuf: TWSABuf - dataBuf.buf = newString(size) - dataBuf.len = size - - var bytesReceived: DWord - var flagsio = flags.dword - var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) - ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = - if not retFuture.finished: - if errcode == TOSErrorCode(-1): - if bytesCount == 0 and dataBuf.buf[0] == '\0': - retFuture.complete("") - else: - var data = newString(size) - copyMem(addr data[0], addr dataBuf.buf[0], size) - retFuture.complete($data) - else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) - ) - - let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived, - addr flagsio, cast[POverlapped](ol), nil) - if ret == -1: - let err = OSLastError() - if err.int32 != ERROR_IO_PENDING: - retFuture.fail(newException(EOS, osErrorMsg(err))) - dealloc(ol) - elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': - # We have to ensure that the buffer is empty because WSARecv will tell - # us immediatelly when it was disconnected, even when there is still - # data in the buffer. - # We want to give the user as much data as we can. So we only return - # the empty string (which signals a disconnection) when there is - # nothing left to read. - retFuture.complete("") - # TODO: "For message-oriented sockets, where a zero byte message is often - # allowable, a failure with an error code of WSAEDISCON is used to - # indicate graceful closure." - # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx - else: - # Request to read completed immediately. - var data = newString(size) - copyMem(addr data[0], addr dataBuf.buf[0], size) - retFuture.complete($data) - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. - return retFuture - - proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] = - ## Sends ``data`` to ``socket``. The returned future will complete once all - ## data has been sent. - verifyPresence(p, socket) - var retFuture = newFuture[void]() - - var dataBuf: TWSABuf - dataBuf.buf = data - dataBuf.len = data.len - - var bytesReceived, flags: DWord - var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) - ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = - if not retFuture.finished: - if errcode == TOSErrorCode(-1): - retFuture.complete() - else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) - ) - - let ret = WSASend(socket, addr dataBuf, 1, addr bytesReceived, - flags, cast[POverlapped](ol), nil) - if ret == -1: - let err = osLastError() - if err.int32 != ERROR_IO_PENDING: - retFuture.fail(newException(EOS, osErrorMsg(err))) - dealloc(ol) - else: - retFuture.complete() - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. - return retFuture - - proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): - PFuture[tuple[address: string, client: TSocketHandle]] = - ## Accepts a new connection. Returns a future containing the client socket - ## corresponding to that connection and the remote address of the client. - ## The future will complete when the connection is successfully accepted. - ## - ## The resulting client socket is automatically registered to dispatcher. - verifyPresence(p, socket) - var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() - - var clientSock = socket() - if clientSock == OSInvalidSocket: osError(osLastError()) - - const lpOutputLen = 1024 - var lpOutputBuf = newString(lpOutputLen) - var dwBytesReceived: DWORD - let dwReceiveDataLength = 0.DWORD # We don't want any data to be read. - let dwLocalAddressLength = DWORD(sizeof (TSockaddr_in) + 16) - let dwRemoteAddressLength = DWORD(sizeof(TSockaddr_in) + 16) - - template completeAccept(): stmt {.immediate, dirty.} = - var listenSock = socket - let setoptRet = setsockopt(clientSock, SOL_SOCKET, - SO_UPDATE_ACCEPT_CONTEXT, addr listenSock, - sizeof(listenSock).TSockLen) - if setoptRet != 0: osError(osLastError()) - - var LocalSockaddr, RemoteSockaddr: ptr TSockAddr - var localLen, remoteLen: int32 - getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength, - dwLocalAddressLength, dwRemoteAddressLength, - addr LocalSockaddr, addr localLen, - addr RemoteSockaddr, addr remoteLen) - p.register(clientSock) - # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 - retFuture.complete( - (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr), - client: clientSock) - ) - - var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) - ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = - if not retFuture.finished: - if errcode == TOSErrorCode(-1): - completeAccept() - else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) - ) - - # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx - let ret = acceptEx(socket, clientSock, addr lpOutputBuf[0], - dwReceiveDataLength, - dwLocalAddressLength, - dwRemoteAddressLength, - addr dwBytesReceived, cast[POverlapped](ol)) - - if not ret: - let err = osLastError() - if err.int32 != ERROR_IO_PENDING: - retFuture.fail(newException(EOS, osErrorMsg(err))) - dealloc(ol) - else: - completeAccept() - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. - - return retFuture - - proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, - typ: TType = SOCK_STREAM, - protocol: TProtocol = IPPROTO_TCP): TSocketHandle = - ## Creates a new socket and registers it with the dispatcher implicitly. - result = socket(domain, typ, protocol) - disp.register(result) - - proc close*(disp: PDispatcher, socket: TSocketHandle) = - ## Closes a socket and ensures that it is unregistered. - socket.close() - disp.handles.excl(socket) - - initAll() -else: - import selectors - from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK - type - TCallback = proc (sock: TSocketHandle): bool {.closure.} - - PData* = ref object of PObject - sock: TSocketHandle - readCBs: seq[TCallback] - writeCBs: seq[TCallback] - - PDispatcher* = ref object - selector: PSelector - - proc newDispatcher*(): PDispatcher = - new result - result.selector = newSelector() - - proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) = - assert sock in p.selector - discard p.selector.update(sock, events) - - proc register(p: PDispatcher, sock: TSocketHandle) = - var data = PData(sock: sock, readCBs: @[], writeCBs: @[]) - p.selector.register(sock, {}, data.PObject) - - proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, - typ: TType = SOCK_STREAM, - protocol: TProtocol = IPPROTO_TCP): TSocketHandle = - result = socket(domain, typ, protocol) - disp.register(result) - - proc close*(disp: PDispatcher, sock: TSocketHandle) = - sock.close() - disp.selector.unregister(sock) - - proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = - if sock notin p.selector: - raise newException(EInvalidValue, "File descriptor not registered.") - p.selector[sock].data.PData.readCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvRead}) - - proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = - if sock notin p.selector: - raise newException(EInvalidValue, "File descriptor not registered.") - p.selector[sock].data.PData.writeCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvWrite}) - - proc poll*(p: PDispatcher, timeout = 500) = - for info in p.selector.select(timeout): - let data = PData(info.key.data) - assert data.sock == info.key.fd - #echo("In poll ", data.sock.cint) - if EvRead in info.events: - # Callback may add items to ``data.readCBs`` which causes issues if - # we are iterating over ``data.readCBs`` at the same time. We therefore - # make a copy to iterate over. - let currentCBs = data.readCBs - data.readCBs = @[] - for cb in currentCBs: - if not cb(data.sock): - # Callback wants to be called again. - data.readCBs.add(cb) - - if EvWrite in info.events: - let currentCBs = data.writeCBs - data.writeCBs = @[] - for cb in currentCBs: - if not cb(data.sock): - # Callback wants to be called again. - data.writeCBs.add(cb) - - if info.key in p.selector: - var newEvents: set[TEvent] - if data.readCBs.len != 0: newEvents = {EvRead} - if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} - if newEvents != info.key.events: - echo(info.key.events, " -> ", newEvents) - p.update(data.sock, newEvents) - else: - # FD no longer a part of the selector. Likely been closed - # (e.g. socket disconnected). - - proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, - af = AF_INET): PFuture[void] = - var retFuture = newFuture[void]() - - proc cb(sock: TSocketHandle): bool = - # We have connected. - retFuture.complete() - return true - - var aiList = getAddrInfo(address, port, af) - var success = false - var lastError: TOSErrorCode - var it = aiList - while it != nil: - var ret = connect(socket, it.ai_addr, it.ai_addrlen.TSocklen) - if ret == 0: - # Request to connect completed immediately. - success = true - retFuture.complete() - break - else: - lastError = osLastError() - if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: - success = true - addWrite(p, socket, cb) - break - else: - success = false - it = it.ai_next - - dealloc(aiList) - if not success: - retFuture.fail(newException(EOS, osErrorMsg(lastError))) - return retFuture - - proc recv*(p: PDispatcher, socket: TSocketHandle, size: int, - flags: int = 0): PFuture[string] = - var retFuture = newFuture[string]() - - var readBuffer = newString(size) - var sizeRead = 0 - - proc cb(sock: TSocketHandle): bool = - result = true - let netSize = size - sizeRead - let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint) - #echo("recv cb res: ", res) - if res < 0: - let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: - retFuture.fail(newException(EOS, osErrorMsg(lastError))) - else: - result = false # We still want this callback to be called. - elif res == 0: - #echo("Disconnected recv: ", sizeRead) - # Disconnected - if sizeRead == 0: - retFuture.complete("") - else: - readBuffer.setLen(sizeRead) - retFuture.complete(readBuffer) - else: - sizeRead.inc(res) - if res != netSize: - result = false # We want to read all the data requested. - else: - retFuture.complete(readBuffer) - #echo("Recv cb result: ", result) - - addRead(p, socket, cb) - return retFuture - - proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] = - var retFuture = newFuture[void]() - - var written = 0 - - proc cb(sock: TSocketHandle): bool = - result = true - let netSize = data.len-written - var d = data.cstring - let res = send(sock, addr d[written], netSize, 0.cint) - if res < 0: - let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: - retFuture.fail(newException(EOS, osErrorMsg(lastError))) - else: - result = false # We still want this callback to be called. - else: - written.inc(res) - if res != netSize: - result = false # We still have data to send. - else: - retFuture.complete() - addWrite(p, socket, cb) - return retFuture - - proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): - PFuture[tuple[address: string, client: TSocketHandle]] = - var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() - proc cb(sock: TSocketHandle): bool = - result = true - var sockAddress: Tsockaddr_in - var addrLen = sizeof(sockAddress).TSocklen - var client = accept(sock, cast[ptr TSockAddr](addr(sockAddress)), - addr(addrLen)) - if client == osInvalidSocket: - let lastError = osLastError() - assert lastError.int32 notin {EWOULDBLOCK, EAGAIN} - if lastError.int32 == EINTR: - return false - else: - retFuture.fail(newException(EOS, osErrorMsg(lastError))) - else: - p.register(client) - retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client)) - addRead(p, socket, cb) - return retFuture - -proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] = - ## Accepts a new connection. Returns a future containing the client socket - ## corresponding to that connection. - ## The future will complete when the connection is successfully accepted. - var retFut = newFuture[TSocketHandle]() - var fut = p.acceptAddr(socket) - fut.callback = - proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) = - assert future.finished - if future.failed: - retFut.fail(future.error) - else: - retFut.complete(future.read.client) - return retFut - -# -- Await Macro - -template createCb*(cbName, varNameIterSym, retFutureSym: expr): stmt {.immediate, dirty.} = - proc cbName {.closure.} = - if not varNameIterSym.finished: - var next = varNameIterSym() - if next == nil: - assert retFutureSym.finished, "Async procedure's return Future was not finished." - else: - next.callback = cbName - -template createVar(futSymName: string, asyncProc: PNimrodNode, - valueReceiver: expr) {.immediate, dirty.} = - # TODO: Used template here due to bug #926 - result = newNimNode(nnkStmtList) - var futSym = newIdentNode(futSymName) #genSym(nskVar, "future") - result.add newVarStmt(futSym, asyncProc) # -> var future = y - result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future - valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future.read - -proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} = - result = node - case node.kind - of nnkReturnStmt: - result = newNimNode(nnkStmtList) - result.add newCall(newIdentNode("complete"), retFutureSym, - if node[0].kind == nnkEmpty: newIdentNode("result") else: node[0]) - result.add newNimNode(nnkYieldStmt).add(newNilLit()) - of nnkCommand: - if node[0].ident == !"await": - case node[1].kind - of nnkIdent: - # await x - result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x - of nnkCall: - # await foo(p, x) - var futureValue: PNimrodNode - createVar("future" & $node[1][0].toStrLit, node[1], futureValue) - result.add futureValue - else: - error("Invalid node kind in 'await', got: " & $node[1].kind) - elif node[1].kind == nnkCommand and node[1][0].kind == nnkIdent and - node[1][0].ident == !"await": - # foo await x - var newCommand = node - createVar("future" & $node[0].ident, node[1][0], newCommand[1]) - result.add newCommand - - of nnkVarSection, nnkLetSection: - case node[0][2].kind - of nnkCommand: - if node[0][2][0].ident == !"await": - # var x = await y - var newVarSection = node # TODO: Should this use copyNimNode? - createVar("future" & $node[0][0].ident, node[0][2][1], - newVarSection[0][2]) - result.add newVarSection - else: discard - of nnkAsgn: - case node[1].kind - of nnkCommand: - if node[1][0].ident == !"await": - # x = await y - var newAsgn = node - createVar("future" & $node[0].ident, node[1][1], newAsgn[1]) - result.add newAsgn - else: discard - of nnkDiscardStmt: - # discard await x - if node[0][0].kind == nnkIdent and node[0][0].ident == !"await": - var dummy = newNimNode(nnkStmtList) - createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1], dummy) - else: discard - - for i in 0 .. var retFuture = newFuture[T]() - var retFutureSym = newIdentNode("retFuture") #genSym(nskVar, "retFuture") - outerProcBody.add( - newVarStmt(retFutureSym, - newCall( - newNimNode(nnkBracketExpr).add( - newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`. - newIdentNode(subtypeName))))) # Get type from return type of this proc - echo(treeRepr(outerProcBody)) - # -> iterator nameIter(): PFutureBase {.closure.} = - # -> var result: T - # -> - # -> complete(retFuture, result) - var iteratorNameSym = newIdentNode($prc[0].getName & "Iter") #genSym(nskIterator, $prc[0].ident & "Iter") - var procBody = prc[6].processBody(retFutureSym) - if subtypeName != "void": - procBody.insert(0, newNimNode(nnkVarSection).add( - newIdentDefs(newIdentNode("result"), returnType[1]))) # -> var result: T - procBody.add( - newCall(newIdentNode("complete"), - retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result) - else: - # -> complete(retFuture) - procBody.add(newCall(newIdentNode("complete"), retFutureSym)) - - var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")], - procBody, nnkIteratorDef) - closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure")) - outerProcBody.add(closureIterator) - - # -> var nameIterVar = nameIter - # -> var first = nameIterVar() - var varNameIterSym = newIdentNode($prc[0].getName & "IterVar") #genSym(nskVar, $prc[0].ident & "IterVar") - var varNameIter = newVarStmt(varNameIterSym, iteratorNameSym) - outerProcBody.add varNameIter - var varFirstSym = genSym(nskVar, "first") - var varFirst = newVarStmt(varFirstSym, newCall(varNameIterSym)) - outerProcBody.add varFirst - - # -> createCb(cb, nameIter, retFuture) - var cbName = newIdentNode("cb") - var procCb = newCall("createCb", cbName, varNameIterSym, retFutureSym) - outerProcBody.add procCb - - # -> first.callback = cb - outerProcBody.add newAssignment( - newDotExpr(varFirstSym, newIdentNode("callback")), - cbName) - - # -> return retFuture - outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym) - - result = prc - - # Remove the 'async' pragma. - for i in 0 .. 0 and c == "\L": - discard await p.recv(socket, 1) - addNLIfEmpty() - return - elif c == "\L": - addNLIfEmpty() - return - add(result.string, c) - -when isMainModule: - - var p = newDispatcher() - var sock = p.socket() - sock.setBlocking false - - - when false: - # Await tests - proc main(p: PDispatcher): PFuture[int] {.async.} = - discard await p.connect(sock, "irc.freenode.net", TPort(6667)) - while true: - echo("recvLine") - var line = await p.recvLine(sock) - echo("Line is: ", line.repr) - if line == "": - echo "Disconnected" - break - - proc peekTest(p: PDispatcher): PFuture[int] {.async.} = - discard await p.connect(sock, "localhost", TPort(6667)) - while true: - var line = await p.recv(sock, 1, MSG_PEEK) - var line2 = await p.recv(sock, 1) - echo(line.repr) - echo(line2.repr) - echo("---") - if line2 == "": break - sleep(500) - - var f = main(p) - - - else: - when false: - - var f = p.connect(sock, "irc.poop.nl", TPort(6667)) - f.callback = - proc (future: PFuture[int]) = - echo("Connected in future!") - echo(future.read) - for i in 0 .. 50: - var recvF = p.recv(sock, 10) - recvF.callback = - proc (future: PFuture[string]) = - echo("Read ", future.read.len, ": ", future.read.repr) - - else: - - sock.bindAddr(TPort(6667)) - sock.listen() - proc onAccept(future: PFuture[TSocketHandle]) = - let client = future.read - echo "Accepted ", client.cint - var t = p.send(client, "test\c\L") - t.callback = - proc (future: PFuture[int]) = - echo("Send: ", future.read) - client.close() - - var f = p.accept(sock) - f.callback = onAccept - - var f = p.accept(sock) - f.callback = onAccept - - while true: - p.poll() - - - - - - - - diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim new file mode 100644 index 000000000..ed8a2fb82 --- /dev/null +++ b/lib/pure/asyncnet.nim @@ -0,0 +1,147 @@ +import asyncdispatch +import rawsockets +import net + +when defined(ssl): + import openssl + +type + TAsyncSocket = object ## socket type + fd: TSocketHandle + case isBuffered: bool # determines whether this socket is buffered. + of true: + buffer: array[0..BufferSize, char] + currPos: int # current index in buffer + bufLen: int # current length of buffer + of false: nil + when defined(ssl): + case isSsl: bool + of true: + sslHandle: PSSL + sslContext: PSSLContext + sslNoHandshake: bool # True if needs handshake. + sslHasPeekChar: bool + sslPeekChar: char + of false: nil + + PAsyncSocket* = ref TAsyncSocket + +# TODO: Save AF, domain etc info and reuse it in procs which need it like connect. + +proc newSocket(fd: TSocketHandle, isBuff: bool): PAsyncSocket = + assert fd != osInvalidSocket + new(result) + result.fd = fd + result.isBuffered = isBuff + if isBuff: + result.currPos = 0 + +proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP, buffered = true): PAsyncSocket = + ## Creates a new asynchronous socket. + result = newSocket(gDisp.socket(domain, typ, protocol), buffered) + +proc connect*(socket: PAsyncSocket, address: string, port: TPort, + af = AF_INET): PFuture[void] = + ## Connects ``socket`` to server at ``address:port``. + ## + ## Returns a ``PFuture`` which will complete when the connection succeeds + ## or an error occurs. + result = gDisp.connect(socket.fd, address, port, af) + +proc recv*(socket: PAsyncSocket, size: int, + flags: int = 0): PFuture[string] = + ## Reads ``size`` bytes from ``socket``. Returned future will complete once + ## all of the requested data is read. If socket is disconnected during the + ## recv operation then the future may complete with only a part of the + ## requested data read. If socket is disconnected and no data is available + ## to be read then the future will complete with a value of ``""``. + result = gDisp.recv(socket.fd, size, flags) + +proc send*(socket: PAsyncSocket, data: string): PFuture[void] = + ## Sends ``data`` to ``socket``. The returned future will complete once all + ## data has been sent. + result = gDisp.send(socket.fd, data) + +proc acceptAddr*(socket: PAsyncSocket): + PFuture[tuple[address: string, client: PAsyncSocket]] = + ## Accepts a new connection. Returns a future containing the client socket + ## corresponding to that connection and the remote address of the client. + ## The future will complete when the connection is successfully accepted. + var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]() + var fut = gDisp.acceptAddr(socket.fd) + fut.callback = + proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) = + assert future.finished + if future.failed: + retFuture.fail(future.readError) + else: + let resultTup = (future.read.address, + newSocket(future.read.client, socket.isBuffered)) + retFuture.complete(resultTup) + return retFuture + +proc accept*(socket: PAsyncSocket): PFuture[PAsyncSocket] = + ## Accepts a new connection. Returns a future containing the client socket + ## corresponding to that connection. + ## The future will complete when the connection is successfully accepted. + var retFut = newFuture[PAsyncSocket]() + var fut = acceptAddr(socket) + fut.callback = + proc (future: PFuture[tuple[address: string, client: PAsyncSocket]]) = + assert future.finished + if future.failed: + retFut.fail(future.readError) + else: + retFut.complete(future.read.client) + return retFut + +proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} = + ## Reads a line of data from ``socket``. Returned future will complete once + ## a full line is read or an error occurs. + ## + ## If a full line is read ``\r\L`` is not + ## added to ``line``, however if solely ``\r\L`` is read then ``line`` + ## will be set to it. + ## + ## If the socket is disconnected, ``line`` will be set to ``""``. + ## + ## If the socket is disconnected in the middle of a line (before ``\r\L`` + ## is read) then line will be set to ``""``. + ## The partial line **will be lost**. + + template addNLIfEmpty(): stmt = + if result.len == 0: + result.add("\c\L") + + result = "" + var c = "" + while true: + c = await recv(socket, 1) + if c.len == 0: + return "" + if c == "\r": + c = await recv(socket, 1, MSG_PEEK) + if c.len > 0 and c == "\L": + discard await recv(socket, 1) + addNLIfEmpty() + return + elif c == "\L": + addNLIfEmpty() + return + add(result.string, c) + +when isMainModule: + proc main() {.async.} = + var sock = AsyncSocket() + await sock.connect("irc.freenode.net", TPort(6667)) + while true: + let line = await sock.recvLine() + if line == "": + echo("Disconnected") + break + else: + echo("Got line: ", line) + main() + runForever() + diff --git a/lib/pure/net.nim b/lib/pure/net.nim index 45883166b..d40f0949b 100644 --- a/lib/pure/net.nim +++ b/lib/pure/net.nim @@ -10,7 +10,7 @@ ## This module implements a high-level cross-platform sockets interface. {.deadCodeElim: on.} -import sockets2, os, strutils, unsigned, parseutils, times +import rawsockets, os, strutils, unsigned, parseutils, times type IpAddressFamily* {.pure.} = enum ## Describes the type of an IP address @@ -360,7 +360,7 @@ proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, ## Creates a new socket. ## ## If an error occurs EOS will be raised. - let fd = sockets2.socket(domain, typ, protocol) + let fd = rawsockets.socket(domain, typ, protocol) if fd == osInvalidSocket: osError(osLastError()) result = newSocket(fd, buffered) diff --git a/lib/pure/rawsockets.nim b/lib/pure/rawsockets.nim new file mode 100644 index 000000000..db04f6097 --- /dev/null +++ b/lib/pure/rawsockets.nim @@ -0,0 +1,420 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2014 Dominik Picheta +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module implements a low-level cross-platform sockets interface. Look +## at the ``net`` module for the higher-level version. + +# TODO: Clean up the exports a bit and everything else in general. + +import unsigned, os + +when hostos == "solaris": + {.passl: "-lsocket -lnsl".} + +when defined(Windows): + import winlean + export WSAEWOULDBLOCK +else: + import posix + export fcntl, F_GETFL, O_NONBLOCK, F_SETFL, EAGAIN, EWOULDBLOCK + +export TSocketHandle, TSockaddr_in, TAddrinfo, INADDR_ANY, TSockAddr, TSockLen, + inet_ntoa, recv, `==`, connect, send, accept, recvfrom, sendto + +export + SO_ERROR, + SOL_SOCKET, + SOMAXCONN, + SO_ACCEPTCONN, SO_BROADCAST, SO_DEBUG, SO_DONTROUTE, + SO_KEEPALIVE, SO_OOBINLINE, SO_REUSEADDR, + MSG_PEEK + +type + + TPort* = distinct uint16 ## port type + + TDomain* = enum ## domain, which specifies the protocol family of the + ## created socket. Other domains than those that are listed + ## here are unsupported. + AF_UNIX, ## for local socket (using a file). Unsupported on Windows. + AF_INET = 2, ## for network protocol IPv4 or + AF_INET6 = 23 ## for network protocol IPv6. + + TType* = enum ## second argument to `socket` proc + SOCK_STREAM = 1, ## reliable stream-oriented service or Stream Sockets + SOCK_DGRAM = 2, ## datagram service or Datagram Sockets + SOCK_RAW = 3, ## raw protocols atop the network layer. + SOCK_SEQPACKET = 5 ## reliable sequenced packet service + + TProtocol* = enum ## third argument to `socket` proc + IPPROTO_TCP = 6, ## Transmission control protocol. + IPPROTO_UDP = 17, ## User datagram protocol. + IPPROTO_IP, ## Internet protocol. Unsupported on Windows. + IPPROTO_IPV6, ## Internet Protocol Version 6. Unsupported on Windows. + IPPROTO_RAW, ## Raw IP Packets Protocol. Unsupported on Windows. + IPPROTO_ICMP ## Control message protocol. Unsupported on Windows. + + TServent* {.pure, final.} = object ## information about a service + name*: string + aliases*: seq[string] + port*: TPort + proto*: string + + Thostent* {.pure, final.} = object ## information about a given host + name*: string + aliases*: seq[string] + addrtype*: TDomain + length*: int + addrList*: seq[string] + +when defined(windows): + let + osInvalidSocket* = winlean.INVALID_SOCKET + + const + IOCPARM_MASK* = 127 + IOC_IN* = int(-2147483648) + FIONBIO* = IOC_IN.int32 or ((sizeof(int32) and IOCPARM_MASK) shl 16) or + (102 shl 8) or 126 + + proc ioctlsocket*(s: TSocketHandle, cmd: clong, + argptr: ptr clong): cint {. + stdcall, importc: "ioctlsocket", dynlib: "ws2_32.dll".} +else: + let + osInvalidSocket* = posix.INVALID_SOCKET + +proc `==`*(a, b: TPort): bool {.borrow.} + ## ``==`` for ports. + +proc `$`*(p: TPort): string {.borrow.} + ## returns the port number as a string + +proc toInt*(domain: TDomain): cint + ## Converts the TDomain enum to a platform-dependent ``cint``. + +proc toInt*(typ: TType): cint + ## Converts the TType enum to a platform-dependent ``cint``. + +proc toInt*(p: TProtocol): cint + ## Converts the TProtocol enum to a platform-dependent ``cint``. + +when defined(posix): + proc toInt(domain: TDomain): cint = + case domain + of AF_UNIX: result = posix.AF_UNIX + of AF_INET: result = posix.AF_INET + of AF_INET6: result = posix.AF_INET6 + else: discard + + proc toInt(typ: TType): cint = + case typ + of SOCK_STREAM: result = posix.SOCK_STREAM + of SOCK_DGRAM: result = posix.SOCK_DGRAM + of SOCK_SEQPACKET: result = posix.SOCK_SEQPACKET + of SOCK_RAW: result = posix.SOCK_RAW + else: discard + + proc toInt(p: TProtocol): cint = + case p + of IPPROTO_TCP: result = posix.IPPROTO_TCP + of IPPROTO_UDP: result = posix.IPPROTO_UDP + of IPPROTO_IP: result = posix.IPPROTO_IP + of IPPROTO_IPV6: result = posix.IPPROTO_IPV6 + of IPPROTO_RAW: result = posix.IPPROTO_RAW + of IPPROTO_ICMP: result = posix.IPPROTO_ICMP + else: discard + +else: + proc toInt(domain: TDomain): cint = + result = toU16(ord(domain)) + + proc toInt(typ: TType): cint = + result = cint(ord(typ)) + + proc toInt(p: TProtocol): cint = + result = cint(ord(p)) + + +proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + ## Creates a new socket; returns `InvalidSocket` if an error occurs. + socket(toInt(domain), toInt(typ), toInt(protocol)) + +proc close*(socket: TSocketHandle) = + ## closes a socket. + when defined(windows): + discard winlean.closeSocket(socket) + else: + discard posix.close(socket) + # TODO: These values should not be discarded. An EOS should be raised. + # http://stackoverflow.com/questions/12463473/what-happens-if-you-call-close-on-a-bsd-socket-multiple-times + +proc bindAddr*(socket: TSocketHandle, name: ptr TSockAddr, namelen: TSockLen): cint = + result = bindSocket(socket, name, namelen) + +proc listen*(socket: TSocketHandle, backlog = SOMAXCONN): cint {.tags: [FReadIO].} = + ## Marks ``socket`` as accepting connections. + ## ``Backlog`` specifies the maximum length of the + ## queue of pending connections. + when defined(windows): + result = winlean.listen(socket, cint(backlog)) + else: + result = posix.listen(socket, cint(backlog)) + +proc getAddrInfo*(address: string, port: TPort, af: TDomain = AF_INET, typ: TType = SOCK_STREAM, + prot: TProtocol = IPPROTO_TCP): ptr TAddrInfo = + ## + ## + ## **Warning**: The resulting ``ptr TAddrInfo`` must be freed using ``dealloc``! + var hints: TAddrInfo + result = nil + hints.ai_family = toInt(af) + hints.ai_socktype = toInt(typ) + hints.ai_protocol = toInt(prot) + var gaiResult = getAddrInfo(address, $port, addr(hints), result) + if gaiResult != 0'i32: + when defined(windows): + OSError(OSLastError()) + else: + raise newException(EOS, $gai_strerror(gaiResult)) + +proc dealloc*(ai: ptr TAddrInfo) = + freeaddrinfo(ai) + +proc ntohl*(x: int32): int32 = + ## Converts 32-bit integers from network to host byte order. + ## On machines where the host byte order is the same as network byte order, + ## this is a no-op; otherwise, it performs a 4-byte swap operation. + when cpuEndian == bigEndian: result = x + else: result = (x shr 24'i32) or + (x shr 8'i32 and 0xff00'i32) or + (x shl 8'i32 and 0xff0000'i32) or + (x shl 24'i32) + +proc ntohs*(x: int16): int16 = + ## Converts 16-bit integers from network to host byte order. On machines + ## where the host byte order is the same as network byte order, this is + ## a no-op; otherwise, it performs a 2-byte swap operation. + when cpuEndian == bigEndian: result = x + else: result = (x shr 8'i16) or (x shl 8'i16) + +proc htonl*(x: int32): int32 = + ## Converts 32-bit integers from host to network byte order. On machines + ## where the host byte order is the same as network byte order, this is + ## a no-op; otherwise, it performs a 4-byte swap operation. + result = rawsockets.ntohl(x) + +proc htons*(x: int16): int16 = + ## Converts 16-bit positive integers from host to network byte order. + ## On machines where the host byte order is the same as network byte + ## order, this is a no-op; otherwise, it performs a 2-byte swap operation. + result = rawsockets.ntohs(x) + +proc getServByName*(name, proto: string): TServent {.tags: [FReadIO].} = + ## Searches the database from the beginning and finds the first entry for + ## which the service name specified by ``name`` matches the s_name member + ## and the protocol name specified by ``proto`` matches the s_proto member. + ## + ## On posix this will search through the ``/etc/services`` file. + when defined(Windows): + var s = winlean.getservbyname(name, proto) + else: + var s = posix.getservbyname(name, proto) + if s == nil: raise newException(EOS, "Service not found.") + result.name = $s.s_name + result.aliases = cstringArrayToSeq(s.s_aliases) + result.port = TPort(s.s_port) + result.proto = $s.s_proto + +proc getServByPort*(port: TPort, proto: string): TServent {.tags: [FReadIO].} = + ## Searches the database from the beginning and finds the first entry for + ## which the port specified by ``port`` matches the s_port member and the + ## protocol name specified by ``proto`` matches the s_proto member. + ## + ## On posix this will search through the ``/etc/services`` file. + when defined(Windows): + var s = winlean.getservbyport(ze(int16(port)).cint, proto) + else: + var s = posix.getservbyport(ze(int16(port)).cint, proto) + if s == nil: raise newException(EOS, "Service not found.") + result.name = $s.s_name + result.aliases = cstringArrayToSeq(s.s_aliases) + result.port = TPort(s.s_port) + result.proto = $s.s_proto + +proc getHostByAddr*(ip: string): Thostent {.tags: [FReadIO].} = + ## This function will lookup the hostname of an IP Address. + var myaddr: TInAddr + myaddr.s_addr = inet_addr(ip) + + when defined(windows): + var s = winlean.gethostbyaddr(addr(myaddr), sizeof(myaddr).cuint, + cint(rawsockets.AF_INET)) + if s == nil: osError(osLastError()) + else: + var s = posix.gethostbyaddr(addr(myaddr), sizeof(myaddr).TSocklen, + cint(posix.AF_INET)) + if s == nil: + raise newException(EOS, $hstrerror(h_errno)) + + result.name = $s.h_name + result.aliases = cstringArrayToSeq(s.h_aliases) + when defined(windows): + result.addrtype = TDomain(s.h_addrtype) + else: + if s.h_addrtype == posix.AF_INET: + result.addrtype = AF_INET + elif s.h_addrtype == posix.AF_INET6: + result.addrtype = AF_INET6 + else: + raise newException(EOS, "unknown h_addrtype") + result.addrList = cstringArrayToSeq(s.h_addr_list) + result.length = int(s.h_length) + +proc getHostByName*(name: string): Thostent {.tags: [FReadIO].} = + ## This function will lookup the IP address of a hostname. + when defined(Windows): + var s = winlean.gethostbyname(name) + else: + var s = posix.gethostbyname(name) + if s == nil: osError(osLastError()) + result.name = $s.h_name + result.aliases = cstringArrayToSeq(s.h_aliases) + when defined(windows): + result.addrtype = TDomain(s.h_addrtype) + else: + if s.h_addrtype == posix.AF_INET: + result.addrtype = AF_INET + elif s.h_addrtype == posix.AF_INET6: + result.addrtype = AF_INET6 + else: + raise newException(EOS, "unknown h_addrtype") + result.addrList = cstringArrayToSeq(s.h_addr_list) + result.length = int(s.h_length) + +proc getSockName*(socket: TSocketHandle): TPort = + ## returns the socket's associated port number. + var name: Tsockaddr_in + when defined(Windows): + name.sin_family = int16(ord(AF_INET)) + else: + name.sin_family = posix.AF_INET + #name.sin_port = htons(cint16(port)) + #name.sin_addr.s_addr = htonl(INADDR_ANY) + var namelen = sizeof(name).TSocklen + if getsockname(socket, cast[ptr TSockAddr](addr(name)), + addr(namelen)) == -1'i32: + osError(osLastError()) + result = TPort(rawsockets.ntohs(name.sin_port)) + +proc getSockOptInt*(socket: TSocketHandle, level, optname: int): int {. + tags: [FReadIO].} = + ## getsockopt for integer options. + var res: cint + var size = sizeof(res).TSocklen + if getsockopt(socket, cint(level), cint(optname), + addr(res), addr(size)) < 0'i32: + osError(osLastError()) + result = int(res) + +proc setSockOptInt*(socket: TSocketHandle, level, optname, optval: int) {. + tags: [FWriteIO].} = + ## setsockopt for integer options. + var value = cint(optval) + if setsockopt(socket, cint(level), cint(optname), addr(value), + sizeof(value).TSocklen) < 0'i32: + osError(osLastError()) + +proc setBlocking*(s: TSocketHandle, blocking: bool) = + ## Sets blocking mode on socket. + ## + ## Raises EOS on error. + when defined(Windows): + var mode = clong(ord(not blocking)) # 1 for non-blocking, 0 for blocking + if ioctlsocket(s, FIONBIO, addr(mode)) == -1: + osError(osLastError()) + else: # BSD sockets + var x: int = fcntl(s, F_GETFL, 0) + if x == -1: + osError(osLastError()) + else: + var mode = if blocking: x and not O_NONBLOCK else: x or O_NONBLOCK + if fcntl(s, F_SETFL, mode) == -1: + osError(osLastError()) + +proc timeValFromMilliseconds(timeout = 500): Ttimeval = + if timeout != -1: + var seconds = timeout div 1000 + result.tv_sec = seconds.int32 + result.tv_usec = ((timeout - seconds * 1000) * 1000).int32 + +proc createFdSet(fd: var TFdSet, s: seq[TSocketHandle], m: var int) = + FD_ZERO(fd) + for i in items(s): + m = max(m, int(i)) + FD_SET(i, fd) + +proc pruneSocketSet(s: var seq[TSocketHandle], fd: var TFdSet) = + var i = 0 + var L = s.len + while i < L: + if FD_ISSET(s[i], fd) == 0'i32: + s[i] = s[L-1] + dec(L) + else: + inc(i) + setLen(s, L) + +proc select*(readfds: var seq[TSocketHandle], timeout = 500): int = + ## Traditional select function. This function will return the number of + ## sockets that are ready to be read from, written to, or which have errors. + ## If there are none; 0 is returned. + ## ``Timeout`` is in miliseconds and -1 can be specified for no timeout. + ## + ## A socket is removed from the specific ``seq`` when it has data waiting to + ## be read/written to or has errors (``exceptfds``). + var tv {.noInit.}: Ttimeval = timeValFromMilliseconds(timeout) + + var rd: TFdSet + var m = 0 + createFdSet((rd), readfds, m) + + if timeout != -1: + result = int(select(cint(m+1), addr(rd), nil, nil, addr(tv))) + else: + result = int(select(cint(m+1), addr(rd), nil, nil, nil)) + + pruneSocketSet(readfds, (rd)) + +proc selectWrite*(writefds: var seq[TSocketHandle], + timeout = 500): int {.tags: [FReadIO].} = + ## When a socket in ``writefds`` is ready to be written to then a non-zero + ## value will be returned specifying the count of the sockets which can be + ## written to. The sockets which can be written to will also be removed + ## from ``writefds``. + ## + ## ``timeout`` is specified in miliseconds and ``-1`` can be specified for + ## an unlimited time. + var tv {.noInit.}: Ttimeval = timeValFromMilliseconds(timeout) + + var wr: TFdSet + var m = 0 + createFdSet((wr), writefds, m) + + if timeout != -1: + result = int(select(cint(m+1), nil, addr(wr), nil, addr(tv))) + else: + result = int(select(cint(m+1), nil, addr(wr), nil, nil)) + + pruneSocketSet(writefds, (wr)) + +when defined(Windows): + var wsa: TWSADATA + if WSAStartup(0x0101'i16, addr wsa) != 0: OSError(OSLastError()) diff --git a/lib/pure/sockets2.nim b/lib/pure/sockets2.nim deleted file mode 100644 index 975cc685a..000000000 --- a/lib/pure/sockets2.nim +++ /dev/null @@ -1,420 +0,0 @@ -# -# -# Nimrod's Runtime Library -# (c) Copyright 2014 Dominik Picheta -# -# See the file "copying.txt", included in this -# distribution, for details about the copyright. -# - -## This module implements a low-level cross-platform sockets interface. Look -## at the ``net`` module for the higher-level version. - -# TODO: Clean up the exports a bit and everything else in general. - -import unsigned, os - -when hostos == "solaris": - {.passl: "-lsocket -lnsl".} - -when defined(Windows): - import winlean - export WSAEWOULDBLOCK -else: - import posix - export fcntl, F_GETFL, O_NONBLOCK, F_SETFL, EAGAIN, EWOULDBLOCK - -export TSocketHandle, TSockaddr_in, TAddrinfo, INADDR_ANY, TSockAddr, TSockLen, - inet_ntoa, recv, `==`, connect, send, accept, recvfrom, sendto - -export - SO_ERROR, - SOL_SOCKET, - SOMAXCONN, - SO_ACCEPTCONN, SO_BROADCAST, SO_DEBUG, SO_DONTROUTE, - SO_KEEPALIVE, SO_OOBINLINE, SO_REUSEADDR, - MSG_PEEK - -type - - TPort* = distinct uint16 ## port type - - TDomain* = enum ## domain, which specifies the protocol family of the - ## created socket. Other domains than those that are listed - ## here are unsupported. - AF_UNIX, ## for local socket (using a file). Unsupported on Windows. - AF_INET = 2, ## for network protocol IPv4 or - AF_INET6 = 23 ## for network protocol IPv6. - - TType* = enum ## second argument to `socket` proc - SOCK_STREAM = 1, ## reliable stream-oriented service or Stream Sockets - SOCK_DGRAM = 2, ## datagram service or Datagram Sockets - SOCK_RAW = 3, ## raw protocols atop the network layer. - SOCK_SEQPACKET = 5 ## reliable sequenced packet service - - TProtocol* = enum ## third argument to `socket` proc - IPPROTO_TCP = 6, ## Transmission control protocol. - IPPROTO_UDP = 17, ## User datagram protocol. - IPPROTO_IP, ## Internet protocol. Unsupported on Windows. - IPPROTO_IPV6, ## Internet Protocol Version 6. Unsupported on Windows. - IPPROTO_RAW, ## Raw IP Packets Protocol. Unsupported on Windows. - IPPROTO_ICMP ## Control message protocol. Unsupported on Windows. - - TServent* {.pure, final.} = object ## information about a service - name*: string - aliases*: seq[string] - port*: TPort - proto*: string - - Thostent* {.pure, final.} = object ## information about a given host - name*: string - aliases*: seq[string] - addrtype*: TDomain - length*: int - addrList*: seq[string] - -when defined(windows): - let - osInvalidSocket* = winlean.INVALID_SOCKET - - const - IOCPARM_MASK* = 127 - IOC_IN* = int(-2147483648) - FIONBIO* = IOC_IN.int32 or ((sizeof(int32) and IOCPARM_MASK) shl 16) or - (102 shl 8) or 126 - - proc ioctlsocket*(s: TSocketHandle, cmd: clong, - argptr: ptr clong): cint {. - stdcall, importc: "ioctlsocket", dynlib: "ws2_32.dll".} -else: - let - osInvalidSocket* = posix.INVALID_SOCKET - -proc `==`*(a, b: TPort): bool {.borrow.} - ## ``==`` for ports. - -proc `$`*(p: TPort): string {.borrow.} - ## returns the port number as a string - -proc toInt*(domain: TDomain): cint - ## Converts the TDomain enum to a platform-dependent ``cint``. - -proc toInt*(typ: TType): cint - ## Converts the TType enum to a platform-dependent ``cint``. - -proc toInt*(p: TProtocol): cint - ## Converts the TProtocol enum to a platform-dependent ``cint``. - -when defined(posix): - proc toInt(domain: TDomain): cint = - case domain - of AF_UNIX: result = posix.AF_UNIX - of AF_INET: result = posix.AF_INET - of AF_INET6: result = posix.AF_INET6 - else: discard - - proc toInt(typ: TType): cint = - case typ - of SOCK_STREAM: result = posix.SOCK_STREAM - of SOCK_DGRAM: result = posix.SOCK_DGRAM - of SOCK_SEQPACKET: result = posix.SOCK_SEQPACKET - of SOCK_RAW: result = posix.SOCK_RAW - else: discard - - proc toInt(p: TProtocol): cint = - case p - of IPPROTO_TCP: result = posix.IPPROTO_TCP - of IPPROTO_UDP: result = posix.IPPROTO_UDP - of IPPROTO_IP: result = posix.IPPROTO_IP - of IPPROTO_IPV6: result = posix.IPPROTO_IPV6 - of IPPROTO_RAW: result = posix.IPPROTO_RAW - of IPPROTO_ICMP: result = posix.IPPROTO_ICMP - else: discard - -else: - proc toInt(domain: TDomain): cint = - result = toU16(ord(domain)) - - proc toInt(typ: TType): cint = - result = cint(ord(typ)) - - proc toInt(p: TProtocol): cint = - result = cint(ord(p)) - - -proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, - protocol: TProtocol = IPPROTO_TCP): TSocketHandle = - ## Creates a new socket; returns `InvalidSocket` if an error occurs. - socket(toInt(domain), toInt(typ), toInt(protocol)) - -proc close*(socket: TSocketHandle) = - ## closes a socket. - when defined(windows): - discard winlean.closeSocket(socket) - else: - discard posix.close(socket) - # TODO: These values should not be discarded. An EOS should be raised. - # http://stackoverflow.com/questions/12463473/what-happens-if-you-call-close-on-a-bsd-socket-multiple-times - -proc bindAddr*(socket: TSocketHandle, name: ptr TSockAddr, namelen: TSockLen): cint = - result = bindSocket(socket, name, namelen) - -proc listen*(socket: TSocketHandle, backlog = SOMAXCONN): cint {.tags: [FReadIO].} = - ## Marks ``socket`` as accepting connections. - ## ``Backlog`` specifies the maximum length of the - ## queue of pending connections. - when defined(windows): - result = winlean.listen(socket, cint(backlog)) - else: - result = posix.listen(socket, cint(backlog)) - -proc getAddrInfo*(address: string, port: TPort, af: TDomain = AF_INET, typ: TType = SOCK_STREAM, - prot: TProtocol = IPPROTO_TCP): ptr TAddrInfo = - ## - ## - ## **Warning**: The resulting ``ptr TAddrInfo`` must be freed using ``dealloc``! - var hints: TAddrInfo - result = nil - hints.ai_family = toInt(af) - hints.ai_socktype = toInt(typ) - hints.ai_protocol = toInt(prot) - var gaiResult = getAddrInfo(address, $port, addr(hints), result) - if gaiResult != 0'i32: - when defined(windows): - OSError(OSLastError()) - else: - raise newException(EOS, $gai_strerror(gaiResult)) - -proc dealloc*(ai: ptr TAddrInfo) = - freeaddrinfo(ai) - -proc ntohl*(x: int32): int32 = - ## Converts 32-bit integers from network to host byte order. - ## On machines where the host byte order is the same as network byte order, - ## this is a no-op; otherwise, it performs a 4-byte swap operation. - when cpuEndian == bigEndian: result = x - else: result = (x shr 24'i32) or - (x shr 8'i32 and 0xff00'i32) or - (x shl 8'i32 and 0xff0000'i32) or - (x shl 24'i32) - -proc ntohs*(x: int16): int16 = - ## Converts 16-bit integers from network to host byte order. On machines - ## where the host byte order is the same as network byte order, this is - ## a no-op; otherwise, it performs a 2-byte swap operation. - when cpuEndian == bigEndian: result = x - else: result = (x shr 8'i16) or (x shl 8'i16) - -proc htonl*(x: int32): int32 = - ## Converts 32-bit integers from host to network byte order. On machines - ## where the host byte order is the same as network byte order, this is - ## a no-op; otherwise, it performs a 4-byte swap operation. - result = sockets2.ntohl(x) - -proc htons*(x: int16): int16 = - ## Converts 16-bit positive integers from host to network byte order. - ## On machines where the host byte order is the same as network byte - ## order, this is a no-op; otherwise, it performs a 2-byte swap operation. - result = sockets2.ntohs(x) - -proc getServByName*(name, proto: string): TServent {.tags: [FReadIO].} = - ## Searches the database from the beginning and finds the first entry for - ## which the service name specified by ``name`` matches the s_name member - ## and the protocol name specified by ``proto`` matches the s_proto member. - ## - ## On posix this will search through the ``/etc/services`` file. - when defined(Windows): - var s = winlean.getservbyname(name, proto) - else: - var s = posix.getservbyname(name, proto) - if s == nil: raise newException(EOS, "Service not found.") - result.name = $s.s_name - result.aliases = cstringArrayToSeq(s.s_aliases) - result.port = TPort(s.s_port) - result.proto = $s.s_proto - -proc getServByPort*(port: TPort, proto: string): TServent {.tags: [FReadIO].} = - ## Searches the database from the beginning and finds the first entry for - ## which the port specified by ``port`` matches the s_port member and the - ## protocol name specified by ``proto`` matches the s_proto member. - ## - ## On posix this will search through the ``/etc/services`` file. - when defined(Windows): - var s = winlean.getservbyport(ze(int16(port)).cint, proto) - else: - var s = posix.getservbyport(ze(int16(port)).cint, proto) - if s == nil: raise newException(EOS, "Service not found.") - result.name = $s.s_name - result.aliases = cstringArrayToSeq(s.s_aliases) - result.port = TPort(s.s_port) - result.proto = $s.s_proto - -proc getHostByAddr*(ip: string): Thostent {.tags: [FReadIO].} = - ## This function will lookup the hostname of an IP Address. - var myaddr: TInAddr - myaddr.s_addr = inet_addr(ip) - - when defined(windows): - var s = winlean.gethostbyaddr(addr(myaddr), sizeof(myaddr).cuint, - cint(sockets2.AF_INET)) - if s == nil: osError(osLastError()) - else: - var s = posix.gethostbyaddr(addr(myaddr), sizeof(myaddr).TSocklen, - cint(posix.AF_INET)) - if s == nil: - raise newException(EOS, $hstrerror(h_errno)) - - result.name = $s.h_name - result.aliases = cstringArrayToSeq(s.h_aliases) - when defined(windows): - result.addrtype = TDomain(s.h_addrtype) - else: - if s.h_addrtype == posix.AF_INET: - result.addrtype = AF_INET - elif s.h_addrtype == posix.AF_INET6: - result.addrtype = AF_INET6 - else: - raise newException(EOS, "unknown h_addrtype") - result.addrList = cstringArrayToSeq(s.h_addr_list) - result.length = int(s.h_length) - -proc getHostByName*(name: string): Thostent {.tags: [FReadIO].} = - ## This function will lookup the IP address of a hostname. - when defined(Windows): - var s = winlean.gethostbyname(name) - else: - var s = posix.gethostbyname(name) - if s == nil: osError(osLastError()) - result.name = $s.h_name - result.aliases = cstringArrayToSeq(s.h_aliases) - when defined(windows): - result.addrtype = TDomain(s.h_addrtype) - else: - if s.h_addrtype == posix.AF_INET: - result.addrtype = AF_INET - elif s.h_addrtype == posix.AF_INET6: - result.addrtype = AF_INET6 - else: - raise newException(EOS, "unknown h_addrtype") - result.addrList = cstringArrayToSeq(s.h_addr_list) - result.length = int(s.h_length) - -proc getSockName*(socket: TSocketHandle): TPort = - ## returns the socket's associated port number. - var name: Tsockaddr_in - when defined(Windows): - name.sin_family = int16(ord(AF_INET)) - else: - name.sin_family = posix.AF_INET - #name.sin_port = htons(cint16(port)) - #name.sin_addr.s_addr = htonl(INADDR_ANY) - var namelen = sizeof(name).TSocklen - if getsockname(socket, cast[ptr TSockAddr](addr(name)), - addr(namelen)) == -1'i32: - osError(osLastError()) - result = TPort(sockets2.ntohs(name.sin_port)) - -proc getSockOptInt*(socket: TSocketHandle, level, optname: int): int {. - tags: [FReadIO].} = - ## getsockopt for integer options. - var res: cint - var size = sizeof(res).TSocklen - if getsockopt(socket, cint(level), cint(optname), - addr(res), addr(size)) < 0'i32: - osError(osLastError()) - result = int(res) - -proc setSockOptInt*(socket: TSocketHandle, level, optname, optval: int) {. - tags: [FWriteIO].} = - ## setsockopt for integer options. - var value = cint(optval) - if setsockopt(socket, cint(level), cint(optname), addr(value), - sizeof(value).TSocklen) < 0'i32: - osError(osLastError()) - -proc setBlocking*(s: TSocketHandle, blocking: bool) = - ## Sets blocking mode on socket. - ## - ## Raises EOS on error. - when defined(Windows): - var mode = clong(ord(not blocking)) # 1 for non-blocking, 0 for blocking - if ioctlsocket(s, FIONBIO, addr(mode)) == -1: - osError(osLastError()) - else: # BSD sockets - var x: int = fcntl(s, F_GETFL, 0) - if x == -1: - osError(osLastError()) - else: - var mode = if blocking: x and not O_NONBLOCK else: x or O_NONBLOCK - if fcntl(s, F_SETFL, mode) == -1: - osError(osLastError()) - -proc timeValFromMilliseconds(timeout = 500): Ttimeval = - if timeout != -1: - var seconds = timeout div 1000 - result.tv_sec = seconds.int32 - result.tv_usec = ((timeout - seconds * 1000) * 1000).int32 - -proc createFdSet(fd: var TFdSet, s: seq[TSocketHandle], m: var int) = - FD_ZERO(fd) - for i in items(s): - m = max(m, int(i)) - FD_SET(i, fd) - -proc pruneSocketSet(s: var seq[TSocketHandle], fd: var TFdSet) = - var i = 0 - var L = s.len - while i < L: - if FD_ISSET(s[i], fd) == 0'i32: - s[i] = s[L-1] - dec(L) - else: - inc(i) - setLen(s, L) - -proc select*(readfds: var seq[TSocketHandle], timeout = 500): int = - ## Traditional select function. This function will return the number of - ## sockets that are ready to be read from, written to, or which have errors. - ## If there are none; 0 is returned. - ## ``Timeout`` is in miliseconds and -1 can be specified for no timeout. - ## - ## A socket is removed from the specific ``seq`` when it has data waiting to - ## be read/written to or has errors (``exceptfds``). - var tv {.noInit.}: Ttimeval = timeValFromMilliseconds(timeout) - - var rd: TFdSet - var m = 0 - createFdSet((rd), readfds, m) - - if timeout != -1: - result = int(select(cint(m+1), addr(rd), nil, nil, addr(tv))) - else: - result = int(select(cint(m+1), addr(rd), nil, nil, nil)) - - pruneSocketSet(readfds, (rd)) - -proc selectWrite*(writefds: var seq[TSocketHandle], - timeout = 500): int {.tags: [FReadIO].} = - ## When a socket in ``writefds`` is ready to be written to then a non-zero - ## value will be returned specifying the count of the sockets which can be - ## written to. The sockets which can be written to will also be removed - ## from ``writefds``. - ## - ## ``timeout`` is specified in miliseconds and ``-1`` can be specified for - ## an unlimited time. - var tv {.noInit.}: Ttimeval = timeValFromMilliseconds(timeout) - - var wr: TFdSet - var m = 0 - createFdSet((wr), writefds, m) - - if timeout != -1: - result = int(select(cint(m+1), nil, addr(wr), nil, addr(tv))) - else: - result = int(select(cint(m+1), nil, addr(wr), nil, nil)) - - pruneSocketSet(writefds, (wr)) - -when defined(Windows): - var wsa: TWSADATA - if WSAStartup(0x0101'i16, addr wsa) != 0: OSError(OSLastError()) -- cgit 1.4.1-2-gfad0