1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
|
import std/nativesockets
import std/net
import std/os
import std/posix
import io/serversocket
type
  # Root of the stream hierarchy in this module. The recvData, sendData, seek,
  # sclose and sflush methods are dispatched dynamically on this type.
  DynStream* = ref object of RootObj
    # Set by implementations once the end of the stream has been reached.
    isend*: bool
    # Blocking mode last recorded for the stream (written by setBlocking and
    # by the constructors in this module).
    blocking*: bool #TODO move to posixstream
# Read at most `len` bytes into `buffer` and return the number of bytes read.
# The semantics are those of POSIX read(2): the result may be lower than
# `len`, and a short read does not mean that the stream is finished.
# Implementations must set `isend` when the end of the stream is reached, and
# an exception should be raised if recvData is called while `isend` is
# already true.
# This base version only fails an assertion; concrete streams override it.
method recvData*(s: DynStream; buffer: pointer; len: int): int {.base.} =
  assert false
# Write at most `len` bytes from `buffer` and return the number of bytes
# written. Mirrors recvData, but with the semantics of POSIX write(2): the
# result may be lower than `len`.
# This base version only fails an assertion; concrete streams override it.
method sendData*(s: DynStream; buffer: pointer; len: int): int {.base.} =
  assert false
# Move the stream position to `off`.
# This base version only fails an assertion; seekable streams override it.
method seek*(s: DynStream; off: int) {.base.} =
  assert false
# Close the stream.
# This base version only fails an assertion; concrete streams override it.
method sclose*(s: DynStream) {.base.} =
  assert false
# Flush any data buffered by the stream. The default does nothing, so only
# streams that actually buffer need to override it.
method sflush*(s: DynStream) {.base.} =
  discard
# Convenience overload of recvData: read at most `buffer.len` bytes into
# `buffer` and return the number of bytes read.
proc recvData*(s: DynStream; buffer: var openArray[uint8]): int {.inline.} =
  # An empty buffer has no first element whose address could be taken, so
  # report 0 bytes read without touching the stream at all.
  if buffer.len == 0:
    return 0
  return s.recvData(addr buffer[0], buffer.len)
# Convenience overload of recvData: read at most `buffer.len` characters into
# `buffer` and return the number of bytes read.
proc recvData*(s: DynStream; buffer: var openArray[char]): int {.inline.} =
  # An empty buffer has no first element whose address could be taken, so
  # report 0 bytes read without touching the stream at all.
  if buffer.len == 0:
    return 0
  return s.recvData(addr buffer[0], buffer.len)
# Convenience overload of sendData: write at most `buffer.len` characters
# from `buffer` and return the number of bytes written.
proc sendData*(s: DynStream; buffer: openArray[char]): int {.inline.} =
  # Nothing to send; indexing element 0 of an empty buffer would be an
  # out-of-bounds access.
  if buffer.len == 0:
    return 0
  return s.sendData(unsafeAddr buffer[0], buffer.len)
# Convenience overload of sendData: write at most `buffer.len` bytes from
# `buffer` and return the number of bytes written.
proc sendData*(s: DynStream; buffer: openArray[uint8]): int {.inline.} =
  # Nothing to send; indexing element 0 of an empty buffer would be an
  # out-of-bounds access.
  if buffer.len == 0:
    return 0
  return s.sendData(unsafeAddr buffer[0], buffer.len)
# Keep calling sendData until all `len` bytes starting at `buffer` have been
# written. sendData is invoked at least once, even when `len` is 0.
proc sendDataLoop*(s: DynStream; buffer: pointer; len: int) =
  let bytes = cast[ptr UncheckedArray[uint8]](buffer)
  var sent = 0
  while true:
    # Resume right after the bytes that previous calls already wrote.
    sent += s.sendData(addr bytes[sent], len - sent)
    if sent == len:
      break
# Write the whole of `buffer` to the stream; an empty buffer is a no-op.
proc sendDataLoop*(s: DynStream; buffer: openArray[uint8]) {.inline.} =
  if buffer.len == 0:
    return
  s.sendDataLoop(unsafeAddr buffer[0], buffer.len)
