diff options
author | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-03-23 18:24:11 +0000 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-03-23 18:24:11 +0000 |
commit | d310b01db1c6f2c8e63a561c40352b17978e1bdb (patch) | |
tree | 1ebb9cce9a04d51dfb11629b2248d364651d81de | |
parent | e855f6c0735d4dec8b34084e439c6c215f12b155 (diff) | |
download | Nim-d310b01db1c6f2c8e63a561c40352b17978e1bdb.tar.gz |
Moved the global dispatcher to asyncdispatch.
-rw-r--r-- | lib/pure/asyncdispatch.nim | 235 | ||||
-rw-r--r-- | lib/pure/asyncnet.nim | 30 | ||||
-rw-r--r-- | lib/pure/net.nim | 8 | ||||
-rw-r--r-- | lib/pure/rawsockets.nim | 2 | ||||
-rw-r--r-- | tests/async/tasyncawait.nim | 45 |
5 files changed, 175 insertions, 145 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 67361e46c..4a4ef8bdd 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -111,13 +111,13 @@ when defined(windows) or defined(nimdoc): TCompletionKey = dword TCompletionData* = object - sock: TSocketHandle - cb: proc (sock: TSocketHandle, bytesTransferred: DWORD, + sock: TAsyncFD + cb: proc (sock: TAsyncFD, bytesTransferred: DWORD, errcode: TOSErrorCode) {.closure.} PDispatcher* = ref object ioPort: THandle - handles: TSet[TSocketHandle] + handles: TSet[TAsyncFD] TCustomOverlapped = object Internal*: DWORD @@ -129,30 +129,42 @@ when defined(windows) or defined(nimdoc): PCustomOverlapped = ptr TCustomOverlapped - proc hash(x: TSocketHandle): THash {.borrow.} + TAsyncFD* = distinct int + + proc hash(x: TAsyncFD): THash {.borrow.} + proc `==`*(x: TAsyncFD, y: TAsyncFD): bool {.borrow.} proc newDispatcher*(): PDispatcher = ## Creates a new Dispatcher instance. new result result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) - result.handles = initSet[TSocketHandle]() + result.handles = initSet[TAsyncFD]() + + var gDisp{.threadvar.}: PDispatcher ## Global dispatcher + proc getGlobalDispatcher*(): PDispatcher = + ## Retrieves the global thread-local dispatcher. + if gDisp.isNil: gDisp = newDispatcher() + result = gDisp - proc register*(p: PDispatcher, sock: TSocketHandle) = - ## Registers ``sock`` with the dispatcher ``p``. + proc register*(sock: TAsyncFD) = + ## Registers ``sock`` with the dispatcher. + let p = getGlobalDispatcher() if CreateIOCompletionPort(sock.THandle, p.ioPort, cast[TCompletionKey](sock), 1) == 0: OSError(OSLastError()) p.handles.incl(sock) - proc verifyPresence(p: PDispatcher, sock: TSocketHandle) = + proc verifyPresence(sock: TAsyncFD) = ## Ensures that socket has been registered with the dispatcher. + let p = getGlobalDispatcher() if sock notin p.handles: raise newException(EInvalidValue, "Operation performed on a socket which has not been registered with" & " the dispatcher yet.") - proc poll*(p: PDispatcher, timeout = 500) = + proc poll*(timeout = 500) = ## Waits for completion events and processes them. + let p = getGlobalDispatcher() if p.handles.len == 0: raise newException(EInvalidValue, "No handles registered in dispatcher.") @@ -170,7 +182,7 @@ when defined(windows) or defined(nimdoc): var customOverlapped = cast[PCustomOverlapped](lpOverlapped) if res: # This is useful for ensuring the reliability of the overlapped struct. - assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle + assert customOverlapped.data.sock == lpCompletionKey.TAsyncFD customOverlapped.data.cb(customOverlapped.data.sock, lpNumberOfBytesTransferred, TOSErrorCode(-1)) @@ -178,7 +190,7 @@ when defined(windows) or defined(nimdoc): else: let errCode = OSLastError() if lpOverlapped != nil: - assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle + assert customOverlapped.data.sock == lpCompletionKey.TAsyncFD customOverlapped.data.cb(customOverlapped.data.sock, lpNumberOfBytesTransferred, errCode) dealloc(customOverlapped) @@ -201,7 +213,7 @@ when defined(windows) or defined(nimdoc): addr bytesRet, nil, nil) == 0 proc initAll() = - let dummySock = socket() + let dummySock = newRawSocket() if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX): OSError(OSLastError()) if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX): @@ -253,20 +265,20 @@ when defined(windows) or defined(nimdoc): dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength, RemoteSockaddr, RemoteSockaddrLength) - proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, + proc connect*(socket: TAsyncFD, address: string, port: TPort, af = AF_INET): PFuture[void] = ## Connects ``socket`` to server at ``address:port``. ## ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. - verifyPresence(p, socket) + verifyPresence(socket) var retFuture = newFuture[void]() # Apparently ``ConnectEx`` expects the socket to be initially bound: var saddr: Tsockaddr_in saddr.sin_family = int16(toInt(af)) saddr.sin_port = 0 saddr.sin_addr.s_addr = INADDR_ANY - if bindAddr(socket, cast[ptr TSockAddr](addr(saddr)), + if bindAddr(socket.TSocketHandle, cast[ptr TSockAddr](addr(saddr)), sizeof(saddr).TSockLen) < 0'i32: OSError(OSLastError()) @@ -279,7 +291,7 @@ when defined(windows) or defined(nimdoc): # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = + proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): retFuture.complete() @@ -287,8 +299,9 @@ when defined(windows) or defined(nimdoc): retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) - var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint, - nil, 0, nil, cast[POverlapped](ol)) + var ret = connectEx(socket.TSocketHandle, it.ai_addr, + sizeof(TSockAddrIn).cint, nil, 0, nil, + cast[POverlapped](ol)) if ret: # Request to connect completed immediately. success = true @@ -313,14 +326,14 @@ when defined(windows) or defined(nimdoc): retFuture.fail(newException(EOS, osErrorMsg(lastError))) return retFuture - proc recv*(p: PDispatcher, socket: TSocketHandle, size: int, + proc recv*(socket: TAsyncFD, size: int, flags: int = 0): PFuture[string] = ## Reads ``size`` bytes from ``socket``. Returned future will complete once ## all of the requested data is read. If socket is disconnected during the ## recv operation then the future may complete with only a part of the ## requested data read. If socket is disconnected and no data is available ## to be read then the future will complete with a value of ``""``. - verifyPresence(p, socket) + verifyPresence(socket) var retFuture = newFuture[string]() var dataBuf: TWSABuf @@ -331,7 +344,7 @@ when defined(windows) or defined(nimdoc): var flagsio = flags.dword var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = + proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): if bytesCount == 0 and dataBuf.buf[0] == '\0': @@ -344,7 +357,7 @@ when defined(windows) or defined(nimdoc): retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) - let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived, + let ret = WSARecv(socket.TSocketHandle, addr dataBuf, 1, addr bytesReceived, addr flagsio, cast[POverlapped](ol), nil) if ret == -1: let err = OSLastError() @@ -373,10 +386,10 @@ when defined(windows) or defined(nimdoc): # free ``ol``. return retFuture - proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] = + proc send*(socket: TAsyncFD, data: string): PFuture[void] = ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. - verifyPresence(p, socket) + verifyPresence(socket) var retFuture = newFuture[void]() var dataBuf: TWSABuf @@ -386,7 +399,7 @@ when defined(windows) or defined(nimdoc): var bytesReceived, flags: DWord var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = + proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): retFuture.complete() @@ -394,7 +407,7 @@ when defined(windows) or defined(nimdoc): retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) - let ret = WSASend(socket, addr dataBuf, 1, addr bytesReceived, + let ret = WSASend(socket.TSocketHandle, addr dataBuf, 1, addr bytesReceived, flags, cast[POverlapped](ol), nil) if ret == -1: let err = osLastError() @@ -408,17 +421,17 @@ when defined(windows) or defined(nimdoc): # free ``ol``. return retFuture - proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): - PFuture[tuple[address: string, client: TSocketHandle]] = + proc acceptAddr*(socket: TAsyncFD): + PFuture[tuple[address: string, client: TAsyncFD]] = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. ## ## The resulting client socket is automatically registered to dispatcher. - verifyPresence(p, socket) - var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() + verifyPresence(socket) + var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]() - var clientSock = socket() + var clientSock = newRawSocket() if clientSock == OSInvalidSocket: osError(osLastError()) const lpOutputLen = 1024 @@ -441,16 +454,16 @@ when defined(windows) or defined(nimdoc): dwLocalAddressLength, dwRemoteAddressLength, addr LocalSockaddr, addr localLen, addr RemoteSockaddr, addr remoteLen) - p.register(clientSock) + register(clientSock.TAsyncFD) # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 retFuture.complete( (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr), - client: clientSock) + client: clientSock.TAsyncFD) ) var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = + proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): completeAccept() @@ -459,7 +472,7 @@ when defined(windows) or defined(nimdoc): ) # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx - let ret = acceptEx(socket, clientSock, addr lpOutputBuf[0], + let ret = acceptEx(socket.TSocketHandle, clientSock, addr lpOutputBuf[0], dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, @@ -478,73 +491,87 @@ when defined(windows) or defined(nimdoc): return retFuture - proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + proc newAsyncRawSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, - protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + protocol: TProtocol = IPPROTO_TCP): TAsyncFD = ## Creates a new socket and registers it with the dispatcher implicitly. - result = socket(domain, typ, protocol) - result.setBlocking(false) - disp.register(result) + result = newRawSocket(domain, typ, protocol).TAsyncFD + result.TSocketHandle.setBlocking(false) + register(result) - proc close*(disp: PDispatcher, socket: TSocketHandle) = + proc close*(socket: TAsyncFD) = ## Closes a socket and ensures that it is unregistered. - socket.close() - disp.handles.excl(socket) + socket.TSocketHandle.close() + getGlobalDispatcher().handles.excl(socket) initAll() else: import selectors from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK type - TCallback = proc (sock: TSocketHandle): bool {.closure.} + TAsyncFD* = distinct cint + TCallback = proc (sock: TAsyncFD): bool {.closure.} PData* = ref object of PObject - sock: TSocketHandle + sock: TAsyncFD readCBs: seq[TCallback] writeCBs: seq[TCallback] PDispatcher* = ref object selector: PSelector + proc `==`*(x, y: TAsyncFD): bool {.borrow.} + proc newDispatcher*(): PDispatcher = new result result.selector = newSelector() - proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) = - assert sock in p.selector - discard p.selector.update(sock, events) + var gDisp{.threadvar.}: PDispatcher ## Global dispatcher + proc getGlobalDispatcher*(): PDispatcher = + if gDisp.isNil: gDisp = newDispatcher() + result = gDisp + + proc update(sock: TAsyncFD, events: set[TEvent]) = + let p = getGlobalDispatcher() + assert sock.TSocketHandle in p.selector + discard p.selector.update(sock.TSocketHandle, events) - proc register(p: PDispatcher, sock: TSocketHandle) = + proc register(sock: TAsyncFD) = + let p = getGlobalDispatcher() var data = PData(sock: sock, readCBs: @[], writeCBs: @[]) - p.selector.register(sock, {}, data.PObject) + p.selector.register(sock.TSocketHandle, {}, data.PObject) - proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + proc newAsyncRawSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, - protocol: TProtocol = IPPROTO_TCP): TSocketHandle = - result = socket(domain, typ, protocol) - result.setBlocking(false) - disp.register(result) + protocol: TProtocol = IPPROTO_TCP): TAsyncFD = + result = newRawSocket(domain, typ, protocol).TAsyncFD + result.TSocketHandle.setBlocking(false) + register(result) - proc close*(disp: PDispatcher, sock: TSocketHandle) = - sock.close() - disp.selector.unregister(sock) - - proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = - if sock notin p.selector: + proc close*(sock: TAsyncFD) = + let disp = getGlobalDispatcher() + sock.TSocketHandle.close() + disp.selector.unregister(sock.TSocketHandle) + + proc addRead(sock: TAsyncFD, cb: TCallback) = + let p = getGlobalDispatcher() + if sock.TSocketHandle notin p.selector: raise newException(EInvalidValue, "File descriptor not registered.") - p.selector[sock].data.PData.readCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvRead}) + p.selector[sock.TSocketHandle].data.PData.readCBs.add(cb) + update(sock, p.selector[sock.TSocketHandle].events + {EvRead}) - proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = - if sock notin p.selector: + proc addWrite(sock: TAsyncFD, cb: TCallback) = + let p = getGlobalDispatcher() + if sock.TSocketHandle notin p.selector: raise newException(EInvalidValue, "File descriptor not registered.") - p.selector[sock].data.PData.writeCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvWrite}) + p.selector[sock.TSocketHandle].data.PData.writeCBs.add(cb) + update(sock, p.selector[sock.TSocketHandle].events + {EvWrite}) - proc poll*(p: PDispatcher, timeout = 500) = + proc poll*(timeout = 500) = + let p = getGlobalDispatcher() for info in p.selector.select(timeout): let data = PData(info.key.data) - assert data.sock == info.key.fd + assert data.sock == info.key.fd.TAsyncFD #echo("In poll ", data.sock.cint) if EvRead in info.events: # Callback may add items to ``data.readCBs`` which causes issues if @@ -570,17 +597,16 @@ else: if data.readCBs.len != 0: newEvents = {EvRead} if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} if newEvents != info.key.events: - echo(info.key.events, " -> ", newEvents) - p.update(data.sock, newEvents) + update(data.sock, newEvents) else: # FD no longer a part of the selector. Likely been closed # (e.g. socket disconnected). - proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, + proc connect*(socket: TAsyncFD, address: string, port: TPort, af = AF_INET): PFuture[void] = var retFuture = newFuture[void]() - proc cb(sock: TSocketHandle): bool = + proc cb(sock: TAsyncFD): bool = # We have connected. retFuture.complete() return true @@ -590,7 +616,7 @@ else: var lastError: TOSErrorCode var it = aiList while it != nil: - var ret = connect(socket, it.ai_addr, it.ai_addrlen.TSocklen) + var ret = connect(socket.TSocketHandle, it.ai_addr, it.ai_addrlen.TSocklen) if ret == 0: # Request to connect completed immediately. success = true @@ -600,7 +626,7 @@ else: lastError = osLastError() if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: success = true - addWrite(p, socket, cb) + addWrite(socket, cb) break else: success = false @@ -611,17 +637,18 @@ else: retFuture.fail(newException(EOS, osErrorMsg(lastError))) return retFuture - proc recv*(p: PDispatcher, socket: TSocketHandle, size: int, + proc recv*(socket: TAsyncFD, size: int, flags: int = 0): PFuture[string] = var retFuture = newFuture[string]() var readBuffer = newString(size) var sizeRead = 0 - proc cb(sock: TSocketHandle): bool = + proc cb(sock: TAsyncFD): bool = result = true let netSize = size - sizeRead - let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint) + let res = recv(sock.TSocketHandle, addr readBuffer[sizeRead], netSize, + flags.cint) #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() @@ -645,19 +672,19 @@ else: retFuture.complete(readBuffer) #echo("Recv cb result: ", result) - addRead(p, socket, cb) + addRead(socket, cb) return retFuture - proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] = + proc send*(socket: TAsyncFD, data: string): PFuture[void] = var retFuture = newFuture[void]() var written = 0 - proc cb(sock: TSocketHandle): bool = + proc cb(sock: TAsyncFD): bool = result = true let netSize = data.len-written var d = data.cstring - let res = send(sock, addr d[written], netSize, 0.cint) + let res = send(sock.TSocketHandle, addr d[written], netSize, 0.cint) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -670,18 +697,18 @@ else: result = false # We still have data to send. else: retFuture.complete() - addWrite(p, socket, cb) + addWrite(socket, cb) return retFuture - proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): - PFuture[tuple[address: string, client: TSocketHandle]] = - var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() - proc cb(sock: TSocketHandle): bool = + proc acceptAddr*(socket: TAsyncFD): + PFuture[tuple[address: string, client: TAsyncFD]] = + var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]() + proc cb(sock: TAsyncFD): bool = result = true var sockAddress: Tsockaddr_in var addrLen = sizeof(sockAddress).TSocklen - var client = accept(sock, cast[ptr TSockAddr](addr(sockAddress)), - addr(addrLen)) + var client = accept(sock.TSocketHandle, + cast[ptr TSockAddr](addr(sockAddress)), addr(addrLen)) if client == osInvalidSocket: let lastError = osLastError() assert lastError.int32 notin {EWOULDBLOCK, EAGAIN} @@ -690,19 +717,19 @@ else: else: retFuture.fail(newException(EOS, osErrorMsg(lastError))) else: - p.register(client) - retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client)) - addRead(p, socket, cb) + register(client.TAsyncFD) + retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client.TAsyncFD)) + addRead(socket, cb) return retFuture -proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] = +proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection. ## The future will complete when the connection is successfully accepted. - var retFut = newFuture[TSocketHandle]() - var fut = p.acceptAddr(socket) + var retFut = newFuture[TAsyncFD]() + var fut = acceptAddr(socket) fut.callback = - proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) = + proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) = assert future.finished if future.failed: retFut.fail(future.error) @@ -891,7 +918,7 @@ macro async*(prc: stmt): stmt {.immediate.} = echo(toStrLit(result)) -proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} = +proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} = ## Reads a line of data from ``socket``. Returned future will complete once ## a full line is read or an error occurs. ## @@ -912,28 +939,24 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} result = "" var c = "" while true: - c = await p.recv(socket, 1) + c = await recv(socket, 1) if c.len == 0: return "" if c == "\r": - c = await p.recv(socket, 1, MSG_PEEK) + c = await recv(socket, 1, MSG_PEEK) if c.len > 0 and c == "\L": - discard await p.recv(socket, 1) + discard await recv(socket, 1) addNLIfEmpty() return elif c == "\L": addNLIfEmpty() return - add(result.string, c) - -var gDisp*{.threadvar.}: PDispatcher ## Global dispatcher -gDisp = newDispatcher() + add(result, c) proc runForever*() = ## Begins a never ending global dispatcher poll loop. while true: - gDisp.poll() - + poll() when isMainModule: diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index ed8a2fb82..451ef25ac 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -1,3 +1,11 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2014 Dominik Picheta +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# import asyncdispatch import rawsockets import net @@ -7,7 +15,7 @@ when defined(ssl): type TAsyncSocket = object ## socket type - fd: TSocketHandle + fd: TAsyncFD case isBuffered: bool # determines whether this socket is buffered. of true: buffer: array[0..BufferSize, char] @@ -28,18 +36,18 @@ type # TODO: Save AF, domain etc info and reuse it in procs which need it like connect. -proc newSocket(fd: TSocketHandle, isBuff: bool): PAsyncSocket = - assert fd != osInvalidSocket +proc newSocket(fd: TAsyncFD, isBuff: bool): PAsyncSocket = + assert fd != osInvalidSocket.TAsyncFD new(result) result.fd = fd result.isBuffered = isBuff if isBuff: result.currPos = 0 -proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, +proc newAsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, protocol: TProtocol = IPPROTO_TCP, buffered = true): PAsyncSocket = ## Creates a new asynchronous socket. - result = newSocket(gDisp.socket(domain, typ, protocol), buffered) + result = newSocket(newAsyncRawSocket(domain, typ, protocol), buffered) proc connect*(socket: PAsyncSocket, address: string, port: TPort, af = AF_INET): PFuture[void] = @@ -47,7 +55,7 @@ proc connect*(socket: PAsyncSocket, address: string, port: TPort, ## ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. - result = gDisp.connect(socket.fd, address, port, af) + result = connect(socket.fd, address, port, af) proc recv*(socket: PAsyncSocket, size: int, flags: int = 0): PFuture[string] = @@ -56,12 +64,12 @@ proc recv*(socket: PAsyncSocket, size: int, ## recv operation then the future may complete with only a part of the ## requested data read. If socket is disconnected and no data is available ## to be read then the future will complete with a value of ``""``. - result = gDisp.recv(socket.fd, size, flags) + result = recv(socket.fd, size, flags) proc send*(socket: PAsyncSocket, data: string): PFuture[void] = ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. - result = gDisp.send(socket.fd, data) + result = send(socket.fd, data) proc acceptAddr*(socket: PAsyncSocket): PFuture[tuple[address: string, client: PAsyncSocket]] = @@ -69,9 +77,9 @@ proc acceptAddr*(socket: PAsyncSocket): ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]() - var fut = gDisp.acceptAddr(socket.fd) + var fut = acceptAddr(socket.fd) fut.callback = - proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) = + proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) = assert future.finished if future.failed: retFuture.fail(future.readError) @@ -133,7 +141,7 @@ proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} = when isMainModule: proc main() {.async.} = - var sock = AsyncSocket() + var sock = newAsyncSocket() await sock.connect("irc.freenode.net", TPort(6667)) while true: let line = await sock.recvLine() diff --git a/lib/pure/net.nim b/lib/pure/net.nim index d40f0949b..2461ece1b 100644 --- a/lib/pure/net.nim +++ b/lib/pure/net.nim @@ -347,7 +347,7 @@ type ETimeout* = object of ESynch -proc newSocket(fd: TSocketHandle, isBuff: bool): PSocket = +proc createSocket(fd: TSocketHandle, isBuff: bool): PSocket = assert fd != osInvalidSocket new(result) result.fd = fd @@ -355,15 +355,15 @@ proc newSocket(fd: TSocketHandle, isBuff: bool): PSocket = if isBuff: result.currPos = 0 -proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, +proc newSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, protocol: TProtocol = IPPROTO_TCP, buffered = true): PSocket = ## Creates a new socket. ## ## If an error occurs EOS will be raised. - let fd = rawsockets.socket(domain, typ, protocol) + let fd = newRawSocket(domain, typ, protocol) if fd == osInvalidSocket: osError(osLastError()) - result = newSocket(fd, buffered) + result = createSocket(fd, buffered) when defined(ssl): CRYPTO_malloc_init() diff --git a/lib/pure/rawsockets.nim b/lib/pure/rawsockets.nim index de7437420..aeaa7f3b5 100644 --- a/lib/pure/rawsockets.nim +++ b/lib/pure/rawsockets.nim @@ -143,7 +143,7 @@ else: result = cint(ord(p)) -proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, +proc newRawSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, protocol: TProtocol = IPPROTO_TCP): TSocketHandle = ## Creates a new socket; returns `InvalidSocket` if an error occurs. socket(toInt(domain), toInt(typ), toInt(protocol)) diff --git a/tests/async/tasyncawait.nim b/tests/async/tasyncawait.nim index 7f2c1e6cc..7e6270247 100644 --- a/tests/async/tasyncawait.nim +++ b/tests/async/tasyncawait.nim @@ -5,7 +5,6 @@ discard """ """ import asyncdispatch, rawsockets, net, strutils, os -var disp = newDispatcher() var msgCount = 0 const @@ -14,31 +13,31 @@ const var clientCount = 0 -proc sendMessages(disp: PDispatcher, client: TSocketHandle) {.async.} = +proc sendMessages(client: TAsyncFD) {.async.} = for i in 0 .. <messagesToSend: - await disp.send(client, "Message " & $i & "\c\L") + await send(client, "Message " & $i & "\c\L") -proc launchSwarm(disp: PDispatcher, port: TPort) {.async.} = +proc launchSwarm(port: TPort) {.async.} = for i in 0 .. <swarmSize: - var sock = disp.socket() + var sock = newAsyncRawSocket() #disp.register(sock) - await disp.connect(sock, "localhost", port) + await connect(sock, "localhost", port) when true: - await sendMessages(disp, sock) - disp.close(sock) + await sendMessages(sock) + close(sock) else: # Issue #932: https://github.com/Araq/Nimrod/issues/932 - var msgFut = sendMessages(disp, sock) + var msgFut = sendMessages(sock) msgFut.callback = proc () = - disp.close(sock) + close(sock) -proc readMessages(disp: PDispatcher, client: TSocketHandle) {.async.} = +proc readMessages(client: TAsyncFD) {.async.} = while true: - var line = await disp.recvLine(client) + var line = await recvLine(client) if line == "": - disp.close(client) + close(client) clientCount.inc break else: @@ -47,8 +46,8 @@ proc readMessages(disp: PDispatcher, client: TSocketHandle) {.async.} = else: doAssert false -proc createServer(disp: PDispatcher, port: TPort) {.async.} = - var server = disp.socket() +proc createServer(port: TPort) {.async.} = + var server = newAsyncRawSocket() #disp.register(server) block: var name: TSockaddr_in @@ -58,20 +57,20 @@ proc createServer(disp: PDispatcher, port: TPort) {.async.} = name.sin_family = toInt(AF_INET) name.sin_port = htons(int16(port)) name.sin_addr.s_addr = htonl(INADDR_ANY) - if bindAddr(server, cast[ptr TSockAddr](addr(name)), - sizeof(name).TSocklen) < 0'i32: + if bindAddr(server.TSocketHandle, cast[ptr TSockAddr](addr(name)), + sizeof(name).TSocklen) < 0'i32: osError(osLastError()) - discard server.listen() + discard server.TSocketHandle.listen() while true: - var client = await disp.accept(server) - readMessages(disp, client) + var client = await accept(server) + readMessages(client) # TODO: Test: readMessages(disp, await disp.accept(server)) -disp.createServer(TPort(10335)) -disp.launchSwarm(TPort(10335)) +createServer(TPort(10335)) +launchSwarm(TPort(10335)) while true: - disp.poll() + poll() if clientCount == swarmSize: break assert msgCount == swarmSize * messagesToSend |