about summary refs log tree commit diff stats
path: root/src/server/loaderiface.nim
blob: faa9b021bd3d556651dce9f41181df521e4d56c7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
# Interface to server/loader. The idea is that modules don't have to
# depend on the entire loader implementation to interact with it.
#
# See server/loader for a more detailed description of the protocol.

import std/tables

import io/bufreader
import io/bufwriter
import io/dynstream
import io/promise
import monoucha/javascript
import monoucha/jserror
import server/headers
import server/request
import server/response
import server/urlfilter
import types/cookie
import types/opt
import types/referrer
import types/url

type
  FileLoader* = ref object
    key*: ClientKey
    process*: int
    clientPid*: int
    map: seq[MapData]
    mapFds*: int # number of fds in map
    unregistered*: seq[int]
    registerFun*: proc(fd: int)
    unregisterFun*: proc(fd: int)
    # directory where we store UNIX domain sockets
    sockDir*: string
    # (FreeBSD only) fd for the socket directory so we can connectat() on it
    sockDirFd*: cint

  ConnectDataState = enum
    cdsBeforeResult, cdsBeforeStatus, cdsBeforeHeaders

  MapData* = ref object of RootObj
    stream*: SocketStream

  LoaderData = ref object of MapData

  ConnectData* = ref object of LoaderData
    state: ConnectDataState
    status: uint16
    res: int
    outputId: int
    redirectNum: int
    promise: FetchPromise
    request: Request

  OngoingData* = ref object of LoaderData
    response*: Response

  LoaderCommand* = enum
    lcAddCacheFile
    lcAddClient
    lcGetCacheFile
    lcLoad
    lcLoadConfig
    lcPassFd
    lcRedirectToFile
    lcRemoveCachedItem
    lcRemoveClient
    lcResume
    lcShareCachedItem
    lcSuspend
    lcTee
    lcOpenCachedItem

  ClientKey* = array[32, uint8]

  LoaderClientConfig* = object
    cookieJar*: CookieJar
    defaultHeaders*: Headers
    filter*: URLFilter
    proxy*: URL
    referrerPolicy*: ReferrerPolicy
    insecureSSLNoVerify*: bool

proc getRedirect*(response: Response; request: Request): Request =
  if "Location" in response.headers.table:
    if response.status in 301u16..303u16 or response.status in 307u16..308u16:
      let location = response.headers.table["Location"][0]
      let url = parseURL(location, option(request.url))
      if url.isSome:
        let status = response.status
        if status == 303 and request.httpMethod notin {hmGet, hmHead} or
            status == 301 or
            status == 302 and request.httpMethod == hmPost:
          return newRequest(url.get, hmGet)
        else:
          return newRequest(url.get, request.httpMethod, body = request.body)
  return nil

template withLoaderPacketWriter(stream: SocketStream; loader: FileLoader;
    w, body: untyped) =
  stream.withPacketWriter w:
    w.swrite(loader.clientPid)
    w.swrite(loader.key)
    body

proc connect(loader: FileLoader): SocketStream =
  return connectSocketStream(loader.sockDir, loader.sockDirFd, loader.process,
    blocking = true)

# Start a request. This should not block (not for a significant amount of time
# anyway).
proc startRequest(loader: FileLoader; request: Request): SocketStream =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcLoad)
    w.swrite(request)
  return stream

proc startRequest*(loader: FileLoader; request: Request;
    config: LoaderClientConfig): SocketStream =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcLoadConfig)
    w.swrite(request)
    w.swrite(config)
  return stream

iterator data*(loader: FileLoader): MapData {.inline.} =
  for it in loader.map:
    if it != nil:
      yield it

iterator ongoing*(loader: FileLoader): OngoingData {.inline.} =
  for it in loader.data:
    if it of OngoingData:
      yield OngoingData(it)

func fd*(data: MapData): int =
  return int(data.stream.fd)

proc put*(loader: FileLoader; data: MapData) =
  let fd = int(data.stream.fd)
  if loader.map.len <= fd:
    loader.map.setLen(fd + 1)
  assert loader.map[fd] == nil
  loader.map[fd] = data
  if data of LoaderData:
    inc loader.mapFds

proc get*(loader: FileLoader; fd: int): MapData =
  if fd < loader.map.len:
    return loader.map[fd]
  return nil