# Write the whole of `buffer` to the stream; an empty buffer is a no-op.
proc sendDataLoop*(s: DynStream; buffer: openArray[char]) {.inline.} =
  if buffer.len == 0:
    return
  s.sendDataLoop(unsafeAddr buffer[0], buffer.len)
# Write every character of `buffer` to the stream, looping over partial
# writes; an empty buffer is a no-op.
proc write*(s: DynStream; buffer: openArray[char]) {.inline.} =
  if buffer.len > 0:
    s.sendDataLoop(unsafeAddr buffer[0], buffer.len)
# Write the single character `c` to the stream.
proc write*(s: DynStream; c: char) {.inline.} =
  let p = unsafeAddr c
  s.sendDataLoop(p, 1)
# Read one character from the stream through recvData. Asserts that exactly
# one byte was received.
proc sreadChar*(s: DynStream): char =
  var c: char
  let got = s.recvData(addr c, 1)
  assert got == 1
  return c
# Keep calling recvData until exactly `len` bytes have been stored at
# `buffer`. recvData is invoked at least once, even when `len` is 0.
proc recvDataLoop*(s: DynStream; buffer: pointer; len: int) =
  let bytes = cast[ptr UncheckedArray[uint8]](buffer)
  var received = 0
  while true:
    # Continue filling right after the bytes that are already in place.
    received += s.recvData(addr bytes[received], len - received)
    if received == len:
      break
# Fill `buffer` completely with data read from the stream.
proc recvDataLoop*(s: DynStream; buffer: var openArray[uint8]) {.inline.} =
  # Same guard as the openArray overloads of sendDataLoop: an empty buffer
  # has no element 0 to take the address of, and there is nothing to read.
  if buffer.len > 0:
    s.recvDataLoop(addr buffer[0], buffer.len)
# Read from the stream until recvData returns 0 and return everything that
# was received as a string.
proc recvAll*(s: DynStream): string =
  const chunk = 4096
  result = newString(chunk)
  var filled = 0
  while true:
    let got = s.recvData(addr result[filled], result.len - filled)
    if got == 0:
      break
    filled += got
    # Grow by one more chunk whenever the buffer is completely full, so the
    # next call always has room to write into.
    if filled == result.len:
      result.setLen(result.len + chunk)
  # Cut off the unused tail.
  result.setLen(filled)
type
  # Stream backed by a raw POSIX file descriptor.
  PosixStream* = ref object of DynStream
    fd*: cint # descriptor handed to read, write, lseek, fcntl and close
  # Exceptions raised by raisePosixIOError, one per errno value it recognizes.
  ErrorAgain* = object of IOError # EAGAIN or EWOULDBLOCK
  ErrorBadFD* = object of IOError # EBADF
  ErrorFault* = object of IOError # EFAULT
  ErrorInterrupted* = object of IOError
  ErrorInvalid* = object of IOError # EINVAL
  ErrorConnectionReset* = object of IOError # ECONNRESET
  ErrorBrokenPipe* = object of IOError # EPIPE
# Translate the current value of errno into one of the IOError subtypes
# declared in this module and raise it. Unrecognized values become a plain
# IOError carrying the strerror() text. This proc never returns normally.
proc raisePosixIOError() =
  # In the nim stdlib, these are only constants on linux amd64, so we
  # can't use a switch.
  let err = errno
  if err == EAGAIN or err == EWOULDBLOCK:
    raise newException(ErrorAgain, "eagain")
  if err == EBADF:
    raise newException(ErrorBadFD, "bad fd")
  if err == EFAULT:
    raise newException(ErrorFault, "fault")
  if err == EINVAL:
    raise newException(ErrorInvalid, "invalid")
  if err == ECONNRESET:
    raise newException(ErrorConnectionReset, "connection reset by peer")
  if err == EPIPE:
    raise newException(ErrorBrokenPipe, "broken pipe")
  raise newException(IOError, $strerror(err))
# read(2) up to `len` bytes from the descriptor into `buffer`.
# A negative result is turned into an exception by raisePosixIOError. The
# first zero-length result marks the stream as ended (`isend`); a further
# zero-length result after that raises EOFError.
method recvData*(s: PosixStream; buffer: pointer; len: int): int =
  result = read(s.fd, buffer, len)
  if result < 0:
    raisePosixIOError()
  elif result == 0:
    if unlikely(s.isend):
      raise newException(EOFError, "eof")
    s.isend = true
