summary refs log blame commit diff stats
path: root/lib/pure/collections/LockFreeHash.nim
blob: 954d62491b5e304c02f1852a5307ead87a2b9054 (plain) (tree)
1
2
3
4
5
6
7
8
9
                                                                          
 
                   
 



                                                                               
                                         

                                   



                            

                         

                                             
 
                                                

                             
                                               

                                      



                            
                         
 
                                                                               
                     


                                                   





                       
 


                              
      
                              

                                                                           
                             

                              
                                               

                                                                           
                             
     
                                   

    
               


              
                                            
 





                                       
                         
                  
                                                    

                                                                
                                




                                                                               



                                                     

                                      
                                           





                     
                                      

                
                                                                               


                                                 
                         

                    
                                                                               
 
                                                                          









                                                                               
                                                                    


                                   
                                         
                                      
                  
 
                                                                    







                                                     
                          
 


                                      





                                      

                                        




                                       
                                                                               

                                               

                                     



                          
                          










                                                                               
                                                         




                                                                                                  

                             


                                                                                   
 

                                                                               

                                                           
                
                
                    

                   
       

                          
                               













                                                                                              
     
              
              



                                                    
                                                                     
                                                             

                                                                           

                                                                


                                                                        
                                                  
                                      



                                                           
                                        
                         
                                       
                
                    
                                                
                      
                                                                   
             
                                                                                  
        
                             

                                               
                                                                       
                                                                      

                              





                                                                               
                                                               








                                                                 
                                                      


                                                                               


                                                                            


                                                                 
                

                                    
                                                                   
                 
                                                                        
                                                                   

                                                   

                                     
                            

                   
 





                                                                                    


                                                        
                                              
 

                                                                               
 

                                                                 




                                                                                    







                                                                   
                                                                        
                                                                
                                   
                         
                                                




                                                                                     
                                                                              
                                              

 




                                                                               


                                                                             
       


                                   
              


                                                                 

                                                 


                                                                              



                                                                          

                                                 

                                                                  

                                                                   
                                

                                         
                                                                               





                                                                     

                                         
                                          

                                                       



                                
                                                                   
                   
                                           
                 


                                                            






                                                                                    
                                                

                                                     
                                         



                                                                               
                                                                    

                                                    

                                                               
                                                                        
                                                                 
                                                                         
                                     







                                                                               
                                                             
                                   
                

                          


                                   
              
            
             

                                 
                                                                                  
                                                                    




                                                                
                                             
                  
             
                                                                     


                                               
                                    
         
                                                                                    


                                                                     
                  
             





                                                                               
 
                                                                
                                                        
 
                                                                   
                                                             

                                                                    
                      
                                           
       
                          
                      
                                           
       
                          
                                                           
                                                                          

                                        
                                        

 

                                                           

                        
                                                            
         

                                                  
                        
                                                       


                                             








 












                                                                                      
                                           
                               

       



                      
 
      
                   
              


             



                                                  

     
                                             

                                         

                                   

                                                                 

                                   
                                                        
                         

                          



                                                                             

                                        

                             
                              
                                                
                         

                         

          




                                           
 




                                           

                  


 


               

                                      




                                             
 




                                        
#nim c -t:-march=i686 --cpu:amd64 --threads:on -d:release lockfreehash.nim

import math, hashes

#------------------------------------------------------------------------------
## Memory Utility Functions

proc newHeap*[T](): ptr T =
  result = cast[ptr T](alloc0(sizeof(T)))

proc copyNew*[T](x: var T): ptr T =
  var
    size = sizeof(T)
    mem = alloc(size)
  copyMem(mem, x.addr, size)
  return cast[ptr T](mem)

proc copyTo*[T](val: var T, dest: int) =
  copyMem(pointer(dest), val.addr, sizeof(T))

proc allocType*[T](): pointer = alloc(sizeof(T))

proc newShared*[T](): ptr T =
  result = cast[ptr T](allocShared0(sizeof(T)))

proc copyShared*[T](x: var T): ptr T =
  var
    size = sizeof(T)
    mem = allocShared(size)
  copyMem(mem, x.addr, size)
  return cast[ptr T](mem)

