path: root/src/loader/loader.nim
# A file loader server.
# The idea here is that we receive requests over a socket, then respond to each
# with a response (ideally a document.)
# For now, the protocol looks like:
# C: Request
# S: res (0 => success, _ => error)
# if success:
#  S: output ID
#  S: status code
#  S: headers
#  C: resume
#  S: response body
# else:
#  S: error message
#
# The body is passed to the stream as-is, so effectively nothing can follow it.
#
# Note: if the consumer closes the request's body after headers have been
# passed, it will *not* be cleaned up until a `resume' command is
# received. (This allows for passing outputIds to the pager for later
# addCacheFile commands there.)
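#
# For illustration, a minimal client-side sketch of the exchange above (it
# mirrors startRequest and doRequest below; `loader`, `stream` and `request`
# are assumed to exist already):
#
#   stream.withLoaderPacketWriter loader, w:
#     w.swrite(lcLoad)
#     w.swrite(request)
#   var r = stream.initPacketReader()
#   r.sread(res)              # 0 => success
#   if res == 0:
#     r.sread(outputId)       # output ID
#     r = stream.initPacketReader()
#     r.sread(status)         # status code
#     r = stream.initPacketReader()
#     r.sread(headers)        # headers; the body follows on the stream as-is
#   else:
#     r.sread(msg)            # error message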

import std/deques
import std/nativesockets
import std/net
import std/options
import std/os
import std/posix
import std/selectors
import std/strutils
import std/tables

import io/bufreader
import io/bufwriter
import io/dynstream
import io/promise
import io/serversocket
import io/tempfile
import io/urlfilter
import loader/cgi
import loader/connecterror
import loader/headers
import loader/loaderhandle
import loader/request
import loader/response
import monoucha/javascript
import monoucha/jserror
import types/cookie
import types/opt
import types/referrer
import types/urimethodmap
import types/url
import utils/twtstr

export request
export response

type
  FileLoader* = ref object
    key*: ClientKey
    process*: int
    clientPid*: int
    connecting*: Table[int, ConnectData]
    ongoing*: Table[int, Response]
    unregistered*: seq[int]
    registerFun*: proc(fd: int)
    unregisterFun*: proc(fd: int)
    # directory where we store UNIX domain sockets
    sockDir*: string
    # (FreeBSD only) fd for the socket directory so we can connectat() on it
    sockDirFd*: int

  ConnectDataState = enum
    cdsBeforeResult, cdsBeforeStatus, cdsBeforeHeaders

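  # Client-side state of a fetch() whose response head (result, output ID,
  # status and headers) has not fully arrived yet; driven by onConnected.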
  ConnectData = ref object
    state: ConnectDataState
    status: uint16
    res: int
    outputId: int
    redirectNum: int
    promise: Promise[JSResult[Response]]
    stream*: SocketStream
    request: Request

  LoaderCommand = enum
    lcAddCacheFile
    lcAddClient
    lcGetCacheFile
    lcLoad
    lcLoadConfig
    lcPassFd
    lcRedirectToFile
    lcRemoveCachedItem
    lcRemoveClient
    lcResume
    lcShareCachedItem
    lcSuspend
    lcTee

  ClientKey* = array[32, uint8]

  CachedItem = ref object
    id: int
    path: string
    refc: int

  ClientData = ref object
    pid: int
    key: ClientKey
    cacheMap: seq[CachedItem]
    config: LoaderClientConfig

  LoaderContext = ref object
    pagerClient: ClientData
    ssock: ServerSocket
    alive: bool
    config: LoaderConfig
    handleMap: Table[int, LoaderHandle]
    outputMap: Table[int, OutputHandle]
    selector: Selector[int]
    # List of file descriptors passed by the pager.
    passedFdMap: Table[string, FileHandle] # host -> fd
    # List of existing clients (buffer or pager) that may make requests.
    clientData: Table[int, ClientData] # pid -> data
    # ID of next output. TODO: find a better allocation scheme
    outputNum: int

  LoaderConfig* = object
    cgiDir*: seq[string]
    uriMethodMap*: URIMethodMap
    w3mCGICompat*: bool
    tmpdir*: string
    sockdir*: string

  LoaderClientConfig* = object
    cookieJar*: CookieJar
    defaultHeaders*: Headers
    filter*: URLFilter
    proxy*: URL
    referrerPolicy*: ReferrerPolicy
    insecureSSLNoVerify*: bool

  FetchPromise* = Promise[JSResult[Response]]

func isPrivileged(ctx: LoaderContext; client: ClientData): bool =
  return ctx.pagerClient == client

#TODO this may be too low if we want to use urimethodmap for everything
const MaxRewrites = 4

func canRewriteForCGICompat(ctx: LoaderContext; path: string): bool =
  if path.startsWith("/cgi-bin/") or path.startsWith("/$LIB/"):
    return true
  for dir in ctx.config.cgiDir:
    if path.startsWith(dir):
      return true
  return false

proc rejectHandle(handle: LoaderHandle; code: ConnectErrorCode; msg = "") =
  handle.sendResult(code, msg)
  handle.close()

func findOutput(ctx: LoaderContext; id: int; client: ClientData): OutputHandle =
  assert id != -1
  for it in ctx.outputMap.values:
    if it.outputId == id:
      # verify that it's safe to access this handle.
      doAssert ctx.isPrivileged(client) or client.pid == it.ownerPid
      return it
  return nil

func findCachedHandle(ctx: LoaderContext; cacheId: int): LoaderHandle =
  assert cacheId != -1
  for it in ctx.handleMap.values:
    if it.cacheId == cacheId:
      return it
  return nil

type PushBufferResult = enum
  pbrDone, pbrUnregister

proc register(ctx: LoaderContext; output: OutputHandle) =
  assert not output.registered
  ctx.selector.registerHandle(int(output.ostream.fd), {Write}, 0)
  output.registered = true

proc unregister(ctx: LoaderContext; output: OutputHandle) =
  assert output.registered
  ctx.selector.unregister(int(output.ostream.fd))
  output.registered = false

# Either write data to the target output, or append it to the list of buffers to
# write and register the output in our selector.
proc pushBuffer(ctx: LoaderContext; output: OutputHandle; buffer: LoaderBuffer;
    si: int): PushBufferResult =
  if output.suspended:
    if output.currentBuffer == nil:
      output.currentBuffer = buffer
      output.currentBufferIdx = si
    else:
      # si must be 0 here in all cases. Why? Well, it indicates the first unread
      # position after reading headers, and at that point currentBuffer will
      # be empty.
      #
      # Obviously, this breaks down if anything is pushed into the stream
      # before the header parser destroys itself. For now it never does, so we
      # should be fine.
      doAssert si == 0
      output.buffers.addLast(buffer)
  elif output.currentBuffer == nil:
    var n = si
    try:
      n += output.ostream.sendData(buffer, si)
    except ErrorAgain:
      discard
    except ErrorBrokenPipe:
      return pbrUnregister
    if n < buffer.len:
      output.currentBuffer = buffer
      output.currentBufferIdx = n
      ctx.register(output)
  else:
    output.buffers.addLast(buffer)
  pbrDone

proc getOutputId(ctx: LoaderContext): int =
  result = ctx.outputNum
  inc ctx.outputNum

proc redirectToFile(ctx: LoaderContext; output: OutputHandle;
    targetPath: string): bool =
  let ps = newPosixStream(targetPath, O_CREAT or O_WRONLY, 0o600)
  if ps == nil:
    return false
  try:
    if output.currentBuffer != nil:
      let n = ps.sendData(output.currentBuffer, output.currentBufferIdx)
      if unlikely(n < output.currentBuffer.len - output.currentBufferIdx):
        ps.sclose()
        return false
    for buffer in output.buffers:
      let n = ps.sendData(buffer)
      if unlikely(n < buffer.len):
        ps.sclose()
        return false
  except ErrorBrokenPipe:
    # ps is dead; give up.
    ps.sclose()
    return false
  if output.istreamAtEnd:
    ps.sclose()
  elif output.parent != nil:
    output.parent.outputs.add(OutputHandle(
      parent: output.parent,
      ostream: ps,
      istreamAtEnd: output.istreamAtEnd,
      outputId: ctx.getOutputId()
    ))
  return true

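# Tee the output into a temporary file and record it in the client's cacheMap;
# returns the new cache ID, or -1 if the temporary file could not be written.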
proc addCacheFile(ctx: LoaderContext; client: ClientData; output: OutputHandle):
    int =
  if output.parent != nil and output.parent.cacheId != -1:
    # may happen e.g. if client tries to cache a `cache:' URL
    return output.parent.cacheId
  let tmpf = getTempFile(ctx.config.tmpdir)
  if ctx.redirectToFile(output, tmpf):
    let cacheId = output.outputId
    if output.parent != nil:
      output.parent.cacheId = cacheId
    client.cacheMap.add(CachedItem(id: cacheId, path: tmpf, refc: 1))
    return cacheId
  return -1

proc addFd(ctx: LoaderContext; handle: LoaderHandle) =
  let output = handle.output
  output.ostream.setBlocking(false)
  handle.istream.setBlocking(false)
  ctx.selector.registerHandle(int(handle.istream.fd), {Read}, 0)
  assert handle.istream.fd notin ctx.handleMap
  assert output.ostream.fd notin ctx.outputMap
  ctx.handleMap[handle.istream.fd] = handle
  ctx.outputMap[output.ostream.fd] = output

type HandleReadResult = enum
  hrrDone, hrrUnregister, hrrBrokenPipe

# Called whenever there is more data available to read.
proc handleRead(ctx: LoaderContext; handle: LoaderHandle;
    unregWrite: var seq[OutputHandle]): HandleReadResult =
  var unregs = 0
  let maxUnregs = handle.outputs.len
  while true:
    let buffer = newLoaderBuffer()
    try:
      let n = handle.istream.recvData(buffer)
      if n == 0: # EOF
        return hrrUnregister
      var si = 0
      if handle.parser != nil:
        si = handle.parseHeaders(buffer)
        if si == -1: # died while parsing headers; unregister
          return hrrUnregister
        if si == n: # parsed the entire buffer as headers; skip output handling
          continue
      for output in handle.outputs:
        if output.dead:
          # do not push to unregWrite candidates
          continue
        case ctx.pushBuffer(output, buffer, si)
        of pbrUnregister:
          output.dead = true
          unregWrite.add(output)
          inc unregs
        of pbrDone: discard
      if unregs == maxUnregs:
        # early return: no more outputs to write to
        break
      if n < buffer.cap:
        break
    except ErrorAgain: # retry later
      break
    except ErrorBrokenPipe: # sender died; stop streaming
      return hrrBrokenPipe
  hrrDone

# stream is a regular file, so we can't select on it.
# cachedHandle is used for attaching the output handle to a different
# LoaderHandle when loadFromCache is called while a download is still ongoing
# (and thus some parts of the document are not cached yet).
proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) =
  assert handle.parser == nil # parser is only used with CGI
  var unregWrite: seq[OutputHandle] = @[]
  let r = ctx.handleRead(handle, unregWrite)
  for output in unregWrite:
    output.parent = nil
    let i = handle.outputs.find(output)
    if output.registered:
      ctx.unregister(output)
    handle.outputs.del(i)
  for output in handle.outputs:
    if r == hrrBrokenPipe:
      output.oclose()
    elif cachedHandle != nil:
      output.parent = cachedHandle
      cachedHandle.outputs.add(output)
      ctx.outputMap[output.ostream.fd] = output
    elif output.registered or output.suspended:
      output.parent = nil
      output.istreamAtEnd = true
      ctx.outputMap[output.ostream.fd] = output
    else:
      assert output.ostream.fd notin ctx.outputMap
      output.oclose()
  handle.outputs.setLen(0)
  handle.iclose()

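# Load a stream: URL; the file descriptor must have been registered in
# passedFdMap beforehand with an lcPassFd command.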
proc loadStream(ctx: LoaderContext; handle: LoaderHandle; request: Request) =
  ctx.passedFdMap.withValue(request.url.pathname, fdp):
    handle.sendResult(0)
    handle.sendStatus(200)
    handle.sendHeaders(newHeaders())
    let ps = newPosixStream(fdp[])
    var stats: Stat
    doAssert fstat(fdp[], stats) != -1
    handle.istream = ps
    ctx.passedFdMap.del(request.url.pathname)
    if S_ISCHR(stats.st_mode) or S_ISREG(stats.st_mode):
      # regular file: e.g. cha <file
      # or character device: e.g. cha </dev/null
      handle.output.ostream.setBlocking(false)
      # not loading from cache, so cachedHandle is nil
      ctx.loadStreamRegular(handle, nil)
  do:
    handle.sendResult(ERROR_FILE_NOT_FOUND, "stream not found")

func find(cacheMap: seq[CachedItem]; id: int): int =
  for i, it in cacheMap:
    if it.id == id:
      return i
  -1

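# Load a cache: URL from the client's cacheMap. An optional query string is
# interpreted as a byte offset to start reading from.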
proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: LoaderHandle;
    request: Request) =
  let id = parseInt32(request.url.pathname).get(-1)
  let startFrom = if request.url.query.isSome:
    parseInt32(request.url.query.get).get(0)
  else:
    0
  let n = client.cacheMap.find(id)
  if n != -1:
    let ps = newPosixStream(client.cacheMap[n].path, O_RDONLY, 0)
    if ps == nil:
      handle.rejectHandle(ERROR_FILE_NOT_IN_CACHE)
      client.cacheMap.del(n)
      return
    if startFrom != 0:
      ps.seek(startFrom)
    handle.sendResult(0)
    handle.sendStatus(200)
    handle.sendHeaders(newHeaders())
    handle.istream = ps
    handle.output.ostream.setBlocking(false)
    let cachedHandle = ctx.findCachedHandle(id)
    ctx.loadStreamRegular(handle, cachedHandle)
  else:
    handle.sendResult(ERROR_URL_NOT_IN_CACHE)

# Data URL handler.
# Moved back into loader from CGI, because data URLs can get extremely long
# and thus no longer fit into the environment.
proc loadDataSend(ctx: LoaderContext; handle: LoaderHandle; s, ct: string) =
  handle.sendResult(0)
  handle.sendStatus(200)
  handle.sendHeaders(newHeaders({"Content-Type": ct}))
  let buffer = newLoaderBuffer(size = s.len)
  buffer.len = s.len
  copyMem(buffer.page, unsafeAddr s[0], s.len)
  let output = handle.output
  case ctx.pushBuffer(output, buffer, 0)
  of pbrUnregister:
    if output.registered:
      ctx.unregister(output)
    output.oclose()
  of pbrDone:
    if output.registered or output.suspended:
      output.istreamAtEnd = true
      ctx.outputMap[output.ostream.fd] = output
    else:
      output.oclose()

proc loadData(ctx: LoaderContext; handle: LoaderHandle; request: Request) =
  let url = request.url
  var ct = url.path.s.until(',')
  if AllChars - Ascii + Controls - {'\t', ' '} in ct:
    handle.sendResult(ERROR_INVALID_URL, "invalid data URL")
    handle.close()
    return
  let sd = ct.len + 1 # data start
  let body = percentDecode(url.path.s, sd)
  if ct.endsWith(";base64"):
    let d = atob0(body) # decode from ct end + 1
    if d.isNone:
      handle.sendResult(ERROR_INVALID_URL, "invalid data URL")
      handle.close()
      return
    ct.setLen(ct.len - ";base64".len) # remove base64 indicator
    ctx.loadDataSend(handle, d.get, ct)
  else:
    ctx.loadDataSend(handle, body, ct)

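# Dispatch a request to the handler for its scheme (cgi-bin, stream, cache,
# data), rewriting the URL via w3m CGI compat or the urimethodmap at most
# MaxRewrites times.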
proc loadResource(ctx: LoaderContext; client: ClientData;
    config: LoaderClientConfig; request: Request; handle: LoaderHandle) =
  var redo = true
  var tries = 0
  var prevurl: URL = nil
  while redo and tries < MaxRewrites:
    redo = false
    if ctx.config.w3mCGICompat and request.url.scheme == "file":
      let path = request.url.path.serialize_unicode()
      if ctx.canRewriteForCGICompat(path):
        let newURL = newURL("cgi-bin:" & path & request.url.search)
        if newURL.isSome:
          request.url = newURL.get
          inc tries
          redo = true
          continue
    if request.url.scheme == "cgi-bin":
      var ostream: PosixStream = nil
      handle.loadCGI(request, ctx.config.cgiDir, prevurl,
        config.insecureSSLNoVerify, ostream)
      if handle.istream != nil:
        if ostream != nil:
          let outputIn = ctx.findOutput(request.body.outputId, client)
          if outputIn != nil:
            ostream.setBlocking(false)
            let output = outputIn.tee(ostream, ctx.getOutputId(), client.pid)
            ctx.outputMap[ostream.fd] = output
            output.suspended = false
            ctx.register(output)
          else:
            ostream.sclose()
        ctx.addFd(handle)
      else:
        assert ostream == nil
        handle.close()
    elif request.url.scheme == "stream":
      ctx.loadStream(handle, request)
      if handle.istream != nil:
        ctx.addFd(handle)
      else:
        handle.close()
    elif request.url.scheme == "cache":
      ctx.loadFromCache(client, handle, request)
      assert handle.istream == nil
      handle.close()
    elif request.url.scheme == "data":
      ctx.loadData(handle, request)
    else:
      prevurl = request.url
      case ctx.config.uriMethodMap.findAndRewrite(request.url)
      of ummrSuccess:
        inc tries
        redo = true
      of ummrWrongURL:
        handle.rejectHandle(ERROR_INVALID_URI_METHOD_ENTRY)
      of ummrNotFound:
        handle.rejectHandle(ERROR_UNKNOWN_SCHEME)
  if tries >= MaxRewrites:
    handle.rejectHandle(ERROR_TOO_MANY_REWRITES)

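# Apply the client's default headers, cookies and referrer policy to a request
# without overriding headers it already sets explicitly.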
proc setupRequestDefaults(request: Request; config: LoaderClientConfig) =
  for k, v in config.defaultHeaders.table:
    if k notin request.headers.table:
      request.headers.table[k] = v
  if config.cookieJar != nil and config.cookieJar.cookies.len > 0:
    if "Cookie" notin request.headers.table:
      let cookie = config.cookieJar.serialize(request.url)
      if cookie != "":
        request.headers["Cookie"] = cookie
  if request.referrer != nil and "Referer" notin request.headers:
    let r = request.referrer.getReferrer(request.url, config.referrerPolicy)
    if r != "":
      request.headers["Referer"] = r

proc load(ctx: LoaderContext; stream: SocketStream; request: Request;
    client: ClientData; config: LoaderClientConfig) =
  let handle = newLoaderHandle(stream, ctx.getOutputId(), client.pid)
  when defined(debug):
    handle.url = request.url
    handle.output.url = request.url
  if not config.filter.match(request.url):
    handle.rejectHandle(ERROR_DISALLOWED_URL)
  else:
    request.setupRequestDefaults(config)
    if request.proxy == nil or not ctx.isPrivileged(client):
      request.proxy = config.proxy
    ctx.loadResource(client, config, request, handle)

proc load(ctx: LoaderContext; stream: SocketStream; client: ClientData;
    r: var BufferedReader) =
  var request: Request
  r.sread(request)
  ctx.load(stream, request, client, client.config)

proc loadConfig(ctx: LoaderContext; stream: SocketStream; client: ClientData;
    r: var BufferedReader) =
  var request: Request
  r.sread(request)
  var config: LoaderClientConfig
  r.sread(config)
  ctx.load(stream, request, client, config)

proc getCacheFile(ctx: LoaderContext; stream: SocketStream; client: ClientData;
    r: var BufferedReader) =
  var cacheId: int
  r.sread(cacheId)
  stream.withPacketWriter w:
    let n = client.cacheMap.find(cacheId)
    if n != -1:
      w.swrite(client.cacheMap[n].path)
    else:
      w.swrite("")

proc addClient(ctx: LoaderContext; stream: SocketStream;
    r: var BufferedReader) =
  var key: ClientKey
  var pid: int
  var config: LoaderClientConfig
  var clonedFrom: int
  r.sread(key)
  r.sread(pid)
  r.sread(config)
  r.sread(clonedFrom)
  stream.withPacketWriter w:
    if pid in ctx.clientData or key == default(ClientKey):
      w.swrite(false)
    else:
      let client = ClientData(pid: pid, key: key, config: config)
      ctx.clientData[pid] = client
      if clonedFrom != -1:
        let client2 = ctx.clientData[clonedFrom]
        for item in client2.cacheMap:
          inc item.refc
        client.cacheMap = client2.cacheMap
      w.swrite(true)
  stream.sclose()

proc cleanup(client: ClientData) =
  for it in client.cacheMap:
    dec it.refc
    if it.refc == 0:
      discard unlink(cstring(it.path))

proc removeClient(ctx: LoaderContext; stream: SocketStream;
    r: var BufferedReader) =
  var pid: int
  r.sread(pid)
  if pid in ctx.clientData:
    let client = ctx.clientData[pid]
    client.cleanup()
    ctx.clientData.del(pid)
  stream.sclose()

proc addCacheFile(ctx: LoaderContext; stream: SocketStream; client: ClientData;
    r: var BufferedReader) =
  var outputId: int
  var targetPid: int
  r.sread(outputId)
  #TODO get rid of targetPid
  r.sread(targetPid)
  doAssert ctx.isPrivileged(client) or client.pid == targetPid
  let output = ctx.findOutput(outputId, client)
  assert output != nil
  let targetClient = ctx.clientData[targetPid]
  let id = ctx.addCacheFile(targetClient, output)
  stream.withPacketWriter w:
    w.swrite(id)
  stream.sclose()

proc redirectToFile(ctx: LoaderContext; stream: SocketStream;
    r: var BufferedReader) =
  var outputId: int
  var targetPath: string
  r.sread(outputId)
  r.sread(targetPath)
  let output = ctx.findOutput(outputId, ctx.pagerClient)
  var success = false
  if output != nil:
    success = ctx.redirectToFile(output, targetPath)
  stream.withPacketWriter w:
    w.swrite(success)
  stream.sclose()

proc shareCachedItem(ctx: LoaderContext; stream: SocketStream;
    r: var BufferedReader) =
  # share a cached file with another buffer. this is for newBufferFrom
  # (i.e. view source)
  var sourcePid: int # pid of source client
  var targetPid: int # pid of target client
  var id: int
  r.sread(sourcePid)
  r.sread(targetPid)
  r.sread(id)
  let sourceClient = ctx.clientData[sourcePid]
  let targetClient = ctx.clientData[targetPid]
  let n = sourceClient.cacheMap.find(id)
  let item = sourceClient.cacheMap[n]
  inc item.refc
  targetClient.cacheMap.add(item)
  stream.sclose()

proc passFd(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) =
  var id: string
  r.sread(id)
  let fd = stream.recvFileHandle()
  ctx.passedFdMap[id] = fd
  stream.sclose()

proc removeCachedItem(ctx: LoaderContext; stream: SocketStream;
    client: ClientData; r: var BufferedReader) =
  var id: int
  r.sread(id)
  let n = client.cacheMap.find(id)
  if n != -1:
    let item = client.cacheMap[n]
    client.cacheMap.del(n)
    dec item.refc
    if item.refc == 0:
      discard unlink(cstring(item.path))
  stream.sclose()

proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData;
    r: var BufferedReader) =
  var sourceId: int
  var targetPid: int
  r.sread(sourceId)
  r.sread(targetPid)
  let outputIn = ctx.findOutput(sourceId, client)
  if outputIn != nil:
    let id = ctx.getOutputId()
    let output = outputIn.tee(stream, id, targetPid)
    ctx.outputMap[output.ostream.fd] = output
    stream.withPacketWriter w:
      w.swrite(id)
    stream.setBlocking(false)
  else:
    stream.withPacketWriter w:
      w.swrite(-1)
    stream.sclose()

proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData;
    r: var BufferedReader) =
  var ids: seq[int]
  r.sread(ids)
  for id in ids:
    let output = ctx.findOutput(id, client)
    if output != nil:
      output.suspended = true
      if output.registered:
        # do not waste cycles trying to push into output
        ctx.unregister(output)
  stream.sclose()

proc resume(ctx: LoaderContext; stream: SocketStream; client: ClientData;
    r: var BufferedReader) =
  var ids: seq[int]
  r.sread(ids)
  for id in ids:
    let output = ctx.findOutput(id, client)
    if output != nil:
      output.suspended = false
      ctx.register(output)
  stream.sclose()

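# Compare two client keys in constant time, so that key validation in
# acceptConnection does not leak key bytes through timing differences.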
proc equalsConstantTime(a, b: ClientKey): bool =
  static:
    doAssert a.len == b.len
  {.push boundChecks:off, overflowChecks:off.}
  var i {.volatile.} = 0
  var res {.volatile.} = 0u8
  while i < a.len:
    res = res or (a[i] xor b[i])
    inc i
  {.pop.}
  return res == 0

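# Accept a control connection on the server socket: authenticate the peer by
# pid and key, then dispatch its LoaderCommand. Commands that can affect other
# clients are restricted to the pager.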
proc acceptConnection(ctx: LoaderContext) =
  let stream = ctx.ssock.acceptSocketStream()
  try:
    stream.withPacketReader r:
      var myPid: int
      var key: ClientKey
      r.sread(myPid)
      r.sread(key)
      if myPid notin ctx.clientData:
        # possibly already removed
        stream.sclose()
        return
      let client = ctx.clientData[myPid]
      if not client.key.equalsConstantTime(key):
        # ditto
        stream.sclose()
        return
      var cmd: LoaderCommand
      r.sread(cmd)
      template privileged_command =
        doAssert ctx.isPrivileged(client)
      case cmd
      of lcAddClient:
        privileged_command
        ctx.addClient(stream, r)
      of lcRemoveClient:
        privileged_command
        ctx.removeClient(stream, r)
      of lcShareCachedItem:
        privileged_command
        ctx.shareCachedItem(stream, r)
      of lcPassFd:
        privileged_command
        ctx.passFd(stream, r)
      of lcRedirectToFile:
        privileged_command
        ctx.redirectToFile(stream, r)
      of lcLoadConfig:
        privileged_command
        ctx.loadConfig(stream, client, r)
      of lcGetCacheFile:
        privileged_command
        ctx.getCacheFile(stream, client, r)
      of lcAddCacheFile:
        ctx.addCacheFile(stream, client, r)
      of lcRemoveCachedItem:
        ctx.removeCachedItem(stream, client, r)
      of lcLoad:
        ctx.load(stream, client, r)
      of lcTee:
        ctx.tee(stream, client, r)
      of lcSuspend:
        ctx.suspend(stream, client, r)
      of lcResume:
        ctx.resume(stream, client, r)
  except ErrorBrokenPipe:
    # receiving end died while reading the file; give up.
    assert stream.fd notin ctx.outputMap
    stream.sclose()

proc exitLoader(ctx: LoaderContext) =
  ctx.ssock.close()
  for client in ctx.clientData.values:
    client.cleanup()
  exitnow(1)

var gctx: LoaderContext
proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext =
  var ctx = LoaderContext(
    alive: true,
    config: config,
    selector: newSelector[int]()
  )
  gctx = ctx
  let myPid = getCurrentProcessId()
  # we don't capsicumize loader, so -1 is appropriate here
  ctx.ssock = initServerSocket(config.sockdir, -1, myPid, blocking = true)
  let sfd = int(ctx.ssock.sock.getFd())
  ctx.selector.registerHandle(sfd, {Read}, 0)
  # The server has been initialized, so the main process can resume execution.
  let ps = newPosixStream(fd)
  ps.write(char(0u8))
  ps.sclose()
  onSignal SIGTERM:
    discard sig
    gctx.exitLoader()
  for dir in ctx.config.cgiDir.mitems:
    if dir.len > 0 and dir[^1] != '/':
      dir &= '/'
  # get pager's key
  let stream = ctx.ssock.acceptSocketStream()
  stream.withPacketReader r:
    block readNullKey:
      var pid: int # ignore pid
      r.sread(pid)
      # pager's key is still null
      var key: ClientKey
      r.sread(key)
      doAssert key == default(ClientKey)
    var cmd: LoaderCommand
    r.sread(cmd)
    doAssert cmd == lcAddClient
    var key: ClientKey
    var pid: int
    var config: LoaderClientConfig
    r.sread(key)
    r.sread(pid)
    r.sread(config)
    stream.withPacketWriter w:
      w.swrite(true)
    ctx.pagerClient = ClientData(key: key, pid: pid, config: config)
    ctx.clientData[pid] = ctx.pagerClient
    stream.sclose()
  # unblock main socket
  ctx.ssock.sock.getFd().setBlocking(false)
  # for CGI
  putEnv("SERVER_SOFTWARE", "Chawan")
  putEnv("SERVER_PROTOCOL", "HTTP/1.0")
  putEnv("SERVER_NAME", "localhost")
  putEnv("SERVER_PORT", "80")
  putEnv("REMOTE_HOST", "localhost")
  putEnv("REMOTE_ADDR", "127.0.0.1")
  putEnv("GATEWAY_INTERFACE", "CGI/1.1")
  putEnv("CHA_INSECURE_SSL_NO_VERIFY", "0")
  return ctx

# This is only called when an OutputHandle could not write out all of one (or
# more) buffers, and we asked select to notify us when it becomes writable
# again.
proc handleWrite(ctx: LoaderContext; output: OutputHandle;
    unregWrite: var seq[OutputHandle]) =
  while output.currentBuffer != nil:
    let buffer = output.currentBuffer
    try:
      let n = output.ostream.sendData(buffer, output.currentBufferIdx)
      output.currentBufferIdx += n
      if output.currentBufferIdx < buffer.len:
        break
      output.bufferCleared() # swap out buffer
    except ErrorAgain: # never mind
      break
    except ErrorBrokenPipe: # receiver died; stop streaming
      unregWrite.add(output)
      break
  if output.isEmpty:
    if output.istreamAtEnd:
      # after EOF, no need to send anything more here
      unregWrite.add(output)
    else:
      # all buffers sent, no need to select on this output again for now
      ctx.unregister(output)

proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle];
    unregWrite: var seq[OutputHandle]) =
  # Unregister handles queued for unregistration.
  # It is possible for both unregRead and unregWrite to contain duplicates. To
  # avoid double-close/double-unregister, we set the istream/ostream of
  # unregistered handles to nil.
  for handle in unregRead:
    if handle.istream != nil:
      ctx.selector.unregister(int(handle.istream.fd))
      ctx.handleMap.del(handle.istream.fd)
      if handle.parser != nil:
        handle.finishParse()
      handle.iclose()
      for output in handle.outputs:
        output.istreamAtEnd = true
        if output.isEmpty:
          unregWrite.add(output)
  for output in unregWrite:
    if output.ostream != nil:
      if output.registered:
        ctx.unregister(output)
      ctx.outputMap.del(output.ostream.fd)
      output.oclose()
      let handle = output.parent
      if handle != nil: # may be nil if from loadStream S_ISREG
        let i = handle.outputs.find(output)
        handle.outputs.del(i)
        if handle.outputs.len == 0 and handle.istream != nil:
          # premature end of all output streams; kill istream too
          ctx.selector.unregister(int(handle.istream.fd))
          ctx.handleMap.del(handle.istream.fd)
          if handle.parser != nil:
            handle.finishParse()
          handle.iclose()

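# Entry point of the loader process: initialize the context, then run the
# select loop over the server socket and all registered input/output streams.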
proc runFileLoader*(fd: cint; config: LoaderConfig) =
  var ctx = initLoaderContext(fd, config)
  let fd = int(ctx.ssock.sock.getFd())
  while ctx.alive:
    let events = ctx.selector.select(-1)
    var unregRead: seq[LoaderHandle] = @[]
    var unregWrite: seq[OutputHandle] = @[]
    for event in events:
      if Read in event.events:
        if event.fd == fd: # incoming connection
          ctx.acceptConnection()
        else:
          let handle = ctx.handleMap[event.fd]
          case ctx.handleRead(handle, unregWrite)
          of hrrDone: discard
          of hrrUnregister, hrrBrokenPipe: unregRead.add(handle)
      if Write in event.events:
        ctx.handleWrite(ctx.outputMap[event.fd], unregWrite)
      if Error in event.events:
        assert event.fd != fd
        ctx.outputMap.withValue(event.fd, outputp): # ostream died
          unregWrite.add(outputp[])
        do: # istream died
          let handle = ctx.handleMap[event.fd]
          unregRead.add(handle)
    ctx.finishCycle(unregRead, unregWrite)
  ctx.exitLoader()

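# Build the request to follow a redirect response, switching to GET where the
# status code demands it (301, 303, and 302 for POST); returns nil if the
# response is not a redirect we can follow.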
proc getRedirect*(response: Response; request: Request): Request =
  if "Location" in response.headers.table:
    if response.status in 301u16..303u16 or response.status in 307u16..308u16:
      let location = response.headers.table["Location"][0]
      let url = parseURL(location, option(request.url))
      if url.isSome:
        let status = response.status
        if status == 303 and request.httpMethod notin {hmGet, hmHead} or
            status == 301 or
            status == 302 and request.httpMethod == hmPost:
          return newRequest(url.get, hmGet)
        else:
          return newRequest(url.get, request.httpMethod, body = request.body)
  return nil

template withLoaderPacketWriter(stream: SocketStream; loader: FileLoader;
    w, body: untyped) =
  stream.withPacketWriter w:
    w.swrite(loader.clientPid)
    w.swrite(loader.key)
    body

proc connect(loader: FileLoader): SocketStream =
  return connectSocketStream(loader.sockDir, loader.sockDirFd, loader.process,
    blocking = true)

# Start a request. This should not block (not for a significant amount of time
# anyway).
proc startRequest(loader: FileLoader; request: Request): SocketStream =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcLoad)
    w.swrite(request)
  return stream

proc startRequest*(loader: FileLoader; request: Request;
    config: LoaderClientConfig): SocketStream =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcLoadConfig)
    w.swrite(request)
    w.swrite(config)
  return stream

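# Start a non-blocking request; the response head is then parsed incrementally
# in onConnected as packets arrive on the socket.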
proc fetch0(loader: FileLoader; input: Request; promise: FetchPromise;
    redirectNum: int) =
  let stream = loader.startRequest(input)
  let fd = int(stream.fd)
  loader.registerFun(fd)
  loader.connecting[fd] = ConnectData(
    promise: promise,
    request: input,
    stream: stream,
    redirectNum: redirectNum
  )

proc fetch*(loader: FileLoader; input: Request): FetchPromise =
  let promise = FetchPromise()
  loader.fetch0(input, promise, 0)
  return promise

proc reconnect*(loader: FileLoader; data: ConnectData) =
  data.stream.sclose()
  let stream = loader.startRequest(data.request)
  let fd = int(stream.fd)
  loader.registerFun(fd)
  loader.connecting[fd] = ConnectData(
    promise: data.promise,
    request: data.request,
    stream: stream
  )

proc suspend*(loader: FileLoader; fds: seq[int]) =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcSuspend)
    w.swrite(fds)
  stream.sclose()

proc resume*(loader: FileLoader; fds: openArray[int]) =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcResume)
    w.swrite(fds)
  stream.sclose()

proc resume*(loader: FileLoader; fds: int) =
  loader.resume([fds])

proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcTee)
    w.swrite(sourceId)
    w.swrite(targetPid)
  var outputId: int
  var r = stream.initPacketReader()
  r.sread(outputId)
  return (stream, outputId)

proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): int =
  let stream = loader.connect()
  if stream == nil:
    return -1
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcAddCacheFile)
    w.swrite(outputId)
    w.swrite(targetPid)
  var r = stream.initPacketReader()
  var outputId: int
  r.sread(outputId)
  return outputId

proc getCacheFile*(loader: FileLoader; cacheId: int): string =
  let stream = loader.connect()
  if stream == nil:
    return ""
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcGetCacheFile)
    w.swrite(cacheId)
  var r = stream.initPacketReader()
  var s: string
  r.sread(s)
  return s

proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string):
    bool =
  let stream = loader.connect()
  if stream == nil:
    return false
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcRedirectToFile)
    w.swrite(outputId)
    w.swrite(targetPath)
  var r = stream.initPacketReader()
  r.sread(result)

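# Called when the request socket becomes readable while still connecting: read
# the next packet of the response head (result, status, or headers) and, once
# headers have arrived, resolve the fetch promise (or follow a redirect).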
proc onConnected*(loader: FileLoader; fd: int) =
  let connectData = loader.connecting[fd]
  let stream = connectData.stream
  let promise = connectData.promise
  let request = connectData.request
  var r = stream.initPacketReader()
  case connectData.state
  of cdsBeforeResult:
    var res: int
    r.sread(res) # packet 1
    if res == 0:
      r.sread(connectData.outputId) # packet 1
      inc connectData.state
    else:
      var msg: string
      # msg is discarded.
      #TODO maybe print if called from trusted code (i.e. global == client)?
      r.sread(msg) # packet 1
      loader.unregisterFun(fd)
      loader.unregistered.add(fd)
      stream.sclose()
      # delete before resolving the promise
      loader.connecting.del(fd)
      let err = newTypeError("NetworkError when attempting to fetch resource")
      promise.resolve(JSResult[Response].err(err))
  of cdsBeforeStatus:
    r.sread(connectData.status) # packet 2
    inc connectData.state
  of cdsBeforeHeaders:
    let response = newResponse(connectData.res, request, stream,
      connectData.outputId, connectData.status)
    r.sread(response.headers) # packet 3
    # Only a stream of the response body may arrive after this point.
    response.body = stream
    assert loader.unregisterFun != nil
    response.unregisterFun = proc() =
      loader.ongoing.del(response.body.fd)
      loader.unregistered.add(response.body.fd)
      loader.unregisterFun(response.body.fd)
    response.resumeFun = proc(outputId: int) =
      loader.resume(outputId)
    loader.ongoing[fd] = response
    stream.setBlocking(false)
    let redirect = response.getRedirect(request)
    # delete before resolving the promise
    loader.connecting.del(fd)
    if redirect != nil:
      response.unregisterFun()
      stream.sclose()
      let redirectNum = connectData.redirectNum + 1
      if redirectNum < 5: #TODO use config.network.max_redirect?
        loader.fetch0(redirect, promise, redirectNum)
      else:
        let err = newTypeError("NetworkError when attempting to fetch resource")
        promise.resolve(JSResult[Response].err(err))
    else:
      promise.resolve(JSResult[Response].ok(response))

proc onRead*(loader: FileLoader; fd: int) =
  let response = loader.ongoing.getOrDefault(fd)
  if response != nil:
    response.onRead(response)
    if response.body.isend:
      response.bodyRead.resolve()
      response.bodyRead = nil
      response.unregisterFun()

proc onError*(loader: FileLoader; fd: int) =
  let response = loader.ongoing.getOrDefault(fd)
  if response != nil:
    when defined(debug):
      var lbuf {.noinit.}: array[BufferSize, char]
      if not response.body.isend:
        let n = response.body.recvData(lbuf)
        assert n == 0
      assert response.body.isend
    response.bodyRead.resolve()
    response.bodyRead = nil
    response.unregisterFun()

# Note: this blocks until headers are received.
proc doRequest*(loader: FileLoader; request: Request): Response =
  let stream = loader.startRequest(request)
  let response = Response(url: request.url)
  var r = stream.initPacketReader()
  r.sread(response.res) # packet 1
  if response.res == 0:
    r.sread(response.outputId) # packet 1
    r = stream.initPacketReader()
    r.sread(response.status) # packet 2
    r = stream.initPacketReader()
    r.sread(response.headers) # packet 3
    # Only a stream of the response body may arrive after this point.
    response.body = stream
  else:
    var msg: string
    r.sread(msg) # packet 1
    stream.sclose()
  return response

proc shareCachedItem*(loader: FileLoader; id, targetPid: int; sourcePid = -1) =
  let stream = loader.connect()
  if stream != nil:
    let sourcePid = if sourcePid != -1: sourcePid else: loader.clientPid
    stream.withLoaderPacketWriter loader, w:
      w.swrite(lcShareCachedItem)
      w.swrite(sourcePid)
      w.swrite(targetPid)
      w.swrite(id)
    stream.sclose()

proc passFd*(loader: FileLoader; id: string; fd: FileHandle) =
  let stream = loader.connect()
  if stream != nil:
    stream.withLoaderPacketWriter loader, w:
      w.swrite(lcPassFd)
      w.swrite(id)
    stream.sendFileHandle(fd)
    stream.sclose()

proc removeCachedItem*(loader: FileLoader; cacheId: int) =
  let stream = loader.connect()
  if stream != nil:
    stream.withLoaderPacketWriter loader, w:
      w.swrite(lcRemoveCachedItem)
      w.swrite(cacheId)
    stream.sclose()

proc addClient*(loader: FileLoader; key: ClientKey; pid: int;
    config: LoaderClientConfig; clonedFrom: int): bool =
  let stream = loader.connect()
  stream.withLoaderPacketWriter loader, w:
    w.swrite(lcAddClient)
    w.swrite(key)
    w.swrite(pid)
    w.swrite(config)
    w.swrite(clonedFrom)
  var r = stream.initPacketReader()
  r.sread(result)
  stream.sclose()

proc removeClient*(loader: FileLoader; pid: int) =
  let stream = loader.connect()
  if stream != nil:
    stream.withLoaderPacketWriter loader, w:
      w.swrite(lcRemoveClient)
      w.swrite(pid)
    stream.sclose()

when defined(freebsd):
  let O_DIRECTORY* {.importc, header: "<fcntl.h>", noinit.}: cint

proc setSocketDir*(loader: FileLoader; path: string) =
  loader.sockDir = path
  when defined(freebsd):
    loader.sockDirFd = open(cstring(path), O_DIRECTORY)
  else:
    loader.sockDirFd = -1
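
# A minimal usage sketch of the blocking client API (hypothetical caller; it
# assumes `loader` is a FileLoader already attached to a running loader
# process, i.e. sockDir, process, key and clientPid have been set up by the
# pager, and `url` is a parsed URL):
#
#   loader.setSocketDir(sockDir)
#   let res = loader.doRequest(newRequest(url, hmGet))
#   if res.res == 0:
#     discard res.body # the response body is streamed on this socket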