about summary refs log blame commit diff stats
path: root/src/loader/loaderhandle.nim
blob: 11c33606e581b416c743f4b9cb58ce4f6637856f (plain) (tree)
1
2
3
4
5
6
7
8
                 
              
                 
 
                   
                   
                     
 


                    
                                             
 
    
                          
                                    
             
 
                                     
 



                                
                                 

                         

                  
                     
                    
               













                                                     

                                                                    
 
                            



                                                                     
                                                

                        
 



                                              
 
               




                                                       
 
                                                             
                                                                               
                                        


                                  

                       
                   


               
                                                                     



                               
 
                                                 
                             
 


                                                      
                                                                  
                      
                                                       
          
   
 




                                                    
       

                              




                                                                            




                                                
                       

                                 






                                        
 

                                                      
 





                                                            
                                    



                               
                       

                   






                                                        
                                           
                    
                                             
 




                                                           
                                           
                     


                                                                       



                                                      
                                                                               

                                                           
 






                                                                            



                                            







                                                           



                                    
                                   
                 
                               
                                
                             
                     
import std/deques
import std/net
import std/tables

import io/bufwriter
import io/dynstream
import loader/headers

when defined(debug):
  import types/url

const LoaderBufferPageSize = 4064 # 4096 - 32

type
  LoaderBufferObj = object
    page*: ptr UncheckedArray[uint8]
    len*: int

  LoaderBuffer* = ref LoaderBufferObj

  OutputHandle* = ref object
    parent*: LoaderHandle
    currentBuffer*: LoaderBuffer
    currentBufferIdx*: int
    buffers*: Deque[LoaderBuffer]
    ostream*: PosixStream
    istreamAtEnd*: bool
    ownerPid*: int
    outputId*: int
    registered*: bool
    suspended*: bool
    dead*: bool
    when defined(debug):
      url*: URL

  HandleParserState* = enum
    hpsBeforeLines, hpsAfterFirstLine, hpsControlDone

  HeaderParser* = ref object
    state*: HandleParserState
    lineBuffer*: string
    crSeen*: bool
    headers*: Headers
    status*: uint16

  ResponseState = enum
    rsBeforeResult, rsAfterFailure, rsBeforeStatus, rsBeforeHeaders,
    rsAfterHeaders

  LoaderHandle* = ref object
    istream*: PosixStream # stream for taking input
    outputs*: seq[OutputHandle] # list of outputs to be streamed into
    cacheId*: int # if cached, our ID in a client cacheMap
    parser*: HeaderParser # only exists for CGI handles
    rstate: ResponseState # track response state
    when defined(debug):
      url*: URL

proc `=destroy`(buffer: var LoaderBufferObj) =
  if buffer.page != nil:
    dealloc(buffer.page)
    buffer.page = nil

# for debugging
when defined(debug):
  func `$`*(buffer: LoaderBuffer): string =
    var s = newString(buffer.len)
    copyMem(addr s[0], addr buffer.page[0], buffer.len)
    return s

# Create a new loader handle, with the output stream ostream.
proc newLoaderHandle*(ostream: PosixStream; outputId, pid: int): LoaderHandle =
  let handle = LoaderHandle(cacheId: -1)
  handle.outputs.add(OutputHandle(
    ostream: ostream,
    parent: handle,
    outputId: outputId,
    ownerPid: pid,
    suspended: true
  ))
  return handle

proc findOutputHandle*(handle: LoaderHandle; fd: int): OutputHandle =
  for output in handle.outputs:
    if output.ostream.fd == fd:
      return output
  return nil

func cap*(buffer: LoaderBuffer): int {.inline.} =
  return LoaderBufferPageSize

template isEmpty*(output: OutputHandle): bool =
  output.currentBuffer == nil and not output.suspended

proc newLoaderBuffer*(size = LoaderBufferPageSize): LoaderBuffer =
  return LoaderBuffer(
    page: cast[ptr UncheckedArray[uint8]](alloc(size)),
    len: 0
  )

proc bufferCleared*(output: OutputHandle) =
  assert output.currentBuffer != nil
  output.currentBufferIdx = 0
  if output.buffers.len > 0:
    output.currentBuffer = output.buffers.popFirst()
  else:
    output.currentBuffer = nil

proc tee*(outputIn: OutputHandle; ostream: PosixStream; outputId, pid: int):
    OutputHandle =
  assert outputIn.suspended
  let output = OutputHandle(
    parent: outputIn.parent,
    ostream: ostream,
    currentBuffer: outputIn.currentBuffer,
    currentBufferIdx: outputIn.currentBufferIdx,
    buffers: outputIn.buffers,
    istreamAtEnd: outputIn.istreamAtEnd,
    outputId: outputId,
    ownerPid: pid,
    suspended: outputIn.suspended
  )
  when defined(debug):
    output.url = outputIn.url
  if outputIn.parent != nil:
    assert outputIn.parent.parser == nil
    outputIn.parent.outputs.add(output)
  return output

template output*(handle: LoaderHandle): OutputHandle =
  handle.outputs[0]

proc sendResult*(handle: LoaderHandle; res: int; msg = "") =
  assert handle.rstate == rsBeforeResult
  inc handle.rstate
  let output = handle.output
  let blocking = output.ostream.blocking
  output.ostream.setBlocking(true)
  output.ostream.withPacketWriter w:
    w.swrite(res)
    if res == 0: # success
      assert msg == ""
      w.swrite(output.outputId)
      inc handle.rstate
    else: # error
      w.swrite(msg)
  output.ostream.setBlocking(blocking)

proc sendStatus*(handle: LoaderHandle; status: uint16) =
  assert handle.rstate == rsBeforeStatus
  inc handle.rstate
  let blocking = handle.output.ostream.blocking
  handle.output.ostream.setBlocking(true)
  handle.output.ostream.withPacketWriter w:
    w.swrite(status)
  handle.output.ostream.setBlocking(blocking)

proc sendHeaders*(handle: LoaderHandle; headers: Headers) =
  assert handle.rstate == rsBeforeHeaders
  inc handle.rstate
  let blocking = handle.output.ostream.blocking
  handle.output.ostream.setBlocking(true)
  handle.output.ostream.withPacketWriter w:
    w.swrite(headers)
  handle.output.ostream.setBlocking(blocking)

proc recvData*(ps: PosixStream; buffer: LoaderBuffer): int {.inline.} =
  let n = ps.recvData(addr buffer.page[0], buffer.cap)
  buffer.len = n
  return n

proc sendData*(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} =
  assert buffer.len - si > 0
  return ps.sendData(addr buffer.page[si], buffer.len - si)

proc iclose*(handle: LoaderHandle) =
  if handle.istream != nil:
    if handle.rstate notin {rsBeforeResult, rsAfterFailure, rsAfterHeaders}:
      assert handle.outputs.len == 1
      # not an ideal solution, but better than silently eating malformed
      # headers
      try:
        if handle.rstate == rsBeforeStatus:
          handle.sendStatus(500)
        if handle.rstate == rsBeforeHeaders:
          handle.sendHeaders(newHeaders())
        handle.output.ostream.setBlocking(true)
        const msg = "Error: malformed header in CGI script"
        discard handle.output.ostream.sendData(msg)
      except ErrorBrokenPipe:
        discard # receiver is dead
    handle.istream.sclose()
    handle.istream = nil

proc oclose*(output: OutputHandle) =
  output.ostream.sclose()
  output.ostream = nil

proc close*(handle: LoaderHandle) =
  handle.iclose()
  for output in handle.outputs:
    assert not output.registered
    if output.ostream != nil:
      output.oclose()