#------------------------------------------------------------------------------
## Pointer arithmetic

proc `+`*(p: pointer, i: int): pointer {.inline.} =
  cast[pointer](cast[int](p) + i)

const
  minTableSize = 8
  reProbeLimit = 12
  minCopyWork = 4096
  intSize = sizeof(int)



when sizeof(int) == 4: # 32bit
  type
    Raw = range[0..1073741823]
    ## The range of uint values that can be stored directly in a value slot
    ## when on a 32 bit platform
  {.deprecated: [TRaw: Raw].}
elif sizeof(int) == 8: # 64bit
  type
    Raw = range[0'i64..4611686018427387903'i64]
    ## The range of uint values that can be stored directly in a value slot
    ## when on a 64 bit platform
  {.deprecated: [TRaw: Raw].}
else:
  {.error: "unsupported platform".}

type
  Entry = tuple
    key: int
    value: int

  EntryArr = ptr array[0..10_000_000, Entry]

  PConcTable[K,V] = ptr object {.pure.}
    len: int
    used: int
    active: int
    copyIdx: int
    copyDone: int
    next: PConcTable[K,V]
    data: EntryArr
{.deprecated: [TEntry: Entry, TEntryArr: EntryArr].}

proc setVal[K,V](table: var PConcTable[K,V], key: int, val: int,
  expVal: int, match: bool): int

#------------------------------------------------------------------------------

# Create a new table
proc newLFTable*[K,V](size: int = minTableSize): PConcTable[K,V]  =
  let
    dataLen = max(nextPowerOfTwo(size), minTableSize)
    dataSize = dataLen*sizeof(Entry)
    dataMem = allocShared0(dataSize)
    tableSize = 7 * intSize
    tableMem = allocShared0(tableSize)
    table = cast[PConcTable[K,V]](tableMem)
  table.len = dataLen
  table.used = 0
  table.active = 0
  table.copyIdx = 0
  table.copyDone = 0
  table.next = nil
  table.data = cast[EntryArr](dataMem)
  result = table

#------------------------------------------------------------------------------

# Delete a table
proc deleteConcTable[K,V](tbl: PConcTable[K,V]) =
  deallocShared(tbl.data)
  deallocShared(tbl)

#------------------------------------------------------------------------------

proc `[]`[K,V](table: var PConcTable[K,V], i: int): var Entry {.inline.} =
  table.data[i]

#------------------------------------------------------------------------------
# State flags stored in ptr


proc pack[T](x: T): int {.inline.} =
  result = (cast[int](x) shl 2)
  #echo("packKey ",cast[int](x) , " -> ", result)

# Pop the flags off returning a 4 byte aligned ptr to our Key or Val
proc pop(x: int): int  {.inline.} =
  result = x and 0xFFFFFFFC'i32

# Pop the raw value off of our Key or Val
proc popRaw(x: int): int  {.inline.} =
  result = x shr 2

# Pop the flags off returning a 4 byte aligned ptr to our Key or Val
proc popPtr[V](x: int): ptr V  {.inline.} =
  result = cast[ptr V](pop(x))
  #echo("popPtr " & $x & " -> " & $cast[int](result))

# Ghost (sentinel)
# K or V is no longer valid use new table
const Ghost = 0xFFFFFFFC
proc isGhost(x: int): bool {.inline.} =
  result = x == 0xFFFFFFFC

# Tombstone
# applied to V = K is dead
proc isTomb(x: int): bool {.inline.} =
  result = (x and 0x00000002) != 0

proc setTomb(x: int): int {.inline.} =
  result = x or 0x00000002

# Prime
# K or V is in new table copied from old
proc isPrime(x: int): bool {.inline.} =
  result = (x and 0x00000001) != 0

proc setPrime(x: int): int {.inline.} =
  result = x or 0x00000001

#------------------------------------------------------------------------------