# Read one character straight from the descriptor with read(2), without
# going through recvData. Asserts that exactly one byte was read.
proc sreadChar*(s: PosixStream): char =
  var c: char
  let got = read(s.fd, addr c, 1)
  assert got == 1
  return c
# write(2) up to `len` bytes from `buffer` to the descriptor and return the
# number of bytes written. A negative result is turned into an exception by
# raisePosixIOError.
method sendData*(s: PosixStream; buffer: pointer; len: int): int =
  result = write(s.fd, buffer, len)
  if result < 0:
    raisePosixIOError()
# Record the requested mode in `s.blocking` and set or clear O_NONBLOCK on
# the descriptor accordingly. The result of the F_SETFL call is discarded.
method setBlocking*(s: PosixStream; blocking: bool) {.base.} =
  s.blocking = blocking
  let current = fcntl(s.fd, F_GETFL, 0)
  let wanted =
    if blocking:
      current and not O_NONBLOCK
    else:
      current or O_NONBLOCK
  discard fcntl(s.fd, F_SETFL, wanted)
# Move the descriptor to the absolute offset `off` (lseek with SEEK_SET).
# Failure is reported through raisePosixIOError.
method seek*(s: PosixStream; off: int) =
  let pos = lseek(s.fd, Off(off), SEEK_SET)
  if pos == -1:
    raisePosixIOError()
# Close the underlying descriptor; the result of close(2) is ignored.
method sclose*(s: PosixStream) =
  discard s.fd.close()
# Wrap an already open descriptor in a PosixStream. The new stream is
# recorded as blocking; the descriptor's flags are not touched.
proc newPosixStream*(fd: FileHandle): PosixStream =
  new(result)
  result.fd = fd
  result.blocking = true
# open(2) `path` with the given `flags` and `mode` and wrap the resulting
# descriptor in a PosixStream. Returns nil when the file cannot be opened.
proc newPosixStream*(path: string; flags, mode: cint): PosixStream =
  let fd = open(cstring(path), flags, mode)
  if fd != -1:
    return newPosixStream(fd)
  return nil
# PosixStream whose I/O goes through a std/net Socket. The constructors in
# this module set the inherited `fd` to the socket's own descriptor.
type SocketStream* = ref object of PosixStream
  source*: Socket # socket used by recvData, sendData, setBlocking and sclose
# Receive up to `len` bytes from the socket into `buffer`.
# A negative result is turned into an exception by raisePosixIOError. The
# first zero-length result marks the stream as ended (`isend`); a further
# zero-length result after that raises EOFError.
method recvData*(s: SocketStream; buffer: pointer; len: int): int =
  result = s.source.recv(buffer, len)
  if result < 0:
    raisePosixIOError()
  elif result == 0:
    if unlikely(s.isend):
      raise newException(EOFError, "eof")
    s.isend = true
# Send up to `len` bytes from `buffer` over the socket and return the number
# of bytes sent. A negative result is turned into an exception by
# raisePosixIOError.
method sendData*(s: SocketStream; buffer: pointer; len: int): int =
  result = s.source.send(buffer, len)
  if result < 0:
    raisePosixIOError()
# sendfd is implemented in sendfd.c, which is compiled along with this
# module.
{.compile: "sendfd.c".}
proc sendfd(sock, fd: cint): int {.importc.}

# Pass the file descriptor `fd` to the peer of `s` through sendfd.
# Asserts that the socket has no buffered data. A negative result from
# sendfd is reported through raisePosixIOError.
proc sendFileHandle*(s: SocketStream; fd: FileHandle) =
  assert not s.source.hasDataBuffered
  let sent = sendfd(s.fd, cint(fd))
  if sent < 0:
    raisePosixIOError()
  assert sent == 1 # we send a single nul byte as buf
# recvfd is implemented in recvfd.c, which is compiled along with this
# module.
{.compile: "recvfd.c".}
proc recvfd(sock: cint; fdout: ptr cint): int {.importc.}

