about summary refs log tree commit diff stats
path: root/src/loader/loaderhandle.nim
blob: b1b04dee9af6876b2904e69a32957e36062f190b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import std/deques
import std/net
import std/tables

import io/bufwriter
import io/dynstream
import io/posixstream
import loader/headers

when defined(debug):
  import types/url

# Capacity in bytes of a single LoaderBuffer page: 4096 minus 32.
# NOTE(review): the 32 bytes are presumably headroom so that a page plus
# bookkeeping fits in a 4 KiB chunk - confirm.
const LoaderBufferPageSize = 4096 - 32

type
  # A byte page of fixed capacity. `page` is allocated with
  # alloc(LoaderBufferPageSize) in newLoaderBuffer and released by the
  # `=destroy` hook below.
  LoaderBufferObj = object
    page*: ptr UncheckedArray[uint8] # raw storage; nil once destroyed
    len*: int # number of bytes stored in page (set by recvData)

  LoaderBuffer* = ref LoaderBufferObj

  # One destination of a LoaderHandle. A handle starts with a single output
  # (see newLoaderHandle); further ones are added by tee.
  OutputHandle* = ref object
    parent*: LoaderHandle # handle listing this output; tee tolerates nil
    # buffer at the head of this output; bufferCleared replaces it with the
    # next queued buffer, or nil when the queue is empty
    currentBuffer*: LoaderBuffer
    # reset to 0 by bufferCleared; presumably the offset of the next unsent
    # byte in currentBuffer - confirm against the writer
    currentBufferIdx*: int
    buffers*: Deque[LoaderBuffer] # buffers queued behind currentBuffer
    ostream*: PosixStream # stream written to; set to nil by oclose
    # NOTE(review): presumably set once the input stream is exhausted -
    # confirm; this file only copies it in tee
    istreamAtEnd*: bool
    ownerPid*: int # pid passed in at creation (newLoaderHandle/tee)
    outputId*: int # id passed in at creation; sendResult sends it on success
    # must be false when the handle is closed (asserted in close);
    # presumably tracks registration with the poller - confirm
    registered*: bool
    suspended*: bool # true for new outputs; tee asserts it on its source
    dead*: bool # not touched in this part of the file - TODO document
    when defined(debug):
      url*: URL

  # Progress of a HeaderParser.
  # NOTE(review): meanings are inferred from the names only - confirm
  # against the parser implementation.
  HandleParserState* = enum
    hpsBeforeLines, hpsAfterFirstLine, hpsControlDone

  # Incremental header parser state; per the comment on LoaderHandle.parser
  # it only exists for CGI handles. The parsing code is not in this file.
  HeaderParser* = ref object
    state*: HandleParserState
    lineBuffer*: string # presumably the partially read line - confirm
    crSeen*: bool # presumably "last byte was CR" - confirm
    headers*: Headers
    status*: uint16

  # How much of the response has been sent for a LoaderHandle. sendResult
  # moves rsBeforeResult to rsAfterFailure (error) or on to rsBeforeStatus
  # (success); sendStatus moves to rsBeforeHeaders and sendHeaders to
  # rsAfterHeaders. These procs advance the state with `inc`, so the
  # declaration order below is significant.
  ResponseState = enum
    rsBeforeResult, rsAfterFailure, rsBeforeStatus, rsBeforeHeaders,
    rsAfterHeaders

  LoaderHandle* = ref object
    istream*: PosixStream # stream for taking input
    outputs*: seq[OutputHandle] # list of outputs to be streamed into
    cacheId*: int # if cached, our ID in a client cacheMap
    parser*: HeaderParser # only exists for CGI handles
    rstate: ResponseState # track response state
    when defined(debug):
      url*: URL

{.warning[Deprecated]:off.}:
  proc `=destroy`(buffer: var LoaderBufferObj) =
    # Destructor hook: frees the page obtained with alloc() and clears the
    # pointer so that a second invocation has nothing left to release.
    if buffer.page == nil:
      return
    dealloc(buffer.page)
    buffer.page = nil

# for debugging
when defined(debug):
  func `$`*(buffer: LoaderBuffer): string =
    ## Returns the first `buffer.len` bytes of the page as a string
    ## (debug builds only).
    result = newString(buffer.len)
    # An empty string has no element 0, so taking `addr result[0]` would
    # raise IndexDefect; only copy when there is something to copy.
    if buffer.len > 0:
      copyMem(addr result[0], addr buffer.page[0], buffer.len)

