about summary refs log blame commit diff stats
path: root/src/io/loader.nim
blob: 46f694e6f75dca339e650e8c4ee8fbe7311d448c (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13












                                                                               
                    

              
             
          

                    
 
                    
               
              
                 
              
                     
                 
                 
                  
                   


                       
                    
                   
                 
                    
                
                   
 


               
    
                          
                 
                                        
                                     
                           



                                 
                                               

                    
 



                      
                             
 





                                                                     

                      
 







                               
                        
                            


                                   



                                                                        
 

                                                    


                                               






                                                                          


                                  
                   
                     


                                                          

                               
                   
       
                                                
                   
 



                                                 
                                               
                  
                  












                                                                                 

                                                          























                                                                               




                                         





                                                              
                                                           






                                                                       
                                                   
                                                               









                                                                    
                                                                              






                                                              
                           




















                                                               
             









                                                                         
 





                                                                               
                                                                              











                                                                           
                                                               



                                                                           
                                                               





                                                                          
                              




                                      
                
 

                       







                                                
                                                        
                                      

                                        

                                     
                                 
                              
                           


                                          


                                   


                                     
                                 
     
                                                          
                                                           
       

                               

                                                                            

                           













                                                                            


                                                 
                                  







                                                     
                                                                                  
             
                          
                                                                          
                     


                          
                     


                                        
                                
                                 
                                                                     
                        

                                                 
 



                                                  




                                                  
# A file loader server (?)
# The idea here is that we receive requests with a socket, then respond to each
# with a response (ideally a document.)
# For now, the protocol looks like:
# C: Request
# S: res (0 => success, _ => error)
# if success:
#  S: status code
#  S: headers
#  S: response body
#
# The body is passed to the stream as-is, so effectively nothing can follow it.

import nativesockets
import options
import streams
import tables
import net
when defined(posix):
  import posix

import bindings/curl
import io/about
import io/file
import io/headers
import io/http
import io/posixstream
import io/promise
import io/request
import io/response
import io/urlfilter
import ips/serialize
import ips/serversocket
import ips/socketstream
import js/javascript
import types/cookie
import types/mime
import types/referer
import types/url
import utils/twtstr

export request
export response

type
  FileLoader* = ref object
    process*: Pid
    connecting*: Table[int, ConnectData]
    ongoing*: Table[int, OngoingData]
    unregistered*: seq[int]
    registerFun*: proc(fd: int)
    unregisterFun*: proc(fd: int)

  ConnectData = object
    promise: Promise[Result[Response, JSError]]
    stream: Stream
    request: Request

  OngoingData = object
    buf: string
    readbufsize: int
    response: Response
    bodyRead: Promise[string]

  ConnectErrorCode* = enum
    ERROR_SOURCE_NOT_FOUND = (-4, "clone source could not be found"),
    ERROR_LOADER_KILLED = (-3, "loader killed during transfer"),
    ERROR_DISALLOWED_URL = (-2, "url not allowed by filter"),
    ERROR_UNKNOWN_SCHEME = (-1, "unknown scheme")

  LoaderCommand = enum
    LOAD, QUIT

  LoaderContext = ref object
    ssock: ServerSocket
    alive: bool
    curlm: CURLM
    config: LoaderConfig
    extra_fds: seq[curl_waitfd]
    handleList: seq[HandleData]

  LoaderConfig* = object
    defaultheaders*: Headers
    filter*: URLFilter
    cookiejar*: CookieJar
    referrerpolicy*: ReferrerPolicy
    proxy*: URL
    # When set to false, requests with a proxy URL are overridden by the
    # loader proxy.
    acceptProxy*: bool

  FetchPromise* = Promise[Result[Response, JSError]]

converter toInt*(code: ConnectErrorCode): int =
  return int(code)

proc addFd(ctx: LoaderContext, fd: int, flags: int) =
  ctx.extra_fds.add(curl_waitfd(
    fd: cast[cint](fd),
    events: cast[cshort](flags)
  ))

proc loadResource(ctx: LoaderContext, request: Request, ostream: Stream) =
  case request.url.scheme
  of "file":
    loadFile(request.url, ostream)
    ostream.close()
  of "http", "https":
    let handleData = loadHttp(ctx.curlm, request, ostream)
    if handleData != nil:
      ctx.handleList.add(handleData)
  of "about":
    loadAbout(request, ostream)
    ostream.close()
  else:
    ostream.swrite(ERROR_UNKNOWN_SCHEME) # error
    ostream.close()

proc onLoad(ctx: LoaderContext, stream: Stream) =
  var request: Request
  stream.sread(request)
  if not ctx.config.filter.match(request.url):
    stream.swrite(ERROR_DISALLOWED_URL) # error
    stream.flush()
    stream.close()
  else:
    for k, v in ctx.config.defaultHeaders.table:
      if k notin request.headers.table:
        request.headers.table[k] = v
    if ctx.config.cookiejar != nil and ctx.config.cookiejar.cookies.len > 0:
      if "Cookie" notin request.headers.table:
        let cookie = ctx.config.cookiejar.serialize(request.url)
        if cookie != "":
          request.headers["Cookie"] = cookie
    if request.referer != nil and "Referer" notin request.headers.table:
      let r = getReferer(request.referer, request.url, ctx.config.referrerpolicy)
      if r != "":
        request.headers["Referer"] = r
    if request.proxy == nil or not ctx.config.acceptProxy:
      request.proxy = ctx.config.proxy
    ctx.loadResource(request, stream)

proc acceptConnection(ctx: LoaderContext) =
  #TODO TODO TODO acceptSocketStream should be non-blocking here,
  # otherwise the client disconnecting between poll and accept could
  # block this indefinitely.
  let stream = ctx.ssock.acceptSocketStream()
  try:
    var cmd: LoaderCommand
    stream.sread(cmd)
    case cmd
    of LOAD:
      ctx.onLoad(stream)
    of QUIT:
      ctx.alive = false
      stream.close()
  except IOError:
    # End-of-file, broken pipe, or something else. For now we just
    # ignore it and pray nothing breaks.
    # (TODO: this is probably not a very good idea.)
    stream.close()

proc finishCurlTransfer(ctx: LoaderContext, handleData: HandleData, res: int) =
  if res != int(CURLE_OK):
    try:
      handleData.ostream.swrite(int(res))
      handleData.ostream.flush()
    except IOError: # Broken pipe
      discard
  discard curl_multi_remove_handle(ctx.curlm, handleData.curl)
  handleData.ostream.close()
  handleData.cleanup()

proc exitLoader(ctx: LoaderContext) =
  for handleData in ctx.handleList:
    ctx.finishCurlTransfer(handleData, ERROR_LOADER_KILLED)
  discard curl_multi_cleanup(ctx.curlm)
  curl_global_cleanup()
  ctx.ssock.close()
  quit(0)

var gctx: LoaderContext
proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext =
  if curl_global_init(CURL_GLOBAL_ALL) != CURLE_OK:
    raise newException(Defect, "Failed to initialize libcurl.")
  let curlm = curl_multi_init()
  if curlm == nil:
    raise newException(Defect, "Failed to initialize multi handle.")
  var ctx = LoaderContext(
    alive: true,
    curlm: curlm,
    config: config
  )
  gctx = ctx
  ctx.ssock = initServerSocket()
  # The server has been initialized, so the main process can resume execution.
  var writef: File
  if not open(writef, FileHandle(fd), fmWrite):
    raise newException(Defect, "Failed to open input handle.")
  writef.write(char(0u8))
  writef.flushFile()
  close(writef)
  discard close(fd)
  onSignal SIGTERM, SIGINT:
    gctx.exitLoader()
  ctx.addFd(int(ctx.ssock.sock.getFd()), CURL_WAIT_POLLIN)
  return ctx

proc runFileLoader*(fd: cint, config: LoaderConfig) =
  var ctx = initLoaderContext(fd, config)
  while ctx.alive:
    var numfds: cint = 0
    #TODO do not discard
    discard curl_multi_poll(ctx.curlm, addr ctx.extra_fds[0],
      cuint(ctx.extra_fds.len), 30_000, addr numfds)
    discard curl_multi_perform(ctx.curlm, addr numfds)
    for extra_fd in ctx.extra_fds.mitems:
      # For now, this is always ssock.sock.getFd().
      if extra_fd.events == extra_fd.revents:
        ctx.acceptConnection()
        extra_fd.revents = 0
    var msgs_left: cint = 1
    while msgs_left > 0:
      let msg = curl_multi_info_read(ctx.curlm, addr msgs_left)
      if msg == nil:
        break
      if msg.msg == CURLMSG_DONE: # the only possible value atm
        var idx = -1
        for i in 0 ..< ctx.handleList.len:
          if ctx.handleList[i].curl == msg.easy_handle:
            idx = i
            break
        assert idx != -1
        ctx.finishCurlTransfer(ctx.handleList[idx], int(msg.data.result))
        ctx.handleList.del(idx)
  ctx.exitLoader()

proc applyHeaders(request: Request, response: Response) =
  if "Content-Type" in response.headers.table:
    response.contenttype = response.headers.table["Content-Type"][0].until(';')
  else:
    response.contenttype = guessContentType($response.url.path)
  if "Location" in response.headers.table:
    if response.status in 301u16..303u16 or response.status in 307u16..308u16:
      let location = response.headers.table["Location"][0]
      let url = parseUrl(location, option(request.url))
      if url.isSome:
        if (response.status == 303 and
            request.httpmethod notin {HTTP_GET, HTTP_HEAD}) or
            (response.status == 301 or response.status == 302 and
            request.httpmethod == HTTP_POST):
          response.redirect = newRequest(url.get, HTTP_GET,
            mode = request.mode, credentialsMode = request.credentialsMode,
            destination = request.destination)
        else:
          response.redirect = newRequest(url.get, request.httpmethod,
            body = request.body, multipart = request.multipart,
            mode = request.mode, credentialsMode = request.credentialsMode,
            destination = request.destination)

#TODO: add init
proc fetch*(loader: FileLoader, input: Request): FetchPromise =
  let stream = connectSocketStream(loader.process, false, blocking = true)
  stream.swrite(LOAD)
  stream.swrite(input)
  stream.flush()
  let fd = int(stream.source.getFd())
  loader.registerFun(fd)
  let promise = FetchPromise()
  loader.connecting[fd] = ConnectData(
    promise: promise,
    request: input,
    stream: stream
  )
  return promise

const BufferSize = 4096

proc onConnected*(loader: FileLoader, fd: int) =
  let connectData = loader.connecting[fd]
  let stream = connectData.stream
  let promise = connectData.promise
  let request = connectData.request
  var res: int
  stream.sread(res)
  if res == 0:
    let response = newResponse(res, request, fd, stream)
    assert loader.unregisterFun != nil
    let realCloseImpl = stream.closeImpl
    stream.closeImpl = nil
    response.unregisterFun = proc() =
      loader.ongoing.del(fd)
      loader.unregistered.add(fd)
      loader.unregisterFun(fd)
      realCloseImpl(stream)
    var status: int
    stream.sread(status)
    response.status = cast[uint16](status)
    stream.sread(response.headers)
    applyHeaders(request, response)
    response.body = stream
    loader.ongoing[fd] = OngoingData(
      response: response,
      readbufsize: BufferSize,
      bodyRead: response.bodyRead
    )
    SocketStream(stream).source.getFd().setBlocking(false)
    promise.resolve(Result[Response, JSError].ok(response))
  else:
    loader.unregisterFun(fd)
    loader.unregistered.add(fd)
    let err = newTypeError("NetworkError when attempting to fetch resource")
    promise.resolve(Result[Response, JSError].err(err))
  loader.connecting.del(fd)

proc onRead*(loader: FileLoader, fd: int) =
  loader.ongoing.withValue(fd, buffer):
    let response = buffer[].response
    while true:
      let olen = buffer[].buf.len
      buffer[].buf.setLen(olen + buffer.readbufsize)
      try:
        let n = response.body.readData(addr buffer[].buf[olen],
          buffer.readbufsize)
        if n != 0:
          if buffer[].readbufsize < BufferSize:
            buffer[].readbufsize = min(BufferSize, buffer[].readbufsize * 2)
        buffer[].buf.setLen(olen + n)
        if response.body.atEnd():
          buffer[].bodyRead.resolve(buffer[].buf)
          buffer[].bodyRead = nil
          buffer[].buf = ""
          response.unregisterFun()
        break
      except ErrorAgain, ErrorWouldBlock:
        assert buffer.readbufsize > 1
        buffer.readbufsize = buffer.readbufsize div 2

proc onError*(loader: FileLoader, fd: int) =
  loader.onRead(fd)

proc doRequest*(loader: FileLoader, request: Request, blocking = true): Response =
  new(result)
  result.url = request.url
  let stream = connectSocketStream(loader.process, false, blocking = true)
  stream.swrite(LOAD)
  stream.swrite(request)
  stream.flush()
  stream.sread(result.res)
  if result.res == 0:
    var status: int
    stream.sread(status)
    result.status = cast[uint16](status)
    stream.sread(result.headers)
    applyHeaders(request, result)
    # Only a stream of the response body may arrive after this point.
    result.body = stream
    if not blocking:
      stream.source.getFd().setBlocking(blocking)

proc quit*(loader: FileLoader) =
  let stream = connectSocketStream(loader.process)
  if stream != nil:
    stream.swrite(QUIT)

func getLoaderErrorMessage*(code: int): string =
  if code < 0:
    return $ConnectErrorCode(code)
  return $curl_easy_strerror(CURLcode(cint(code)))