proc unset*(loader: FileLoader; fd: int) =
  if loader.map[fd] != nil and loader.map[fd] of LoaderData:
    dec loader.mapFds
  loader.map[fd] = nil

proc unset*(loader: FileLoader; data: MapData) =
  let fd = int(data.stream.fd)
  if loader.get(fd) != nil:
    loader.unset(fd)

proc fetch0(loader: FileLoader; input: Request; promise: FetchPromise;
    redirectNum: int) =
  let stream = loader.startRequest(input)
  loader.registerFun(int(stream.fd))
  loader.put(ConnectData(
    promise: promise,
    request: input,
    stream: stream,
    redirectNum: redirectNum
  ))

proc fetch*(loader: FileLoader; input: Request): FetchPromise =
  let promise = FetchPromise()
  loader.fetch0(input, promise, 0)
  return promise

proc reconnect*(loader: FileLoader; data: ConnectData) =
  data.stream.sclose()
  let stream = loader.startRequest(data.request)
  let data = ConnectData(
    promise: data.promise,
    request: data.request,
    stream: stream
  )
  loader.put(data)
  loader.registerFun(data.fd)

proc suspend*(loader: FileLoader; fds: seq[int]) =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcSuspend)
    w.swrite(fds)
  stream.sclose()

proc resume*(loader: FileLoader; fds: openArray[int]) =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcResume)
    w.swrite(fds)
  stream.sclose()

proc resume*(loader: FileLoader; fds: int) =
  loader.resume([fds])

proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcTee)
    w.swrite(sourceId)
    w.swrite(targetPid)
  var outputId: int
  var r = stream.initPacketReader()
  r.sread(outputId)
  return (stream, outputId)

proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): int =
  let stream = loader.connect()
  if stream == nil:
    return -1
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcAddCacheFile)
    w.swrite(outputId)
    w.swrite(targetPid)
  var r = stream.initPacketReader()
  var outputId: int
  r.sread(outputId)
  stream.sclose()
  return outputId

proc getCacheFile*(loader: FileLoader; cacheId, sourcePid: int): string =
  let stream = loader.connect()
  if stream == nil:
    return ""
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcGetCacheFile)
    w.swrite(cacheId)
    w.swrite(sourcePid)
  var r = stream.initPacketReader()
  var s: string
  r.sread(s)
  stream.sclose()
  return s

proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string):
    bool =
  let stream = loader.connect()
  if stream == nil:
    return false
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcRedirectToFile)
    w.swrite(outputId)
    w.swrite(targetPath)
  var r = stream.initPacketReader()
  var res: bool
  r.sread(res)
  stream.sclose()
  return res

proc onConnected(loader: FileLoader; connectData: ConnectData) =
  let stream = connectData.stream
  let promise = connectData.promise
  let request = connectData.request
  var r = stream.initPacketReader()
  case connectData.state
  of cdsBeforeResult:
    var res: int
    r.sread(res) # packet 1
    if res == 0:
      r.sread(connectData.outputId) # packet 1
      inc connectData.state
    else:
      var msg: string
      # msg is discarded.
      #TODO maybe print if called from trusted code (i.e. global == client)?
      r.sread(msg) # packet 1
      let fd = connectData.fd
      loader.unregisterFun(fd)
      loader.unregistered.add(fd)
      stream.sclose()
      # delete before resolving the promise
      loader.unset(connectData)
      promise.resolve(JSResult[Response].err(newFetchTypeError()))
  of cdsBeforeStatus:
    r.sread(connectData.status) # packet 2
    inc connectData.state
  of cdsBeforeHeaders:
    let response = newResponse(connectData.res, request, stream,
      connectData.outputId, connectData.status)
    r.sread(response.headers) # packet 3
    # Only a stream of the response body may arrive after this point.
    response.body = stream
    # delete before resolving the promise
    loader.unset(connectData)
    let data = OngoingData(response: response, stream: stream)
    loader.put(data)
    assert loader.unregisterFun != nil
    response.unregisterFun = proc() =
      loader.unset(data)
      let fd = data.fd
      loader.unregistered.add(fd)
      loader.unregisterFun(fd)
    response.resumeFun = proc(outputId: int) =
      loader.resume(outputId)
    stream.setBlocking(false)
    let redirect = response.getRedirect(request)
    if redirect != nil:
      response.unregisterFun()
      stream.sclose()
      let redirectNum = connectData.redirectNum + 1
      if redirectNum < 5: #TODO use config.network.max_redirect?
        loader.fetch0(redirect, promise, redirectNum)
      else:
        promise.resolve(JSResult[Response].err(newFetchTypeError()))
    else:
      promise.resolve(JSResult[Response].ok(response))