proc newLoaderHandle*(ostream: PosixStream; outputId, pid: int): LoaderHandle =
  ## Creates a new, uncached (`cacheId == -1`) loader handle whose only
  ## output writes to `ostream`.
  ##
  ## The output is tagged with `outputId` and `pid` (stored as `ownerPid`)
  ## and starts out suspended.
  result = LoaderHandle(cacheId: -1)
  let firstOutput = OutputHandle(
    ostream: ostream,
    parent: result,
    outputId: outputId,
    ownerPid: pid,
    suspended: true
  )
  result.outputs.add(firstOutput)

proc findOutputHandle*(handle: LoaderHandle; fd: int): OutputHandle =
  ## Returns the first output of `handle` whose stream has file descriptor
  ## `fd`, or nil if there is none.
  for output in handle.outputs:
    # oclose() sets `ostream` to nil and close() tolerates such outputs in
    # the list, so skip them here instead of dereferencing nil.
    if output.ostream != nil and output.ostream.fd == fd:
      return output
  return nil

func cap*(buffer: LoaderBuffer): int {.inline.} =
  ## Capacity of `buffer` in bytes. Every buffer has the same capacity,
  ## LoaderBufferPageSize, so the argument itself is not inspected.
  LoaderBufferPageSize

template isEmpty*(output: OutputHandle): bool =
  ## True when `output` is not suspended and has no current buffer.
  not output.suspended and output.currentBuffer == nil

proc newLoaderBuffer*(): LoaderBuffer =
  ## Allocates an empty buffer (`len == 0`) with a fresh page of
  ## LoaderBufferPageSize bytes. The page is freed by the `=destroy` hook
  ## of LoaderBufferObj.
  let page = cast[ptr UncheckedArray[uint8]](alloc(LoaderBufferPageSize))
  LoaderBuffer(page: page, len: 0)

proc bufferCleared*(output: OutputHandle) =
  ## Moves `output` past its current buffer: resets the buffer index to 0
  ## and makes the next queued buffer current, or nil if the queue is empty.
  ##
  ## Must only be called while `output` has a current buffer.
  assert output.currentBuffer != nil
  output.currentBufferIdx = 0
  output.currentBuffer =
    if output.buffers.len > 0:
      output.buffers.popFirst()
    else:
      nil

proc tee*(outputIn: OutputHandle; ostream: PosixStream; outputId, pid: int):
    OutputHandle =
  ## Creates a second output that starts from the same position as
  ## `outputIn` (same current buffer, index, queued buffers and
  ## `istreamAtEnd` flag) but writes to `ostream` and carries its own
  ## `outputId` and owner `pid`.
  ##
  ## `outputIn` must be suspended. If it has a parent handle, the new output
  ## is appended to that handle's outputs; the parent must not have a header
  ## parser.
  assert outputIn.suspended
  let parent = outputIn.parent
  result = OutputHandle(
    parent: parent,
    ostream: ostream,
    currentBuffer: outputIn.currentBuffer,
    currentBufferIdx: outputIn.currentBufferIdx,
    buffers: outputIn.buffers,
    istreamAtEnd: outputIn.istreamAtEnd,
    outputId: outputId,
    ownerPid: pid,
    suspended: outputIn.suspended
  )
  when defined(debug):
    result.url = outputIn.url
  if parent != nil:
    assert outputIn.parent.parser == nil
    parent.outputs.add(result)

template output*(handle: LoaderHandle): OutputHandle =
  ## The first entry of `handle.outputs`, i.e. the output that
  ## newLoaderHandle adds on creation. Expands to a plain index expression,
  ## so `handle.outputs` must not be empty.
  handle.outputs[0]

proc sendResult*(handle: LoaderHandle; res: int; msg = "") =
  ## Sends the result packet of a request on the handle's first output.
  ##
  ## `res == 0` means success: the packet carries `res` followed by the
  ## output's id, and the state advances to rsBeforeStatus. `msg` must be
  ## empty in that case. Any other `res` is a failure: the packet carries
  ## `res` followed by `msg`, and the state stays at rsAfterFailure.
  ##
  ## The packet is written in blocking mode; the stream's previous blocking
  ## mode is restored afterwards, also when the write raises.
  assert handle.rstate == rsBeforeResult
  inc handle.rstate
  let output = handle.output
  let blocking = output.ostream.blocking
  output.ostream.setBlocking(true)
  try:
    output.ostream.withPacketWriter w:
      w.swrite(res)
      if res == 0: # success
        assert msg == ""
        w.swrite(output.outputId)
        inc handle.rstate
      else: # error
        w.swrite(msg)
  finally:
    # Do not leave the stream stuck in blocking mode if the write failed
    # (e.g. with a broken pipe, which iclose catches).
    output.ostream.setBlocking(blocking)