# Receive a file descriptor from the peer of `s` through recvfd and return
# it. Asserts that the socket has no buffered data. A negative result from
# recvfd is reported through raisePosixIOError.
proc recvFileHandle*(s: SocketStream): FileHandle =
  assert not s.source.hasDataBuffered
  var received: cint
  if recvfd(s.fd, addr received) < 0:
    raisePosixIOError()
  return FileHandle(received)
# Record the requested mode in `s.blocking` and apply it to the socket's
# handle.
method setBlocking*(s: SocketStream; blocking: bool) =
  s.blocking = blocking
  let handle = s.source.getFd()
  handle.setBlocking(blocking)
# Seeking is not supported on sockets; calling this always fails the
# doAssert.
method seek*(s: SocketStream; off: int) =
  doAssert false
# Close the underlying socket.
method sclose*(s: SocketStream) =
  close(s.source)
# see serversocket.nim for an explanation
{.compile: "connect_unix.c".}
proc connect_unix_from_c(fd: cint; path: cstring; pathlen: cint): cint
  {.importc.}
when defined(freebsd):
  # for FreeBSD/capsicum
  proc connectat_unix_from_c(baseFd, sockFd: cint; rel_path: cstring;
    rel_pathlen: cint): cint {.importc.}

# Create a UNIX stream socket and connect it to the socket of process `pid`.
# With `baseFd` == -1 the path returned by getSocketPath(socketDir, pid) is
# used. Otherwise (FreeBSD only) the name returned by getSocketName(pid) is
# connected to relative to `baseFd`.
# `blocking` selects the mode of the new socket.
# Raises OSError when the socket cannot be set up or connected; in that case
# the socket is closed before the error propagates.
proc connectAtSocketStream0(socketDir: string; baseFd, pid: int;
    blocking = true): SocketStream =
  let sock = newSocket(Domain.AF_UNIX, SockType.SOCK_STREAM,
    Protocol.IPPROTO_IP, buffered = false)
  try:
    if not blocking:
      sock.getFd().setBlocking(false)
    let path = getSocketPath(socketDir, pid)
    if baseFd == -1:
      if connect_unix_from_c(cint(sock.getFd()), cstring(path),
          cint(path.len)) != 0:
        raiseOSError(osLastError())
    else:
      when defined(freebsd):
        doAssert baseFd != -1
        let name = getSocketName(pid)
        if connectat_unix_from_c(cint(baseFd), cint(sock.getFd()),
            cstring(name), cint(name.len)) != 0:
          raiseOSError(osLastError())
      else:
        # shouldn't have sockDirFd on other architectures
        doAssert false
  except OSError:
    # The error code is already stored in the exception, so closing here
    # cannot clobber it. Without this the descriptor would leak every time a
    # connection attempt fails.
    sock.close()
    raise
  return SocketStream(
    source: sock,
    fd: cint(sock.getFd()),
    blocking: blocking
  )
# Connect to the socket of process `pid` (see connectAtSocketStream0 for the
# meaning of the parameters). Returns nil instead of raising when the
# attempt fails with an OSError.
proc connectSocketStream*(socketDir: string; baseFd, pid: int;
    blocking = true): SocketStream =
  result = nil
  try:
    result = connectAtSocketStream0(socketDir, baseFd, pid, blocking)
  except OSError:
    discard
# Accept one connection on `ssock` and wrap it in a SocketStream. The
# accepted socket is inheritable; it is switched to non-blocking mode when
# `blocking` is false.
proc acceptSocketStream*(ssock: ServerSocket; blocking = true): SocketStream =
  var client: Socket
  ssock.sock.accept(client, inheritable = true)
  let handle = client.getFd()
  if not blocking:
    handle.setBlocking(false)
  return SocketStream(source: client, fd: cint(handle), blocking: blocking)
type
  # Write-buffering wrapper around a PosixStream: data that the source does
  # not accept immediately is kept in `writeBuffer` until it is flushed.
  BufStream* = ref object of DynStream
    source*: PosixStream # stream that is actually read from and written to
    # Called with the source's fd when sendData starts buffering data.
    registerFun: proc(fd: int)
    # True from the registerFun call until flushWrite empties the buffer.
    registered: bool
    writeBuffer: string # bytes accepted by sendData but not yet written
# Reads are not buffered: forward directly to the source stream.
method recvData*(s: BufStream; buffer: pointer; len: int): int =
  return s.source.recvData(buffer, len)