##This is for i32 only need to override for i64
proc hashInt(x: int):int {.inline.} =
  var h = uint32(x) #shr 2'u32
  h = h xor (h shr 16'u32)
  h *= 0x85ebca6b'u32
  h = h xor (h shr 13'u32)
  h *= 0xc2b2ae35'u32
  h = h xor (h shr 16'u32)
  result = int(h)

#------------------------------------------------------------------------------

proc resize[K,V](self: PConcTable[K,V]): PConcTable[K,V] =
  var next = atomic_load_n(self.next.addr, ATOMIC_RELAXED)
  #echo("next = " & $cast[int](next))
  if next != nil:
    #echo("A new table already exists, copy in progress")
    return next
  var
    oldLen = atomic_load_n(self.len.addr, ATOMIC_RELAXED)
    newTable = newLFTable[K,V](oldLen*2)
    success = atomic_compare_exchange_n(self.next.addr, next.addr, newTable,
                  false, ATOMIC_RELAXED, ATOMIC_RELAXED)
  if not success:
    echo("someone beat us to it! delete table we just created and return his " & $cast[int](next))
    deleteConcTable(newTable)
    return next
  else:
    echo("Created New Table! " & $cast[int](newTable) & " Size = " & $newTable.len)
    return newTable


#------------------------------------------------------------------------------
#proc keyEQ[K](key1: ptr K, key2: ptr K): bool {.inline.} =
proc keyEQ[K](key1: int, key2: int): bool {.inline.} =
  result = false
  when K is Raw:
    if key1 == key2:
      result = true
  else:
    var
      p1 = popPtr[K](key1)
      p2 = popPtr[K](key2)
    if p1 != nil and p2 != nil:
      if cast[int](p1) == cast[int](p2):
        return true
      if p1[] == p2[]:
        return true

#------------------------------------------------------------------------------

#proc tableFull(self: var PConcTable[K,V]) : bool {.inline.} =


#------------------------------------------------------------------------------

proc copySlot[K,V](idx: int, oldTbl: var PConcTable[K,V], newTbl: var PConcTable[K,V]): bool =
  #echo("Copy idx " & $idx)
  var
    oldVal = 0
    oldkey = 0
    ok = false
  result = false
  #Block the key so no other threads waste time here
  while not ok:
    ok = atomic_compare_exchange_n(oldTbl[idx].key.addr, oldKey.addr,
      setTomb(oldKey), false, ATOMIC_RELAXED, ATOMIC_RELAXED)
  #echo("oldKey was = " & $oldKey & "  set it to tomb " & $setTomb(oldKey))
  #Prevent new values from appearing in the old table by priming
  oldVal = atomic_load_n(oldTbl[idx].value.addr, ATOMIC_RELAXED)
  while not isPrime(oldVal):
    var box = if oldVal == 0 or isTomb(oldVal) : oldVal.setTomb.setPrime
      else: oldVal.setPrime
    if atomic_compare_exchange_n(oldTbl[idx].value.addr, oldVal.addr,
      box, false, ATOMIC_RELAXED, ATOMIC_RELAXED):
      if isPrime(box) and isTomb(box):
        return true
      oldVal = box
      break
  #echo("oldVal was = ", oldVal, "  set it to prime ", box)
  if isPrime(oldVal) and isTomb(oldVal):
    #when not (K is Raw):
    #  deallocShared(popPtr[K](oldKey))
    return false
  if isTomb(oldVal):
    echo("oldVal is Tomb!!!, should not happen")
  if pop(oldVal) != 0:
    result = setVal(newTbl, pop(oldKey), pop(oldVal), 0, true) == 0
  #if result:
    #echo("Copied a Slot! idx= " & $idx & " key= " & $oldKey & " val= " & $oldVal)
  #else:
    #echo("copy slot failed")
  # Our copy is done so we disable the old slot
  while not ok:
    ok = atomic_compare_exchange_n(oldTbl[idx].value.addr, oldVal.addr,
      oldVal.setTomb.setPrime , false, ATOMIC_RELAXED, ATOMIC_RELAXED)
  #echo("disabled old slot")
  #echo"---------------------"

#------------------------------------------------------------------------------

proc promote[K,V](table: var PConcTable[K,V]) =
  var
    newData = atomic_load_n(table.next.data.addr, ATOMIC_RELAXED)
    newLen = atomic_load_n(table.next.len.addr, ATOMIC_RELAXED)
    newUsed = atomic_load_n(table.next.used.addr, ATOMIC_RELAXED)

  deallocShared(table.data)
  atomic_store_n(table.data.addr, newData, ATOMIC_RELAXED)
  atomic_store_n(table.len.addr, newLen, ATOMIC_RELAXED)
  atomic_store_n(table.used.addr, newUsed, ATOMIC_RELAXED)
  atomic_store_n(table.copyIdx.addr, 0, ATOMIC_RELAXED)
  atomic_store_n(table.copyDone.addr, 0, ATOMIC_RELAXED)
  deallocShared(table.next)
  atomic_store_n(table.next.addr, nil, ATOMIC_RELAXED)
  echo("new table swapped!")

#------------------------------------------------------------------------------

proc checkAndPromote[K,V](table: var PConcTable[K,V], workDone: int): bool =
  var
    oldLen = atomic_load_n(table.len.addr, ATOMIC_RELAXED)
    copyDone = atomic_load_n(table.copyDone.addr, ATOMIC_RELAXED)
    ok: bool
  result = false
  if workDone > 0:
    #echo("len to copy =" & $oldLen)
    #echo("copyDone + workDone = " & $copyDone & " + " & $workDone)
    while not ok:
      ok = atomic_compare_exchange_n(table.copyDone.addr, copyDone.addr,
        copyDone + workDone, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
    #if ok: echo("set copyDone")
    # If the copy is done we can promote this table
    if copyDone + workDone >= oldLen:
      # Swap new data
      #echo("work is done!")
      table.promote
      result = true

#------------------------------------------------------------------------------

proc copySlotAndCheck[K,V](table: var PConcTable[K,V], idx: int):
  PConcTable[K,V] =
  var
    newTable = cast[PConcTable[K,V]](atomic_load_n(table.next.addr, ATOMIC_RELAXED))
  result = newTable
  if newTable != nil and copySlot(idx, table, newTable):
    #echo("copied a single slot, idx = " & $idx)
    if checkAndPromote(table, 1): return table


#------------------------------------------------------------------------------

proc helpCopy[K,V](table: var PConcTable[K,V]): PConcTable[K,V] =
  var
    newTable = cast[PConcTable[K,V]](atomic_load_n(table.next.addr, ATOMIC_RELAXED))
  result = newTable
  if newTable != nil:
    var
      oldLen = atomic_load_n(table.len.addr, ATOMIC_RELAXED)
      copyDone = atomic_load_n(table.copyDone.addr, ATOMIC_RELAXED)
      copyIdx = 0
      work = min(oldLen, minCopyWork)
      #panicStart = -1
      workDone = 0
    if copyDone < oldLen:
      var ok: bool
      while not ok:
        ok = atomic_compare_exchange_n(table.copyIdx.addr, copyIdx.addr,
          copyIdx + work, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
      #echo("copy idx = ", copyIdx)
      for i in 0..work-1:
        var idx = (copyIdx + i) and (oldLen - 1)
        if copySlot(idx, table, newTable):
          workDone += 1
      if workDone > 0:
        #echo("did work ", workDone, " on thread ", cast[int](myThreadID[pointer]()))
        if checkAndPromote(table, workDone): return table
    # In case a thread finished all the work then got stalled before promotion
    if checkAndPromote(table, 0): return table



#------------------------------------------------------------------------------

proc setVal[K,V](table: var PConcTable[K,V], key: int, val: int,
  expVal: int, match: bool): int =
  #echo("-try set- in table ", " key = ", (popPtr[K](key)[]), " val = ", val)
  when K is Raw:
    var idx = hashInt(key)
  else:
    var idx = popPtr[K](key)[].hash
  var
    nextTable: PConcTable[K,V]
    probes = 1
  # spin until we find a key slot or build and jump to next table
  while true:
    idx = idx and (table.len - 1)
    #echo("try set idx = " & $idx & "for" & $key)
    var
      probedKey = 0
      openKey = atomic_compare_exchange_n(table[idx].key.addr, probedKey.addr,
        key, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
    if openKey:
      if val.isTomb:
        #echo("val was tomb, bail, no reason to set an open slot to tomb")
        return val
      #increment used slots
      #echo("found an open slot, total used = " &
      #$atomic_add_fetch(table.used.addr, 1, ATOMIC_RELAXED))
      discard atomic_add_fetch(table.used.addr, 1, ATOMIC_RELAXED)
      break # We found an open slot
    #echo("set idx ", idx, " key = ", key, " probed = ", probedKey)
    if keyEQ[K](probedKey, key):
      #echo("we found the matching slot")
      break # We found a matching slot
    if (not(expVal != 0 and match)) and (probes >= reProbeLimit or key.isTomb):
      if key.isTomb: echo("Key is Tombstone")
      #if probes >= reProbeLimit: echo("Too much probing " & $probes)
      #echo("try to resize")
      #create next bigger table
      nextTable = resize(table)
      #help do some copying
      #echo("help copy old table to new")
      nextTable = helpCopy(table)
      #now setVal in the new table instead
      #echo("jumping to next table to set val")
      return setVal(nextTable, key, val, expVal, match)
    else:
      idx += 1
      probes += 1
  # Done spinning for a new slot
  var oldVal = atomic_load_n(table[idx].value.addr, ATOMIC_RELAXED)
  if val == oldVal:
    #echo("this val is alredy in the slot")
    return oldVal
  nextTable = atomic_load_n(table.next.addr, ATOMIC_SEQ_CST)
  if nextTable == nil and
    ((oldVal == 0 and
    (probes >= reProbeLimit or table.used / table.len > 0.8)) or
    (isPrime(oldVal))):
    if table.used / table.len > 0.8: echo("resize because usage ratio = " &
      $(table.used / table.len))
    if isPrime(oldVal): echo("old val isPrime, should be a rare mem ordering event")
    nextTable = resize(table)
  if nextTable != nil:
    #echo("tomb old slot then set in new table")
    nextTable = copySlotAndCheck(table,idx)
    return setVal(nextTable, key, val, expVal, match)
  # Finally ready to add new val to table
  while true:
    if match and oldVal != expVal:
      #echo("set failed, no match  oldVal= " & $oldVal & " expVal= " & $expVal)
      return oldVal
    if atomic_compare_exchange_n(table[idx].value.addr, oldVal.addr,
        val, false, ATOMIC_RELEASE, ATOMIC_RELAXED):
      #echo("val set at table " & $cast[int](table))
      if expVal != 0:
        if (oldVal == 0 or isTomb(oldVal)) and not isTomb(val):
          discard atomic_add_fetch(table.active.addr, 1, ATOMIC_RELAXED)
        elif not (oldVal == 0 or isTomb(oldVal)) and isTomb(val):
          discard atomic_add_fetch(table.active.addr, -1, ATOMIC_RELAXED)
      if oldVal == 0 and expVal != 0:
        return setTomb(oldVal)
      else: return oldVal
    if isPrime(oldVal):
      nextTable = copySlotAndCheck(table, idx)
      return setVal(nextTable, key, val, expVal, match)

#------------------------------------------------------------------------------

proc getVal[K,V](table: var PConcTable[K,V], key: int): int =
  #echo("-try get-  key = " & $key)
  when K is Raw:
    var idx = hashInt(key)
  else:
    var idx = popPtr[K](key)[].hash
    #echo("get idx ", idx)
  var
    probes = 0
    val: int
  while true:
    idx = idx and (table.len - 1)
    var
      newTable: PConcTable[K,V] # = atomic_load_n(table.next.addr, ATOMIC_ACQUIRE)
      probedKey = atomic_load_n(table[idx].key.addr, ATOMIC_SEQ_CST)
    if keyEQ[K](probedKey, key):
      #echo("found key after ", probes+1)
      val = atomic_load_n(table[idx].value.addr, ATOMIC_ACQUIRE)
      if not isPrime(val):
        if isTomb(val):
          #echo("val was tomb but not prime")
          return 0
        else:
          #echo("-GotIt- idx = ", idx, " key = ", key, " val ", val )
          return val
      else:
        newTable = copySlotAndCheck(table, idx)
        return getVal(newTable, key)
    else:
      #echo("probe ", probes, " idx = ", idx, " key = ", key, " found ", probedKey )
      if probes >= reProbeLimit*4 or key.isTomb:
        if newTable == nil:
          #echo("too many probes and no new table ", key, "  ", idx )
          return 0
        else:
          newTable = helpCopy(table)
          return getVal(newTable, key)
      idx += 1
      probes += 1

#------------------------------------------------------------------------------

#proc set*(table: var PConcTable[Raw,Raw], key: Raw, val: Raw) =
#  discard setVal(table, pack(key), pack(key), 0, false)

#proc set*[V](table: var PConcTable[Raw,V], key: Raw, val: ptr V) =
#  discard setVal(table, pack(key), cast[int](val), 0, false)

proc set*[K,V](table: var PConcTable[K,V], key: var K, val: var V) =
  when not (K is Raw):
    var newKey = cast[int](copyShared(key))
  else:
    var newKey = pack(key)
  when not (V is Raw):
    var newVal = cast[int](copyShared(val))
  else:
    var newVal = pack(val)
  var oldPtr = pop(setVal(table, newKey, newVal, 0, false))
    #echo("oldPtr = ", cast[int](oldPtr), " newPtr = ", cast[int](newPtr))
  when not (V is Raw):
    if newVal != oldPtr and oldPtr != 0:
      deallocShared(cast[ptr V](oldPtr))



proc get*[K,V](table: var PConcTable[K,V], key: var K): V =
  when not (V is Raw):
    when not (K is Raw):
      return popPtr[V](getVal(table, cast[int](key.addr)))[]
    else:
      return popPtr[V](getVal(table, pack(key)))[]
  else:
    when not (K is Raw):
      return popRaw(getVal(table, cast[int](key.addr)))
    else:
      return popRaw(getVal(table, pack(key)))











#proc `[]`[K,V](table: var PConcTable[K,V], key: K): PEntry[K,V] {.inline.} =
#  getVal(table, key)

#proc `[]=`[K,V](table: var PConcTable[K,V], key: K, val: V): PEntry[K,V] {.inline.} =
#  setVal(table, key, val)






#Tests ----------------------------
when not defined(testing) and isMainModule:
  import locks, times, mersenne

  const
    numTests  = 100000
    numThreads = 10



  type
    TestObj = tuple
      thr: int
      f0: int
      f1: int

    Data = tuple[k: string,v: TestObj]
    PDataArr = array[0..numTests-1, Data]
    Dict = PConcTable[string,TestObj]
  {.deprecated: [TTestObj: TestObj, TData: Data].}

  var
    thr: array[0..numThreads-1, Thread[Dict]]

    table = newLFTable[string,TestObj](8)
    rand = newMersenneTwister(2525)

  proc createSampleData(len: int): PDataArr =
    #result = cast[PDataArr](allocShared0(sizeof(Data)*numTests))
    for i in 0..len-1:
      result[i].k = "mark" & $(i+1)
      #echo("mark" & $(i+1), " ", hash("mark" & $(i+1)))
      result[i].v.thr = 0
      result[i].v.f0 = i+1
      result[i].v.f1 = 0
      #echo("key = " & $(i+1) & " Val ptr = " & $cast[int](result[i].v.addr))



  proc threadProc(tp: Dict) {.thread.} =
    var t = cpuTime();
    for i in 1..numTests:
      var key = "mark" & $(i)
      var got = table.get(key)
      got.thr = cast[int](myThreadID[pointer]())
      got.f1 = got.f1 + 1
      table.set(key, got)
    t = cpuTime() - t
    echo t


  var testData = createSampleData(numTests)

  for i in 0..numTests-1:
    table.set(testData[i].k, testData[i].v)

  var i = 0
  while i < numThreads:
    createThread(thr[i], threadProc, table)
    i += 1

  joinThreads(thr)





  var fails = 0

  for i in 0..numTests-1:
    var got = table.get(testData[i].k)
    if got.f0 != i+1 or got.f1 != numThreads:
      fails += 1
      echo(got)

  echo("Failed read or write = ", fails)


  #for i in 1..numTests:
  #  echo(i, " = ", hashInt(i) and 8191)

  deleteConcTable(table)