proc sendStatus*(handle: LoaderHandle; status: uint16) =
  ## Sends the `status` packet on the handle's first output and advances the
  ## state from rsBeforeStatus to rsBeforeHeaders.
  ##
  ## The packet is written in blocking mode; the stream's previous blocking
  ## mode is restored afterwards, also when the write raises.
  assert handle.rstate == rsBeforeStatus
  inc handle.rstate
  let blocking = handle.output.ostream.blocking
  handle.output.ostream.setBlocking(true)
  try:
    handle.output.ostream.withPacketWriter w:
      w.swrite(status)
  finally:
    # Do not leave the stream stuck in blocking mode if the write failed.
    handle.output.ostream.setBlocking(blocking)

proc sendHeaders*(handle: LoaderHandle; headers: Headers) =
  ## Sends the `headers` packet on the handle's first output and advances
  ## the state from rsBeforeHeaders to rsAfterHeaders.
  ##
  ## The packet is written in blocking mode; the stream's previous blocking
  ## mode is restored afterwards, also when the write raises.
  assert handle.rstate == rsBeforeHeaders
  inc handle.rstate
  let blocking = handle.output.ostream.blocking
  handle.output.ostream.setBlocking(true)
  try:
    handle.output.ostream.withPacketWriter w:
      w.swrite(headers)
  finally:
    # Do not leave the stream stuck in blocking mode if the write failed.
    handle.output.ostream.setBlocking(blocking)

proc recvData*(ps: PosixStream; buffer: LoaderBuffer): int {.inline.} =
  ## Reads up to `buffer.cap` bytes from `ps` into the start of the buffer's
  ## page. The value returned by the underlying read is stored in
  ## `buffer.len` and also returned.
  result = ps.recvData(addr buffer.page[0], buffer.cap)
  buffer.len = result

proc sendData*(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} =
  ## Writes the bytes of `buffer` from offset `si` up to `buffer.len` to
  ## `ps` and returns the result of the underlying send.
  ##
  ## At least one byte must remain after `si`.
  assert buffer.len - si > 0
  let remaining = buffer.len - si
  result = ps.sendData(addr buffer.page[si], remaining)

proc iclose*(handle: LoaderHandle) =
  ## Closes the handle's input stream, if it is still open, and sets it to
  ## nil.
  ##
  ## If the result was already sent but the status or the headers were not,
  ## the missing parts are filled in first (status 500, empty headers),
  ## followed by an error message as the body. A broken pipe while doing so
  ## is ignored.
  if handle.istream == nil:
    return
  if handle.rstate in {rsBeforeStatus, rsBeforeHeaders}:
    assert handle.outputs.len == 1
    # not an ideal solution, but better than silently eating malformed
    # headers
    try:
      # sendStatus advances the state to rsBeforeHeaders, so both steps run
      # when not even the status has been sent yet.
      if handle.rstate == rsBeforeStatus:
        handle.sendStatus(500)
      if handle.rstate == rsBeforeHeaders:
        handle.sendHeaders(newHeaders())
      let ostream = handle.output.ostream
      ostream.setBlocking(true)
      const msg = "Error: malformed header in CGI script"
      discard ostream.sendData(msg)
    except ErrorBrokenPipe:
      discard # receiver is dead
  handle.istream.sclose()
  handle.istream = nil

proc oclose*(output: OutputHandle) =
  ## Closes the output's stream and sets `output.ostream` to nil.
  ## The stream must not already be nil.
  let stream = output.ostream
  stream.sclose()
  output.ostream = nil

proc close*(handle: LoaderHandle) =
  ## Closes the input side of `handle` (see iclose) and every output stream
  ## that is still open. No output may still be registered at this point.
  handle.iclose()
  for output in handle.outputs:
    assert not output.registered
    if output.ostream == nil:
      continue # already closed
    output.oclose()