# Accept all `len` bytes of `buffer` and always return `len`.
# While nothing is pending, one non-blocking write to the source is
# attempted. Whatever the source did not take is appended to `writeBuffer`,
# and registerFun is called with the source's fd. While earlier data is
# still pending (`registered`), new data is appended without a write.
# The source is switched to non-blocking mode for the duration of the call
# and to blocking mode afterwards.
method sendData*(s: BufStream; buffer: pointer; len: int): int =
  s.source.setBlocking(false)
  block buffered:
    var written = 0
    if not s.registered:
      try:
        written = s.source.sendData(buffer, len)
        if written == len:
          # Everything went out directly; nothing to buffer.
          break buffered
      except ErrorAgain:
        # The source would block: buffer the whole input.
        discard
      s.registerFun(s.source.fd)
      s.registered = true
    let pending = len - written
    let start = s.writeBuffer.len
    s.writeBuffer.setLen(start + pending)
    let bytes = cast[ptr UncheckedArray[uint8]](buffer)
    copyMem(addr s.writeBuffer[start], addr bytes[written], pending)
  s.source.setBlocking(true)
  return len
# Close the source stream. Data still held in the write buffer is not
# written by this call.
method sclose*(s: BufStream) =
  sclose(s.source)
# Try to write the pending buffer to the source with a single non-blocking
# sendData call. Returns true, and clears `registered`, when the buffer was
# written completely. Otherwise keeps the unwritten tail and returns false.
proc flushWrite*(s: BufStream): bool =
  s.source.setBlocking(false)
  let sent = s.source.sendData(s.writeBuffer)
  s.source.setBlocking(true)
  if sent != s.writeBuffer.len:
    # Partial write: drop only the part that went out.
    s.writeBuffer = s.writeBuffer.substr(sent)
    return false
  s.writeBuffer = ""
  s.registered = false
  return true
# Write the complete pending buffer to the source with sendDataLoop.
# The buffer itself is left unchanged by this proc.
proc reallyFlush*(s: BufStream) =
  if s.writeBuffer.len == 0:
    return
  s.source.sendDataLoop(s.writeBuffer)
# Create a BufStream on top of `ps`, taking over its `blocking` flag.
# `registerFun` is stored for sendData, which calls it with the source's fd.
proc newBufStream*(ps: PosixStream; registerFun: proc(fd: int)): BufStream =
  new(result)
  result.source = ps
  result.blocking = ps.blocking
  result.registerFun = registerFun
type
  # Stream backed by a Nim `File`.
  DynFileStream* = ref object of DynStream
    file*: File # file that all reads, writes, seeks and flushes go to
# Read up to `len` bytes from the file into `buffer` with readBuffer.
# The first zero-length result marks the stream as ended (`isend`); a
# further zero-length result after that raises EOFError.
method recvData*(s: DynFileStream; buffer: pointer; len: int): int =
  result = s.file.readBuffer(buffer, len)
  if result == 0:
    if unlikely(s.isend):
      raise newException(EOFError, "eof")
    s.isend = true
# Write up to `len` bytes from `buffer` to the file and return the count
# reported by writeBuffer.
method sendData*(s: DynFileStream; buffer: pointer; len: int): int =
  result = writeBuffer(s.file, buffer, len)
# Set the file position to `off`.
method seek*(s: DynFileStream; off: int) =
  setFilePos(s.file, int64(off))
# Close the underlying file.
method sclose*(s: DynFileStream) =
  close(s.file)
# Flush the underlying file's buffer.
method sflush*(s: DynFileStream) =
  flushFile(s.file)
# Wrap an already open `File` in a DynFileStream; the stream is recorded as
# blocking.
proc newDynFileStream*(file: File): DynFileStream =
  new(result)
  result.file = file
  result.blocking = true
# Open the file at `path` with the default mode of `open` and wrap it in a
# DynFileStream. Returns nil when the file cannot be opened.
proc newDynFileStream*(path: string): DynFileStream =
  var file: File
  if file.open(path):
    # Hand over the opened file. Passing `path` here would call this same
    # overload again and reopen the file over and over until `open` fails.
    return newDynFileStream(file)
  return nil
|