about summary refs log tree commit diff stats
path: root/src/loader/loader.nim
diff options
context:
space:
mode:
authorbptato <nincsnevem662@gmail.com>2024-09-14 15:58:52 +0200
committerbptato <nincsnevem662@gmail.com>2024-09-14 16:56:46 +0200
commit5b2a36579e53c69f154288a91ddc3e7c5375d7a6 (patch)
tree6b81b21aa15ef26031ce653ea972f75e00b1df0e /src/loader/loader.nim
parentdfa1a4bc5ece3b8c333b9a47bab038ff6a162f5b (diff)
downloadchawan-5b2a36579e53c69f154288a91ddc3e7c5375d7a6.tar.gz
loader: refactor, misc optimizations & fixes
* factor out input/output handle tables; use a seq instead
* add possibility to directly open cached items onto stdin (mainly an
  optimization for reading images, which are always cached)
* close used handles on local CGI execution
* make clone during load work again
Diffstat (limited to 'src/loader/loader.nim')
-rw-r--r--src/loader/loader.nim281
1 files changed, 180 insertions, 101 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index c882c227..0816c6f8 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -60,8 +60,8 @@ type
     key*: ClientKey
     process*: int
     clientPid*: int
-    connecting*: Table[int, ConnectData]
-    ongoing*: Table[int, Response]
+    map*: seq[LoaderData]
+    mapFds*: int # number of fds in map
     unregistered*: seq[int]
     registerFun*: proc(fd: int)
     unregisterFun*: proc(fd: int)
@@ -73,16 +73,21 @@ type
   ConnectDataState = enum
     cdsBeforeResult, cdsBeforeStatus, cdsBeforeHeaders
 
-  ConnectData = ref object
+  LoaderData = ref object of RootObj
+    stream*: SocketStream
+
+  ConnectData* = ref object of LoaderData
     state: ConnectDataState
     status: uint16
     res: int
     outputId: int
     redirectNum: int
     promise: Promise[JSResult[Response]]
-    stream*: SocketStream
     request: Request
 
+  OngoingData* = ref object of LoaderData
+    response*: Response
+
   LoaderCommand = enum
     lcAddCacheFile
     lcAddClient
@@ -119,8 +124,7 @@ type
     ssock: ServerSocket
     alive: bool
     config: LoaderConfig
-    handleMap: Table[int, LoaderHandle]
-    outputMap: Table[int, OutputHandle]
+    handleMap: seq[LoaderHandle]
     selector: Selector[int]
     # List of existing clients (buffer or pager) that may make requests.
     clientData: Table[int, ClientData] # pid -> data
@@ -158,22 +162,32 @@ func canRewriteForCGICompat(ctx: LoaderContext; path: string): bool =
       return true
   return false
 
-proc rejectHandle(handle: LoaderHandle; code: ConnectErrorCode; msg = "") =
+proc rejectHandle(handle: InputHandle; code: ConnectErrorCode; msg = "") =
   handle.sendResult(code, msg)
   handle.close()
 
+iterator inputHandles(ctx: LoaderContext): InputHandle {.inline.} =
+  for it in ctx.handleMap:
+    if it != nil and it of InputHandle:
+      yield InputHandle(it)
+
+iterator outputHandles(ctx: LoaderContext): OutputHandle {.inline.} =
+  for it in ctx.handleMap:
+    if it != nil and it of OutputHandle:
+      yield OutputHandle(it)
+
 func findOutput(ctx: LoaderContext; id: int; client: ClientData): OutputHandle =
   assert id != -1
-  for it in ctx.outputMap.values:
+  for it in ctx.outputHandles:
     if it.outputId == id:
       # verify that it's safe to access this handle.
       doAssert ctx.isPrivileged(client) or client.pid == it.ownerPid
       return it
   return nil
 
-func findCachedHandle(ctx: LoaderContext; cacheId: int): LoaderHandle =
+func findCachedHandle(ctx: LoaderContext; cacheId: int): InputHandle =
   assert cacheId != -1