proc onRead*(loader: FileLoader; data: OngoingData) =
  let response = data.response
  response.onRead(response)
  if response.body.isend:
    if response.onFinish != nil:
      response.onFinish(response, true)
    response.onFinish = nil
    response.close()

proc onRead*(loader: FileLoader; fd: int) =
  let data = loader.map[fd]
  if data of ConnectData:
    loader.onConnected(ConnectData(data))
  else:
    loader.onRead(OngoingData(data))

proc onError*(loader: FileLoader; data: OngoingData) =
  let response = data.response
  if response.onFinish != nil:
    response.onFinish(response, false)
  response.onFinish = nil
  response.close()

proc onError*(loader: FileLoader; fd: int): bool =
  let data = loader.map[fd]
  if data of ConnectData:
    # probably shouldn't happen. TODO
    return false
  else:
    loader.onError(OngoingData(data))
    return true

# Note: this blocks until headers are received.
proc doRequest*(loader: FileLoader; request: Request): Response =
  let stream = loader.startRequest(request)
  let response = Response(url: request.url)
  var r = stream.initPacketReader()
  r.sread(response.res) # packet 1
  if response.res == 0:
    r.sread(response.outputId) # packet 1
    r = stream.initPacketReader()
    r.sread(response.status) # packet 2
    r = stream.initPacketReader()
    r.sread(response.headers) # packet 3
    # Only a stream of the response body may arrive after this point.
    response.body = stream
    response.resumeFun = proc(outputId: int) =
      loader.resume(outputId)
  else:
    var msg: string
    r.sread(msg) # packet 1
    stream.sclose()
  return response

proc shareCachedItem*(loader: FileLoader; id, targetPid: int; sourcePid = -1) =
  let stream = loader.connect()
  if stream != nil:
    let sourcePid = if sourcePid != -1: sourcePid else: loader.clientPid
    stream.withLoaderPacketWriter loader, w:
      w.swrite(lcShareCachedItem)
      w.swrite(sourcePid)
      w.swrite(targetPid)
      w.swrite(id)
    stream.sclose()

proc openCachedItem*(loader: FileLoader; cacheId: int): PosixStream =
  let stream = loader.connect()
  if stream != nil:
    stream.withLoaderPacketWriter loader, w:
      w.swrite(lcOpenCachedItem)
      w.swrite(cacheId)
    var fd = cint(-1)
    stream.withPacketReader r:
      var success: bool
      r.sread(success)
      if success:
        fd = r.recvAux.pop()
    stream.sclose()
    if fd != -1:
      return newPosixStream(fd)
  return nil

proc passFd*(loader: FileLoader; id: string; fd: cint) =
  let stream = loader.connect()
  if stream != nil:
    stream.withLoaderPacketWriter loader, w:
      w.swrite(lcPassFd)
      w.swrite(id)
    stream.sendFd(fd)
    stream.sclose()

proc removeCachedItem*(loader: FileLoader; cacheId: int) =
  let stream = loader.connect()
  if stream != nil:
    stream.withLoaderPacketWriter loader, w:
      w.swrite(lcRemoveCachedItem)
      w.swrite(cacheId)
    stream.sclose()

proc addClient*(loader: FileLoader; key: ClientKey; pid: int;
    config: LoaderClientConfig; clonedFrom: int): bool =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcAddClient)
    w.swrite(key)
    w.swrite(pid)
    w.swrite(config)
    w.swrite(clonedFrom)
  var r = stream.initPacketReader()
  var res: bool
  r.sread(res)
  stream.sclose()
  return res

proc removeClient*(loader: FileLoader; pid: int) =
  let stream = loader.connect()
  if stream != nil:
    stream.withLoaderPacketWriter loader, w:
      w.swrite(lcRemoveClient)
      w.swrite(pid)
    stream.sclose()

when defined(freebsd):
  let O_DIRECTORY* {.importc, header: "<fcntl.h>", noinit.}: cint

proc setSocketDir*(loader: FileLoader; path: string) =
  loader.sockDir = path
  when defined(freebsd):
    loader.sockDirFd = newPosixStream(path, O_DIRECTORY, 0).fd
  else:
    loader.sockDirFd = -1