import std/nativesockets
import std/net
import std/os
import std/posix
import io/serversocket
type
DynStream* = ref object of RootObj
isend*: bool
blocking*: bool #TODO move to posixstream
closed: bool
# Semantics of this function are those of POSIX read(2): that is, it may return
# a result that is lower than `len`, and that does not mean the stream is
# finished.
# isend must be set by implementations when the end of the stream is reached.
# An exception should be raised if recvData is called with the 'isend' flag set
# to true.
method recvData*(s: DynStream; buffer: pointer; len: int): int {.base.} =
assert false
# See above, but with write(2)
method sendData*(s: DynStream; buffer: pointer; len: int): int {.base.} =
assert false
method seek*(s: DynStream; off: int) {.base.} =
assert false
method sclose*(s: DynStream) {.base.} =
assert false
method sflush*(s: DynStream) {.base.} =
discard
proc recvData*(s: DynStream; buffer: var openArray[uint8]): int {.inline.} =
return s.recvData(addr buffer[0], buffer.len)
proc recvData*(s: DynStream; buffer: var openArray[char]): int {.inline.} =
return s.recvData(addr buffer[0], buffer.len)
proc sendData*(s: DynStream; buffer: openArray[char]): int {.inline.} =
return s.sendData(unsafeAddr buffer[0], buffer.len)
proc sendData*(s: DynStream; buffer: openArray[uint8]): int {.inline.} =
return s.sendData(unsafeAddr buffer[0], buffer.len)
proc sendDataLoop*(s: DynStream; buffer: pointer; len: int) =
var n = 0
while true:
n += s.sendData(addr cast[ptr UncheckedArray[uint8]](buffer)[n], len - n)
if n == len:
break
proc sendDataLoop*(s: DynStream; buffer: openArray[uint8]) {.inline.} =
if buffer.len > 0:
s.sendDataLoop(unsafeAddr buffer[0], buffer.len)
proc sendDataLoop*(s: DynStream; buffer: openArray[char]) {.inline.} =
if buffer.len > 0:
s.sendDataLoop(unsafeAddr buffer[0], buffer.len)
proc write*(s: DynStream; buffer: openArray[char]) {.inline.} =
s.sendDataLoop(buffer)
proc write*(s: DynStream; c: char) {.inline.} =
s.sendDataLoop(unsafeAddr c, 1)
proc sreadChar*(s: DynStream): char =
let n = s.recvData(addr result, 1)
assert n == 1
proc recvDataLoop*(s: DynStream; buffer: pointer; len: int) =
var n = 0
while true:
n += s.recvData(addr cast[ptr UncheckedArray[uint8]](buffer)[n], len - n)
if n == len:
break
proc recvDataLoop*(s: DynStream; buffer: var openArray[uint8]) {.inline.} =
s.recvDataLoop(addr buffer[0], buffer.len)
proc recvAll*(s: DynStream): string =
var buffer = newString(4096)
var idx = 0
while true:
let n = s.recvData(addr buffer[idx], buffer.len - idx)
if n == 0:
break
idx += n
if idx == buffer.len:
buffer.setLen(buffer.len + 4096)
buffer.setLen(idx)
return buffer
type
PosixStream* = ref object of DynStream
fd*: cint
ErrorAgain* = object of IOError
ErrorBadFD* = object of IOError
ErrorFault* = object of IOError
ErrorInterrupted* = object of IOError
ErrorInvalid* = object of IOError
ErrorConnectionReset* = object of IOError
ErrorBrokenPipe* = object of IOError
proc raisePosixIOError() =
# In the nim stdlib, these are only constants on linux amd64, so we
# can't use a switch.
if errno == EAGAIN or errno == EWOULDBLOCK:
raise newException(ErrorAgain, "eagain")
elif errno == EBADF:
raise newException(ErrorBadFD, "bad fd")
elif errno == EFAULT:
raise newException(ErrorFault, "fault")
elif errno == EINVAL:
raise newException(ErrorInvalid, "invalid")
elif errno == ECONNRESET:
raise newException(ErrorConnectionReset, "connection reset by peer")
elif errno == EPIPE:
raise newException(ErrorBrokenPipe, "broken pipe")
else:
raise newException(IOError, $strerror(errno))
method recvData*(s: PosixStream; buffer: pointer; len: int): int =
let n = read(s.fd, buffer, len)
if n < 0:
raisePosixIOError()
if n == 0:
if unlikely(s.isend):
raise newException(EOFError, "eof")
s.isend = true
return n
proc sreadChar*(s: PosixStream): char =
let n = read(s.fd, addr result, 1)
assert n == 1
method sendData*(s: PosixStream; buffer: pointer; len: int): int =
let n = write(s.fd, buffer, len)
if n < 0:
raisePosixIOError()
return n
method setBlocking*(s: PosixStream; blocking: bool) {.base.} =
s.blocking = blocking
let ofl = fcntl(s.fd, F_GETFL, 0)
if blocking:
discard fcntl(s.fd, F_SETFL, ofl and not O_NONBLOCK)
else:
discard fcntl(s.fd, F_SETFL, ofl or O_NONBLOCK)
method seek*(s: PosixStream; off: int) =
if lseek(s.fd, Off(off), SEEK_SET) == -1:
raisePosixIOError()
method sclose*(s: PosixStream) =
assert not s.closed
discard close(s.fd)
s.closed = true
proc newPosixStream*(fd: FileHandle): PosixStream =
return PosixStream(fd: fd, blocking: true)
proc newPosixStream*(path: string; flags, mode: cint): PosixStream =
let fd = open(cstring(path), flags, mode)
if fd == -1:
return nil
return newPosixStream(fd)
type SocketStream* = ref object of PosixStream
source*: Socket
method recvData*(s: SocketStream; buffer: pointer; len: int): int =
let n = s.source.recv(buffer, len)
if n < 0:
raisePosixIOError()
if n == 0:
if unlikely(s.isend):
raise newException(EOFError, "eof")
s.isend = true
return n
method sendData*(s: SocketStream; buffer: pointer; len: int): int =
let n = s.source.send(buffer, len)
if n < 0:
raisePosixIOError()
return n
{.compile: "sendfd.c".}
proc sendfd(sock, fd: cint): int {.importc.}
proc sendFileHandle*(s: SocketStream; fd: FileHandle) =
assert not s.source.hasDataBuffered
let n = sendfd(s.fd, cint(fd))
if n < 0:
raisePosixIOError()
assert n == 1 # we send a single nul byte as buf
{.compile: "recvfd.c".}
proc recvfd(sock: cint; fdout: ptr cint): int {.importc.}
proc recvFileHandle*(s: SocketStream): FileHandle =
assert not s.source.hasDataBuffered
var fd: cint
let n = recvfd(s.fd, addr fd)
if n < 0:
raisePosixIOError()
return FileHandle(fd)
method setBlocking*(s: SocketStream; blocking: bool) =
s.blocking = blocking
s.source.getFd().setBlocking(blocking)
method seek*(s: SocketStream; off: int) =
doAssert false
method sclose*(s: SocketStream) =
assert not s.closed
s.source.close()
s.closed = true
# see serversocket.nim for an explanation
{.compile: "connect_unix.c".}
proc connect_unix_from_c(fd: cint; path: cstring; pathlen: cint): cint
{.importc.}
when defined(freebsd):
# for FreeBSD/capsicum
proc connectat_unix_from_c(baseFd, sockFd: cint; rel_path: cstring;
rel_pathlen: cint): cint {.importc.}
proc connectAtSocketStream0(socketDir: string; baseFd, pid: int;
blocking = true): SocketStream =
let sock = newSocket(Domain.AF_UNIX, SockType.SOCK_STREAM,
Protocol.IPPROTO_IP, buffered = false)
if not blocking:
sock.getFd().setBlocking(false)
let path = getSocketPath(socketDir, pid)
if baseFd == -1:
if connect_unix_from_c(cint(sock.getFd()), cstring(path),
cint(path.len)) != 0:
raiseOSError(osLastError())
else:
when defined(freebsd):
doAssert baseFd != -1
let name = getSocketName(pid)
if connectat_unix_from_c(cint(baseFd), cint(sock.getFd()), cstring(name),
cint(name.len)) != 0:
raiseOSError(osLastError())
else:
# shouldn't have sockDirFd on other architectures
doAssert false
return SocketStream(
source: sock,
fd: cint(sock.getFd()),
blocking: blocking
)
proc connectSocketStream*(socketDir: string; baseFd, pid: int;
blocking = true): SocketStream =
try:
return connectAtSocketStream0(socketDir, baseFd, pid, blocking)
except OSError:
return nil
proc acceptSocketStream*(ssock: ServerSocket; blocking = true): SocketStream =
var sock: Socket
ssock.sock.accept(sock, inheritable = true)
if not blocking:
sock.getFd().setBlocking(false)
return SocketStream(
blocking: blocking,
source: sock,
fd: cint(sock.getFd())
)
type
BufStream* = ref object of DynStream
source*: PosixStream
registerFun: proc(fd: int)
registered: bool
writeBuffer: string
method recvData*(s: BufStream; buffer: pointer; len: int): int =
s.source.recvData(buffer, len)
method sendData*(s: BufStream; buffer: pointer; len: int): int =
s.source.setBlocking(false)
block nobuf:
var n: int
if not s.registered:
try:
n = s.source.sendData(buffer, len)
if n == len:
break nobuf
except ErrorAgain:
discard
s.registerFun(s.source.fd)
s.registered = true
let olen = s.writeBuffer.len
s.writeBuffer.setLen(s.writeBuffer.len + len - n)
let buffer = cast[ptr UncheckedArray[uint8]](buffer)
copyMem(addr s.writeBuffer[olen], addr buffer[n], len - n)
s.source.setBlocking(true)
return len
method sclose*(s: BufStream) =
assert not s.closed
s.source.sclose()
s.closed = true
proc flushWrite*(s: BufStream): bool =
s.source.setBlocking(false)
let n = s.source.sendData(s.writeBuffer)
s.source.setBlocking(true)
if n == s.writeBuffer.len:
s.writeBuffer = ""
s.registered = false
return true
s.writeBuffer = s.writeBuffer.substr(n)
return false
proc reallyFlush*(s: BufStream) =
if s.writeBuffer.len > 0:
s.source.sendDataLoop(s.writeBuffer)
proc newBufStream*(ps: PosixStream; registerFun: proc(fd: int)): BufStream =
return BufStream(source: ps, blocking: ps.blocking, registerFun: registerFun)
type
DynFileStream* = ref object of DynStream
file*: File
method recvData*(s: DynFileStream; buffer: pointer; len: int): int =
let n = s.file.readBuffer(buffer, len)
if n == 0:
if unlikely(s.isend):
raise newException(EOFError, "eof")
s.isend = true
return n
method sendData*(s: DynFileStream; buffer: pointer; len: int): int =
return s.file.writeBuffer(buffer, len)
method seek*(s: DynFileStream; off: int) =
s.file.setFilePos(int64(off))
method sclose*(s: DynFileStream) =
assert not s.closed
s.file.close()
s.closed = true
method sflush*(s: DynFileStream) =
s.file.flushFile()
proc newDynFileStream*(file: File): DynFileStream =
return DynFileStream(file: file, blocking: true)
proc newDynFileStream*(path: string): DynFileStream =
var file: File
if file.open(path):
return newDynFileStream(path)
return nil