-  for it in ctx.handleMap.values:
+  for it in ctx.inputHandles:
     if it.cacheId == cacheId:
       return it
   return nil
@@ -181,19 +195,19 @@ func findCachedHandle(ctx: LoaderContext; cacheId: int): LoaderHandle =
 type PushBufferResult = enum
   pbrDone, pbrUnregister
 
-proc register(ctx: LoaderContext; handle: LoaderHandle) =
+proc register(ctx: LoaderContext; handle: InputHandle) =
   assert not handle.registered
-  ctx.selector.registerHandle(int(handle.istream.fd), {Read}, 0)
+  ctx.selector.registerHandle(int(handle.stream.fd), {Read}, 0)
   handle.registered = true
 
-proc unregister(ctx: LoaderContext; handle: LoaderHandle) =
+proc unregister(ctx: LoaderContext; handle: InputHandle) =
   assert handle.registered
-  ctx.selector.unregister(int(handle.istream.fd))
+  ctx.selector.unregister(int(handle.stream.fd))
   handle.registered = false
 
 proc register(ctx: LoaderContext; output: OutputHandle) =
   assert not output.registered
-  ctx.selector.registerHandle(int(output.ostream.fd), {Write}, 0)
+  ctx.selector.registerHandle(int(output.stream.fd), {Write}, 0)
   output.registered = true
 
 const bsdPlatform = defined(macosx) or defined(freebsd) or defined(netbsd) or
@@ -202,7 +216,7 @@ proc unregister(ctx: LoaderContext; output: OutputHandle) =
   assert output.registered
   # so kqueue-based selectors raise when we try to unregister a pipe whose
   # reader is at EOF. "solution": clean up this mess ourselves.
-  let fd = int(output.ostream.fd)
+  let fd = int(output.stream.fd)
   when bsdPlatform:
     let oc = ctx.selector.count
     try:
@@ -240,7 +254,7 @@ proc pushBuffer(ctx: LoaderContext; output: OutputHandle; buffer: LoaderBuffer;
   elif output.currentBuffer == nil:
     var n = si
     try:
-      n += output.ostream.sendData(buffer, si)
+      n += output.stream.sendData(buffer, si)
     except ErrorAgain:
       discard
     except ErrorBrokenPipe:
@@ -282,7 +296,7 @@ proc redirectToFile(ctx: LoaderContext; output: OutputHandle;
   elif output.parent != nil:
     output.parent.outputs.add(OutputHandle(
       parent: output.parent,
-      ostream: ps,
+      stream: ps,
       istreamAtEnd: output.istreamAtEnd,
       outputId: ctx.getOutputId()
     ))
@@ -302,28 +316,38 @@ proc addCacheFile(ctx: LoaderContext; client: ClientData; output: OutputHandle):
     return cacheId
   return -1
 
-proc addFd(ctx: LoaderContext; handle: LoaderHandle) =
+proc put(ctx: LoaderContext; handle: LoaderHandle) =
+  let fd = int(handle.stream.fd)
+  if ctx.handleMap.len <= fd:
+    ctx.handleMap.setLen(fd + 1)
+  assert ctx.handleMap[fd] == nil
+  ctx.handleMap[fd] = handle
+
+proc unset(ctx: LoaderContext; handle: LoaderHandle) =
+  let fd = int(handle.stream.fd)
+  if fd < ctx.handleMap.len:
+    ctx.handleMap[fd] = nil
+
+proc addFd(ctx: LoaderContext; handle: InputHandle) =
   let output = handle.output
-  output.ostream.setBlocking(false)
-  handle.istream.setBlocking(false)
+  output.stream.setBlocking(false)
+  handle.stream.setBlocking(false)
   ctx.register(handle)
-  assert handle.istream.fd notin ctx.handleMap
-  assert output.ostream.fd notin ctx.outputMap
-  ctx.handleMap[handle.istream.fd] = handle
-  ctx.outputMap[output.ostream.fd] = output
+  ctx.put(handle)
+  ctx.put(output)
 
 type HandleReadResult = enum
   hrrDone, hrrUnregister, hrrBrokenPipe
 
 # Called whenever there is more data available to read.
-proc handleRead(ctx: LoaderContext; handle: LoaderHandle;
+proc handleRead(ctx: LoaderContext; handle: InputHandle;
     unregWrite: var seq[OutputHandle]): HandleReadResult =
   var unregs = 0
   let maxUnregs = handle.outputs.len
   while true:
     let buffer = newLoaderBuffer()
     try:
-      let n = handle.istream.recvData(buffer)
+      let n = handle.stream.recvData(buffer)
       if n == 0: # EOF
         return hrrUnregister
       var si = 0
@@ -356,9 +380,9 @@ proc handleRead(ctx: LoaderContext; handle: LoaderHandle;
 
 # stream is a regular file, so we can't select on it.
 # cachedHandle is used for attaching the output handle to a different
-# LoaderHandle when loadFromCache is called while a download is still ongoing
+# InputHandle when loadFromCache is called while a download is still ongoing
 # (and thus some parts of the document are not cached yet).
-proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) =
+proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: InputHandle) =
   assert handle.parser == nil # parser is only used with CGI
   var unregWrite: seq[OutputHandle] = @[]
   let r = ctx.handleRead(handle, unregWrite)
@@ -374,18 +398,19 @@ proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) =
     elif cachedHandle != nil:
       output.parent = cachedHandle
       cachedHandle.outputs.add(output)
-      ctx.outputMap[output.ostream.fd] = output
+      ctx.put(output)
     elif output.registered or output.suspended:
       output.parent = nil
       output.istreamAtEnd = true
-      ctx.outputMap[output.ostream.fd] = output
+      ctx.put(output)
     else:
-      assert output.ostream.fd notin ctx.outputMap
+      assert output.stream.fd < ctx.handleMap.len or
+        ctx.handleMap[output.stream.fd] == nil
       output.oclose()
   handle.outputs.setLen(0)
   handle.iclose()
 
-proc loadStream(ctx: LoaderContext; client: ClientData; handle: LoaderHandle;
+proc loadStream(ctx: LoaderContext; client: ClientData; handle: InputHandle;
     request: Request) =
   client.passedFdMap.withValue(request.url.pathname, fdp):
     handle.sendResult(0)
@@ -394,12 +419,12 @@ proc loadStream(ctx: LoaderContext; client: ClientData; handle: LoaderHandle;
     let ps = newPosixStream(fdp[])
     var stats: Stat
     doAssert fstat(fdp[], stats) != -1
-    handle.istream = ps
+    handle.stream = ps
     client.passedFdMap.del(request.url.pathname)
     if S_ISCHR(stats.st_mode) or S_ISREG(stats.st_mode):
       # regular file: e.g. cha <file
       # or character device: e.g. cha </dev/null
-      handle.output.ostream.setBlocking(false)
+      handle.output.stream.setBlocking(false)
       # not loading from cache, so cachedHandle is nil
       ctx.loadStreamRegular(handle, nil)
   do:
@@ -411,19 +436,24 @@ func find(cacheMap: seq[CachedItem]; id: int): int =
       return i
   -1
 
-proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: LoaderHandle;
+proc openCachedItem(client: ClientData; id: int): (PosixStream, int) =
+  let n = client.cacheMap.find(id)
+  if n != -1:
+    return (newPosixStream(client.cacheMap[n].path, O_RDONLY, 0), n)
+  return (nil, -1)
+
+proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: InputHandle;
     request: Request) =
   let id = parseInt32(request.url.pathname).get(-1)
   let startFrom = if request.url.query.isSome:
     parseInt32(request.url.query.get).get(0)
   else:
     0
-  let n = client.cacheMap.find(id)
-  if n != -1:
-    let ps = newPosixStream(client.cacheMap[n].path, O_RDONLY, 0)
+  let (ps, n) = client.openCachedItem(id)
+  if ps != nil:
     if startFrom != 0:
       ps.seek(startFrom)
-    handle.istream = ps
+    handle.stream = ps
     if ps == nil:
       handle.rejectHandle(ERROR_FILE_NOT_IN_CACHE)
       client.cacheMap.del(n)
@@ -431,7 +461,7 @@ proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: LoaderHandle;
     handle.sendResult(0)
     handle.sendStatus(200)
     handle.sendHeaders(newHeaders())
-    handle.output.ostream.setBlocking(false)
+    handle.output.stream.setBlocking(false)
     let cachedHandle = ctx.findCachedHandle(id)
     ctx.loadStreamRegular(handle, cachedHandle)
   else:
@@ -440,7 +470,7 @@ proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: LoaderHandle;
 # Data URL handler.
 # Moved back into loader from CGI, because data URLs can get extremely long
 # and thus no longer fit into the environment.
-proc loadDataSend(ctx: LoaderContext; handle: LoaderHandle; s, ct: string) =
+proc loadDataSend(ctx: LoaderContext; handle: InputHandle; s, ct: string) =
   handle.sendResult(0)
   handle.sendStatus(200)
   handle.sendHeaders(newHeaders({"Content-Type": ct}))
@@ -448,7 +478,7 @@ proc loadDataSend(ctx: LoaderContext; handle: LoaderHandle; s, ct: string) =
   if s.len == 0:
     if output.suspended:
       output.istreamAtEnd = true
-      ctx.outputMap[output.ostream.fd] = output
+      ctx.put(output)
     else:
       output.oclose()
     return
@@ -463,11 +493,11 @@ proc loadDataSend(ctx: LoaderContext; handle: LoaderHandle; s, ct: string) =
   of pbrDone:
     if output.registered or output.suspended:
       output.istreamAtEnd = true
-      ctx.outputMap[output.ostream.fd] = output
+      ctx.put(output)
     else:
       output.oclose()
 
-proc loadData(ctx: LoaderContext; handle: LoaderHandle; request: Request) =
+proc loadData(ctx: LoaderContext; handle: InputHandle; request: Request) =
   let url = request.url
   var ct = url.path.s.until(',')
   if AllChars - Ascii + Controls - {'\t', ' '} in ct:
@@ -488,7 +518,7 @@ proc loadData(ctx: LoaderContext; handle: LoaderHandle; request: Request) =
     ctx.loadDataSend(handle, body, ct)
 
 proc loadResource(ctx: LoaderContext; client: ClientData;
-    config: LoaderClientConfig; request: Request; handle: LoaderHandle) =
+    config: LoaderClientConfig; request: Request; handle: InputHandle) =
   var redo = true
   var tries = 0
   var prevurl: URL = nil
@@ -504,16 +534,20 @@ proc loadResource(ctx: LoaderContext; client: ClientData;
           redo = true
           continue
     if request.url.scheme == "cgi-bin":
-      var ostream: PosixStream = nil
+      var istream: PosixStream = nil # for rbtCache
+      var ostream: PosixStream = nil # for rbtOutput
+      if request.body.t == rbtCache:
+        var n: int
+        (istream, n) = client.openCachedItem(request.body.cacheId)
       handle.loadCGI(request, ctx.config.cgiDir, prevurl,
-        config.insecureSSLNoVerify, ostream)
-      if handle.istream != nil:
+        config.insecureSSLNoVerify, ctx.handleMap, istream, ostream)
+      if handle.stream != nil:
         if ostream != nil:
           let outputIn = ctx.findOutput(request.body.outputId, client)
           if outputIn != nil:
             ostream.setBlocking(false)
             let output = outputIn.tee(ostream, ctx.getOutputId(), client.pid)
-            ctx.outputMap[ostream.fd] = output
+            ctx.put(output)
             output.suspended = false
             if not output.isEmpty:
               ctx.register(output)
@@ -525,13 +559,13 @@ proc loadResource(ctx: LoaderContext; client: ClientData;
         handle.close()
     elif request.url.scheme == "stream":
       ctx.loadStream(client, handle, request)
-      if handle.istream != nil:
+      if handle.stream != nil:
         ctx.addFd(handle)
       else:
         handle.close()
     elif request.url.scheme == "cache":
       ctx.loadFromCache(client, handle, request)
-      assert handle.istream == nil
+      assert handle.stream == nil
       handle.close()
     elif request.url.scheme == "data":
       ctx.loadData(handle, request)
@@ -564,7 +598,7 @@ proc setupRequestDefaults(request: Request; config: LoaderClientConfig) =
 
 proc load(ctx: LoaderContext; stream: SocketStream; request: Request;
     client: ClientData; config: LoaderClientConfig) =
-  let handle = newLoaderHandle(stream, ctx.getOutputId(), client.pid)
+  let handle = newInputHandle(stream, ctx.getOutputId(), client.pid)
   when defined(debug):
     handle.url = request.url
     handle.output.url = request.url
@@ -720,7 +754,7 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData;
   if outputIn != nil:
     let id = ctx.getOutputId()
     let output = outputIn.tee(stream, id, targetPid)
-    ctx.outputMap[output.ostream.fd] = output
+    ctx.put(output)
     stream.withPacketWriter w:
       w.swrite(id)
     stream.setBlocking(false)
@@ -821,7 +855,7 @@ proc acceptConnection(ctx: LoaderContext) =
         ctx.resume(stream, client, r)
   except ErrorBrokenPipe:
     # receiving end died while reading the file; give up.
-    assert stream.fd notin ctx.outputMap
+    assert stream.fd >= ctx.handleMap.len or ctx.handleMap[stream.fd] == nil
     stream.sclose()
 
 proc exitLoader(ctx: LoaderContext) =
@@ -843,6 +877,9 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext =
   ctx.ssock = initServerSocket(config.sockdir, -1, myPid, blocking = true)
   let sfd = int(ctx.ssock.sock.getFd())
   ctx.selector.registerHandle(sfd, {Read}, 0)
+  if ctx.handleMap.len <= sfd:
+    ctx.handleMap.setLen(sfd + 1)
+  ctx.handleMap[sfd] = LoaderHandle() # pseudo handle
   # The server has been initialized, so the main process can resume execution.
   let ps = newPosixStream(fd)
   ps.write(char(0u8))
@@ -897,7 +934,7 @@ proc handleWrite(ctx: LoaderContext; output: OutputHandle;
   while output.currentBuffer != nil:
     let buffer = output.currentBuffer
     try:
-      let n = output.ostream.sendData(buffer, output.currentBufferIdx)
+      let n = output.stream.sendData(buffer, output.currentBufferIdx)
       output.currentBufferIdx += n
       if output.currentBufferIdx < buffer.len:
         break
@@ -915,16 +952,16 @@ proc handleWrite(ctx: LoaderContext; output: OutputHandle;
       # all buffers sent, no need to select on this output again for now
       ctx.unregister(output)
 
-proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle];
+proc finishCycle(ctx: LoaderContext; unregRead: var seq[InputHandle];
     unregWrite: var seq[OutputHandle]) =
   # Unregister handles queued for unregistration.
   # It is possible for both unregRead and unregWrite to contain duplicates. To
   # avoid double-close/double-unregister, we set the istream/ostream of
   # unregistered handles to nil.
   for handle in unregRead:
-    if handle.istream != nil:
+    if handle.stream != nil:
       ctx.unregister(handle)
-      ctx.handleMap.del(handle.istream.fd)
+      ctx.unset(handle)
       if handle.parser != nil:
         handle.finishParse()
       handle.iclose()
@@ -933,19 +970,19 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle];
         if output.isEmpty:
           unregWrite.add(output)
   for output in unregWrite:
-    if output.ostream != nil:
+    if output.stream != nil:
       if output.registered:
         ctx.unregister(output)
-      ctx.outputMap.del(output.ostream.fd)
+      ctx.unset(output)
       output.oclose()
       let handle = output.parent
       if handle != nil: # may be nil if from loadStream S_ISREG
         let i = handle.outputs.find(output)
         handle.outputs.del(i)
-        if handle.outputs.len == 0 and handle.istream != nil:
+        if handle.outputs.len == 0 and handle.stream != nil:
           # premature end of all output streams; kill istream too
           ctx.unregister(handle)
-          ctx.handleMap.del(handle.istream.fd)
+          ctx.unset(handle)
           if handle.parser != nil:
             handle.finishParse()
           handle.iclose()
@@ -956,26 +993,26 @@ proc runFileLoader*(fd: cint; config: LoaderConfig) =
   var keys: array[64, ReadyKey]
   while ctx.alive:
     let count = ctx.selector.selectInto(-1, keys)
-    var unregRead: seq[LoaderHandle] = @[]
+    var unregRead: seq[InputHandle] = @[]
     var unregWrite: seq[OutputHandle] = @[]
     for event in keys.toOpenArray(0, count - 1):
+      let handle = ctx.handleMap[event.fd]
       if Read in event.events:
         if event.fd == fd: # incoming connection
           ctx.acceptConnection()
         else:
-          let handle = ctx.handleMap[event.fd]
+          let handle = InputHandle(ctx.handleMap[event.fd])
           case ctx.handleRead(handle, unregWrite)
           of hrrDone: discard
           of hrrUnregister, hrrBrokenPipe: unregRead.add(handle)
       if Write in event.events:
-        ctx.handleWrite(ctx.outputMap[event.fd], unregWrite)
+        ctx.handleWrite(OutputHandle(handle), unregWrite)
       if Error in event.events:
         assert event.fd != fd
-        ctx.outputMap.withValue(event.fd, outputp): # ostream died
-          unregWrite.add(outputp[])
-        do: # istream died
-          let handle = ctx.handleMap[event.fd]
-          unregRead.add(handle)
+        if handle of InputHandle: # istream died
+          unregRead.add(InputHandle(handle))
+        else: # ostream died
+          unregWrite.add(OutputHandle(handle))
     ctx.finishCycle(unregRead, unregWrite)
   ctx.exitLoader()
 
@@ -1023,17 +1060,43 @@ proc startRequest*(loader: FileLoader; request: Request;
     w.swrite(config)
   return stream
 
+iterator ongoing*(loader: FileLoader): OngoingData =
+  for it in loader.map:
+    if it != nil and it of OngoingData:
+      yield OngoingData(it)
+
+func fd*(data: LoaderData): int =
+  return int(data.stream.fd)
+
+proc put*(loader: FileLoader; data: LoaderData) =
+  let fd = int(data.stream.fd)
+  if loader.map.len <= fd:
+    loader.map.setLen(fd + 1)
+  assert loader.map[fd] == nil
+  loader.map[fd] = data
+  inc loader.mapFds
+
+proc get*(loader: FileLoader; fd: int): LoaderData =
+  if fd < loader.map.len:
+    return loader.map[fd]
+  return nil
+
+proc unset*(loader: FileLoader; data: LoaderData) =
+  let fd = int(data.stream.fd)
+  if loader.get(fd) != nil:
+    dec loader.mapFds
+    loader.map[fd] = nil
+
 proc fetch0(loader: FileLoader; input: Request; promise: FetchPromise;
     redirectNum: int) =
   let stream = loader.startRequest(input)
-  let fd = int(stream.fd)
-  loader.registerFun(fd)
-  loader.connecting[fd] = ConnectData(
+  loader.registerFun(int(stream.fd))
+  loader.put(ConnectData(
     promise: promise,
     request: input,
     stream: stream,
     redirectNum: redirectNum
-  )
+  ))
 
 proc fetch*(loader: FileLoader; input: Request): FetchPromise =
   let promise = FetchPromise()
@@ -1043,13 +1106,13 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise =
 proc reconnect*(loader: FileLoader; data: ConnectData) =
   data.stream.sclose()
   let stream = loader.startRequest(data.request)
-  let fd = int(stream.fd)
-  loader.registerFun(fd)
-  loader.connecting[fd] = ConnectData(
+  let data = ConnectData(
     promise: data.promise,
     request: data.request,
     stream: stream
   )
+  loader.put(data)
+  loader.registerFun(data.fd)
 
 proc suspend*(loader: FileLoader; fds: seq[int]) =
   let stream = loader.connect()
@@ -1121,8 +1184,7 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string):
   stream.sclose()
   return res
 
-proc onConnected*(loader: FileLoader; fd: int) =
-  let connectData = loader.connecting[fd]
+proc onConnected(loader: FileLoader; connectData: ConnectData) =
   let stream = connectData.stream
   let promise = connectData.promise
   let request = connectData.request
@@ -1139,11 +1201,12 @@ proc onConnected*(loader: FileLoader; fd: int) =
       # msg is discarded.
       #TODO maybe print if called from trusted code (i.e. global == client)?
       r.sread(msg) # packet 1
+      let fd = connectData.fd
       loader.unregisterFun(fd)
       loader.unregistered.add(fd)
       stream.sclose()
       # delete before resolving the promise
-      loader.connecting.del(fd)
+      loader.unset(connectData)
       let err = newTypeError("NetworkError when attempting to fetch resource")
       promise.resolve(JSResult[Response].err(err))
   of cdsBeforeStatus:
@@ -1155,18 +1218,20 @@ proc onConnected*(loader: FileLoader; fd: int) =
     r.sread(response.headers) # packet 3
     # Only a stream of the response body may arrive after this point.
     response.body = stream
+    # delete before resolving the promise
+    loader.unset(connectData)
+    let data = OngoingData(response: response, stream: stream)
+    loader.put(data)
     assert loader.unregisterFun != nil
     response.unregisterFun = proc() =
-      loader.ongoing.del(response.body.fd)
-      loader.unregistered.add(response.body.fd)
-      loader.unregisterFun(response.body.fd)
+      loader.unset(data)
+      let fd = data.fd
+      loader.unregistered.add(fd)
+      loader.unregisterFun(fd)
     response.resumeFun = proc(outputId: int) =
       loader.resume(outputId)
-    loader.ongoing[fd] = response
     stream.setBlocking(false)
     let redirect = response.getRedirect(request)
-    # delete before resolving the promise
-    loader.connecting.del(fd)
     if redirect != nil:
       response.unregisterFun()
       stream.sclose()
@@ -1179,24 +1244,38 @@ proc onConnected*(loader: FileLoader; fd: int) =
     else:
       promise.resolve(JSResult[Response].ok(response))
 
-proc onRead*(loader: FileLoader; fd: int) =
-  let response = loader.ongoing.getOrDefault(fd)
-  if response != nil:
-    response.onRead(response)
-    if response.body.isend:
-      if response.onFinish != nil:
-        response.onFinish(response, true)
-      response.onFinish = nil
-      response.close()
-
-proc onError*(loader: FileLoader; fd: int) =
-  let response = loader.ongoing.getOrDefault(fd)
-  if response != nil:
+proc onRead*(loader: FileLoader; data: OngoingData) =
+  let response = data.response
+  response.onRead(response)
+  if response.body.isend:
     if response.onFinish != nil:
-      response.onFinish(response, false)
+      response.onFinish(response, true)
     response.onFinish = nil
     response.close()
 
+proc onRead*(loader: FileLoader; fd: int) =
+  let data = loader.map[fd]
+  if data of ConnectData:
+    loader.onConnected(ConnectData(data))
+  else:
+    loader.onRead(OngoingData(data))
+
+proc onError*(loader: FileLoader; data: OngoingData) =
+  let response = data.response
+  if response.onFinish != nil:
+    response.onFinish(response, false)
+  response.onFinish = nil
+  response.close()
+
+proc onError*(loader: FileLoader; fd: int): bool =
+  let data = loader.map[fd]
+  if data of ConnectData:
+    # probably shouldn't happen. TODO
+    return false
+  else:
+    loader.onError(OngoingData(data))
+    return true
+
 # Note: this blocks until headers are received.
 proc doRequest*(loader: FileLoader; request: Request): Response =
   let stream = loader.startRequest(request)