diff options
Diffstat (limited to 'lib/pure/collections')
-rw-r--r-- | lib/pure/collections/LockFreeHash.nim | 581 | ||||
-rw-r--r-- | lib/pure/collections/baseutils.nim | 41 | ||||
-rw-r--r-- | lib/pure/collections/sequtils.nim | 69 |
3 files changed, 691 insertions, 0 deletions
diff --git a/lib/pure/collections/LockFreeHash.nim b/lib/pure/collections/LockFreeHash.nim new file mode 100644 index 000000000..d3a91763a --- /dev/null +++ b/lib/pure/collections/LockFreeHash.nim @@ -0,0 +1,581 @@ +#nimrod c -t:-march=i686 --cpu:amd64 --threads:on -d:release lockfreehash.nim + +import baseutils, unsigned, math, hashes + + + +const + minTableSize = 8 + reProbeLimit = 12 + minCopyWork = 4096 + intSize = sizeof(int) + + + +when sizeof(int) == 4: # 32bit + type + TRaw = range[0..1073741823] + ## The range of uint values that can be stored directly in a value slot + ## when on a 32 bit platform + +elif sizeof(int) == 8: # 64bit + type + TRaw = range[0..4611686018427387903] + ## The range of uint values that can be stored directly in a value slot + ## when on a 64 bit platform +else: echo("unsupported platform") + +type + TEntry = tuple + key: int + value: int + + TEntryArr = ptr array[0..10_000_000, TEntry] + + PConcTable[K,V] = ptr object {.pure.} + len: int + used: int + active: int + copyIdx: int + copyDone: int + next: PConcTable[K,V] + data: TEntryArr + + +proc setVal[K,V](table: var PConcTable[K,V], key: int, val: int, + expVal: int, match: bool): int + +#------------------------------------------------------------------------------ + +# Create a new table +proc newLFTable*[K,V](size: int = minTableSize): PConcTable[K,V] = + let + dataLen = max(nextPowerOfTwo(size), minTableSize) + dataSize = dataLen*sizeof(TEntry) + dataMem = allocShared0(dataSize) + tableSize = 7 * intSize + tableMem = allocShared0(tableSize) + table = cast[PConcTable[K,V]](tableMem) + table.len = dataLen + table.used = 0 + table.active = 0 + table.copyIdx = 0 + table.copyDone = 0 + table.next = nil + table.data = cast[TEntryArr](dataMem) + result = table + +#------------------------------------------------------------------------------ + +# Delete a table +proc deleteConcTable[K,V](tbl: PConcTable[K,V]) = + deallocShared(tbl.data) + deallocShared(tbl) + +#------------------------------------------------------------------------------ + +proc `[]`[K,V](table: var PConcTable[K,V], i: int): var TEntry {.inline.} = + table.data[i] + +#------------------------------------------------------------------------------ +# State flags stored in ptr + + +proc pack[T](x: T): int {.inline.} = + result = (cast[int](x) shl 2) + #echo("packKey ",cast[int](x) , " -> ", result) + +# Pop the flags off returning a 4 byte aligned ptr to our Key or Val +proc pop(x: int): int {.inline.} = + result = x and 0xFFFFFFFC'i32 + +# Pop the raw value off of our Key or Val +proc popRaw(x: int): int {.inline.} = + result = x shr 2 + +# Pop the flags off returning a 4 byte aligned ptr to our Key or Val +proc popPtr[V](x: int): ptr V {.inline.} = + result = cast[ptr V](pop(x)) + #echo("popPtr " & $x & " -> " & $cast[int](result)) + +# Ghost (sentinel) +# K or V is no longer valid use new table +const Ghost = 0xFFFFFFFC +proc isGhost(x: int): bool {.inline.} = + result = x == 0xFFFFFFFC + +# Tombstone +# applied to V = K is dead +proc isTomb(x: int): bool {.inline.} = + result = (x and 0x00000002) != 0 + +proc setTomb(x: int): int {.inline.} = + result = x or 0x00000002 + +# Prime +# K or V is in new table copied from old +proc isPrime(x: int): bool {.inline.} = + result = (x and 0x00000001) != 0 + +proc setPrime(x: int): int {.inline.} = + result = x or 0x00000001 + +#------------------------------------------------------------------------------ + +##This is for i32 only need to override for i64 +proc hashInt(x: int):int {.inline.} = + var h = uint32(x) #shr 2'u32 + h = h xor (h shr 16'u32) + h *= 0x85ebca6b'u32 + h = h xor (h shr 13'u32) + h *= 0xc2b2ae35'u32 + h = h xor (h shr 16'u32) + result = int(h) + +#------------------------------------------------------------------------------ + +proc resize[K,V](self: PConcTable[K,V]): PConcTable[K,V] = + var next = atomic_load_n(self.next.addr, ATOMIC_RELAXED) + #echo("next = " & $cast[int](next)) + if next != nil: + #echo("A new table already exists, copy in progress") + return next + var + oldLen = atomic_load_n(self.len.addr, ATOMIC_RELAXED) + newTable = newLFTable[K,V](oldLen*2) + success = atomic_compare_exchange_n(self.next.addr, next.addr, newTable, + false, ATOMIC_RELAXED, ATOMIC_RELAXED) + if not success: + echo("someone beat us to it! delete table we just created and return his " & $cast[int](next)) + deleteConcTable(newTable) + return next + else: + echo("Created New Table! " & $cast[int](newTable) & " Size = " & $newTable.len) + return newTable + + +#------------------------------------------------------------------------------ +#proc keyEQ[K](key1: ptr K, key2: ptr K): bool {.inline.} = +proc keyEQ[K](key1: int, key2: int): bool {.inline.} = + result = false + when K is TRaw: + if key1 == key2: + result = true + else: + var + p1 = popPtr[K](key1) + p2 = popPtr[K](key2) + if p1 != nil and p2 != nil: + if cast[int](p1) == cast[int](p2): + return true + if p1[] == p2[]: + return true + +#------------------------------------------------------------------------------ + +#proc tableFull(self: var PConcTable[K,V]) : bool {.inline.} = + + +#------------------------------------------------------------------------------ + +proc copySlot[K,V](idx: int, oldTbl: var PConcTable[K,V], newTbl: var PConcTable[K,V]): bool = + #echo("Copy idx " & $idx) + var + oldVal = 0 + oldkey = 0 + ok = false + result = false + #Block the key so no other threads waste time here + while not ok: + ok = atomic_compare_exchange_n(oldTbl[idx].key.addr, oldKey.addr, + setTomb(oldKey), false, ATOMIC_RELAXED, ATOMIC_RELAXED) + #echo("oldKey was = " & $oldKey & " set it to tomb " & $setTomb(oldKey)) + #Prevent new values from appearing in the old table by priming + oldVal = atomic_load_n(oldTbl[idx].value.addr, ATOMIC_RELAXED) + while not isPrime(oldVal): + var box = if oldVal == NULL or isTomb(oldVal) : oldVal.setTomb.setPrime + else: oldVal.setPrime + if atomic_compare_exchange_n(oldTbl[idx].value.addr, oldVal.addr, + box, false, ATOMIC_RELAXED, ATOMIC_RELAXED): + if isPrime(box) and isTomb(box): + return true + oldVal = box + break + #echo("oldVal was = ", oldVal, " set it to prime ", box) + if isPrime(oldVal) and isTomb(oldVal): + #when not (K is TRaw): + # deallocShared(popPtr[K](oldKey)) + return false + if isTomb(oldVal): + echo("oldVal is Tomb!!!, should not happen") + if pop(oldVal) != NULL: + result = setVal(newTbl, pop(oldKey), pop(oldVal), NULL, true) == NULL + if result: + #echo("Copied a Slot! idx= " & $idx & " key= " & $oldKey & " val= " & $oldVal) + else: + #echo("copy slot failed") + # Our copy is done so we disable the old slot + while not ok: + ok = atomic_compare_exchange_n(oldTbl[idx].value.addr, oldVal.addr, + oldVal.setTomb.setPrime , false, ATOMIC_RELAXED, ATOMIC_RELAXED) + #echo("disabled old slot") + #echo"---------------------" + +#------------------------------------------------------------------------------ + +proc promote[K,V](table: var PConcTable[K,V]) = + var + newData = atomic_load_n(table.next.data.addr, ATOMIC_RELAXED) + newLen = atomic_load_n(table.next.len.addr, ATOMIC_RELAXED) + newUsed = atomic_load_n(table.next.used.addr, ATOMIC_RELAXED) + + deallocShared(table.data) + atomic_store_n(table.data.addr, newData, ATOMIC_RELAXED) + atomic_store_n(table.len.addr, newLen, ATOMIC_RELAXED) + atomic_store_n(table.used.addr, newUsed, ATOMIC_RELAXED) + atomic_store_n(table.copyIdx.addr, 0, ATOMIC_RELAXED) + atomic_store_n(table.copyDone.addr, 0, ATOMIC_RELAXED) + deallocShared(table.next) + atomic_store_n(table.next.addr, nil, ATOMIC_RELAXED) + echo("new table swapped!") + +#------------------------------------------------------------------------------ + +proc checkAndPromote[K,V](table: var PConcTable[K,V], workDone: int): bool = + var + oldLen = atomic_load_n(table.len.addr, ATOMIC_RELAXED) + copyDone = atomic_load_n(table.copyDone.addr, ATOMIC_RELAXED) + ok: bool + result = false + if workDone > 0: + #echo("len to copy =" & $oldLen) + #echo("copyDone + workDone = " & $copyDone & " + " & $workDone) + while not ok: + ok = atomic_compare_exchange_n(table.copyDone.addr, copyDone.addr, + copyDone + workDone, false, ATOMIC_RELAXED, ATOMIC_RELAXED) + #if ok: echo("set copyDone") + # If the copy is done we can promote this table + if copyDone + workDone >= oldLen: + # Swap new data + #echo("work is done!") + table.promote + result = true + +#------------------------------------------------------------------------------ + +proc copySlotAndCheck[K,V](table: var PConcTable[K,V], idx: int): + PConcTable[K,V] = + var + newTable = cast[PConcTable[K,V]](atomic_load_n(table.next.addr, ATOMIC_RELAXED)) + result = newTable + if newTable != nil and copySlot(idx, table, newTable): + #echo("copied a single slot, idx = " & $idx) + if checkAndPromote(table, 1): return table + + +#------------------------------------------------------------------------------ + +proc helpCopy[K,V](table: var PConcTable[K,V]): PConcTable[K,V] = + var + newTable = cast[PConcTable[K,V]](atomic_load_n(table.next.addr, ATOMIC_RELAXED)) + result = newTable + if newTable != nil: + var + oldLen = atomic_load_n(table.len.addr, ATOMIC_RELAXED) + copyDone = atomic_load_n(table.copyDone.addr, ATOMIC_RELAXED) + copyIdx = 0 + work = min(oldLen, minCopyWork) + #panicStart = -1 + workDone = 0 + if copyDone < oldLen: + var ok: bool + while not ok: + ok = atomic_compare_exchange_n(table.copyIdx.addr, copyIdx.addr, + copyIdx + work, false, ATOMIC_RELAXED, ATOMIC_RELAXED) + #echo("copy idx = ", copyIdx) + for i in 0..work-1: + var idx = (copyIdx + i) and (oldLen - 1) + if copySlot(idx, table, newTable): + workDone += 1 + if workDone > 0: + #echo("did work ", workDone, " on thread ", cast[int](myThreadID[pointer]())) + if checkAndPromote(table, workDone): return table + # In case a thread finished all the work then got stalled before promotion + if checkAndPromote(table, 0): return table + + + +#------------------------------------------------------------------------------ + +proc setVal[K,V](table: var PConcTable[K,V], key: int, val: int, + expVal: int, match: bool): int = + #echo("-try set- in table ", " key = ", (popPtr[K](key)[]), " val = ", val) + when K is TRaw: + var idx = hashInt(key) + else: + var idx = popPtr[K](key)[].hash + var + nextTable: PConcTable[K,V] + probes = 1 + # spin until we find a key slot or build and jump to next table + while true: + idx = idx and (table.len - 1) + #echo("try set idx = " & $idx & "for" & $key) + var + probedKey = NULL + openKey = atomic_compare_exchange_n(table[idx].key.addr, probedKey.addr, + key, false, ATOMIC_RELAXED, ATOMIC_RELAXED) + if openKey: + if val.isTomb: + #echo("val was tomb, bail, no reason to set an open slot to tomb") + return val + #increment used slots + #echo("found an open slot, total used = " & + #$atomic_add_fetch(table.used.addr, 1, ATOMIC_RELAXED)) + discard atomic_add_fetch(table.used.addr, 1, ATOMIC_RELAXED) + break # We found an open slot + #echo("set idx ", idx, " key = ", key, " probed = ", probedKey) + if keyEQ[K](probedKey, key): + #echo("we found the matching slot") + break # We found a matching slot + if (not(expVal != NULL and match)) and (probes >= reProbeLimit or key.isTomb): + if key.isTomb: echo("Key is Tombstone") + #if probes >= reProbeLimit: echo("Too much probing " & $probes) + #echo("try to resize") + #create next bigger table + nextTable = resize(table) + #help do some copying + #echo("help copy old table to new") + nextTable = helpCopy(table) + #now setVal in the new table instead + #echo("jumping to next table to set val") + return setVal(nextTable, key, val, expVal, match) + else: + idx += 1 + probes += 1 + # Done spinning for a new slot + var oldVal = atomic_load_n(table[idx].value.addr, ATOMIC_RELAXED) + if val == oldVal: + #echo("this val is alredy in the slot") + return oldVal + nextTable = atomic_load_n(table.next.addr, ATOMIC_SEQ_CST) + if nextTable == nil and + ((oldVal == NULL and + (probes >= reProbeLimit or table.used / table.len > 0.8)) or + (isPrime(oldVal))): + if table.used / table.len > 0.8: echo("resize because usage ratio = " & + $(table.used / table.len)) + if isPrime(oldVal): echo("old val isPrime, should be a rare mem ordering event") + nextTable = resize(table) + if nextTable != nil: + #echo("tomb old slot then set in new table") + nextTable = copySlotAndCheck(table,idx) + return setVal(nextTable, key, val, expVal, match) + # Finaly ready to add new val to table + while true: + if match and oldVal != expVal: + #echo("set failed, no match oldVal= " & $oldVal & " expVal= " & $expVal) + return oldVal + if atomic_compare_exchange_n(table[idx].value.addr, oldVal.addr, + val, false, ATOMIC_RELEASE, ATOMIC_RELAXED): + #echo("val set at table " & $cast[int](table)) + if expVal != NULL: + if (oldVal == NULL or isTomb(oldVal)) and not isTomb(val): + discard atomic_add_fetch(table.active.addr, 1, ATOMIC_RELAXED) + elif not (oldVal == NULL or isTomb(oldVal)) and isTomb(val): + discard atomic_add_fetch(table.active.addr, -1, ATOMIC_RELAXED) + if oldVal == NULL and expVal != NULL: + return setTomb(oldVal) + else: return oldVal + if isPrime(oldVal): + nextTable = copySlotAndCheck(table, idx) + return setVal(nextTable, key, val, expVal, match) + +#------------------------------------------------------------------------------ + +proc getVal[K,V](table: var PConcTable[K,V], key: int): int = + #echo("-try get- key = " & $key) + when K is TRaw: + var idx = hashInt(key) + else: + var idx = popPtr[K](key)[].hash + #echo("get idx ", idx) + var + probes = 0 + val: int + while true: + idx = idx and (table.len - 1) + var + newTable: PConcTable[K,V] # = atomic_load_n(table.next.addr, ATOMIC_ACQUIRE) + probedKey = atomic_load_n(table[idx].key.addr, ATOMIC_SEQ_CST) + if keyEQ[K](probedKey, key): + #echo("found key after ", probes+1) + val = atomic_load_n(table[idx].value.addr, ATOMIC_ACQUIRE) + if not isPrime(val): + if isTomb(val): + #echo("val was tomb but not prime") + return NULL + else: + #echo("-GotIt- idx = ", idx, " key = ", key, " val ", val ) + return val + else: + newTable = copySlotAndCheck(table, idx) + return getVal(newTable, key) + else: + #echo("probe ", probes, " idx = ", idx, " key = ", key, " found ", probedKey ) + if probes >= reProbeLimit*4 or key.isTomb: + if newTable == nil: + #echo("too many probes and no new table ", key, " ", idx ) + return NULL + else: + newTable = helpCopy(table) + return getVal(newTable, key) + idx += 1 + probes += 1 + +#------------------------------------------------------------------------------ + +#proc set*(table: var PConcTable[TRaw,TRaw], key: TRaw, val: TRaw) = +# discard setVal(table, pack(key), pack(key), NULL, false) + +#proc set*[V](table: var PConcTable[TRaw,V], key: TRaw, val: ptr V) = +# discard setVal(table, pack(key), cast[int](val), NULL, false) + +proc set*[K,V](table: var PConcTable[K,V], key: var K, val: var V) = + when not (K is TRaw): + var newKey = cast[int](copyShared(key)) + else: + var newKey = pack(key) + when not (V is TRaw): + var newVal = cast[int](copyShared(val)) + else: + var newVal = pack(val) + var oldPtr = pop(setVal(table, newKey, newVal, NULL, false)) + #echo("oldPtr = ", cast[int](oldPtr), " newPtr = ", cast[int](newPtr)) + when not (V is TRaw): + if newVal != oldPtr and oldPtr != NULL: + deallocShared(cast[ptr V](oldPtr)) + + + +proc get*[K,V](table: var PConcTable[K,V], key: var K): V = + when not (V is TRaw): + when not (K is TRaw): + return popPtr[V](getVal(table, cast[int](key.addr)))[] + else: + return popPtr[V](getVal(table, pack(key)))[] + else: + when not (K is TRaw): + return popRaw(getVal(table, cast[int](key.addr))) + else: + return popRaw(getVal(table, pack(key))) + + + + + + + + + + + +#proc `[]`[K,V](table: var PConcTable[K,V], key: K): PEntry[K,V] {.inline.} = +# getVal(table, key) + +#proc `[]=`[K,V](table: var PConcTable[K,V], key: K, val: V): PEntry[K,V] {.inline.} = +# setVal(table, key, val) + + + + + + +#Tests ---------------------------- +when isMainModule: + import locks, times, mersenne + + const + numTests = 100000 + numThreads = 10 + + + + type + TTestObj = tuple + thr: int + f0: int + f1: int + + TData = tuple[k: string,v: TTestObj] + PDataArr = array[0..numTests-1, TData] + Dict = PConcTable[string,TTestObj] + + var + thr: array[0..numThreads-1, TThread[Dict]] + + table = newLFTable[string,TTestObj](8) + rand = newMersenneTwister(2525) + + proc createSampleData(len: int): PDataArr = + #result = cast[PDataArr](allocShared0(sizeof(TData)*numTests)) + for i in 0..len-1: + result[i].k = "mark" & $(i+1) + #echo("mark" & $(i+1), " ", hash("mark" & $(i+1))) + result[i].v.thr = 0 + result[i].v.f0 = i+1 + result[i].v.f1 = 0 + #echo("key = " & $(i+1) & " Val ptr = " & $cast[int](result[i].v.addr)) + + + + proc threadProc(tp: Dict) {.thread.} = + var t = cpuTime(); + for i in 1..numTests: + var key = "mark" & $(i) + var got = table.get(key) + got.thr = cast[int](myThreadID[pointer]()) + got.f1 = got.f1 + 1 + table.set(key, got) + t = cpuTime() - t + echo t + + + var testData = createSampleData(numTests) + + for i in 0..numTests-1: + table.set(testData[i].k, testData[i].v) + + var i = 0 + while i < numThreads: + createThread(thr[i], threadProc, table) + i += 1 + + joinThreads(thr) + + + + + + var fails = 0 + + for i in 0..numTests-1: + var got = table.get(testData[i].k) + if got.f0 != i+1 or got.f1 != numThreads: + fails += 1 + echo(got) + + echo("Failed read or write = ", fails) + + + #for i in 1..numTests: + # echo(i, " = ", hashInt(i) and 8191) + + deleteConcTable(table) + + + + + + + diff --git a/lib/pure/collections/baseutils.nim b/lib/pure/collections/baseutils.nim new file mode 100644 index 000000000..565a89ccb --- /dev/null +++ b/lib/pure/collections/baseutils.nim @@ -0,0 +1,41 @@ + + + +#------------------------------------------------------------------------------ +## Useful Constants +const NULL* = 0 + + +#------------------------------------------------------------------------------ +## Memory Utility Functions + +proc newHeap*[T](): ptr T = + result = cast[ptr T](alloc0(sizeof(T))) + +proc copyNew*[T](x: var T): ptr T = + var + size = sizeof(T) + mem = alloc(size) + copyMem(mem, x.addr, size) + return cast[ptr T](mem) + +proc copyTo*[T](val: var T, dest: int) = + copyMem(pointer(dest), val.addr, sizeof(T)) + +proc allocType*[T](): pointer = alloc(sizeof(T)) + +proc newShared*[T](): ptr T = + result = cast[ptr T](allocShared0(sizeof(T))) + +proc copyShared*[T](x: var T): ptr T = + var + size = sizeof(T) + mem = allocShared(size) + copyMem(mem, x.addr, size) + return cast[ptr T](mem) + +#------------------------------------------------------------------------------ +## Pointer arithmetic + +proc `+`*(p: pointer, i: int): pointer {.inline.} = + cast[pointer](cast[int](p) + i) \ No newline at end of file diff --git a/lib/pure/collections/sequtils.nim b/lib/pure/collections/sequtils.nim index 705a97469..3a009a8cb 100644 --- a/lib/pure/collections/sequtils.nim +++ b/lib/pure/collections/sequtils.nim @@ -117,6 +117,57 @@ proc filter*[T](seq1: seq[T], pred: proc(item: T): bool {.closure.}): seq[T] = ## assert f2 == @["yellow"] accumulateResult(filter(seq1, pred)) +proc delete*[T](s: var seq[T], first=0, last=0) = + ## Deletes in `s` the items at position `first` .. `last`. This modifies + ## `s` itself, it does not return a copy. + ## + ## Example: + ## + ##.. code-block:: nimrod + ## let outcome = @[1,1,1,1,1,1,1,1] + ## var dest = @[1,1,1,2,2,2,2,2,2,1,1,1,1,1] + ## dest.delete(3, 8) + ## assert outcome == dest + + var i = first + var j = last+1 + var newLen = len(s)-j+i + while i < newLen: + s[i].shallowCopy(s[j]) + inc(i) + inc(j) + setlen(s, newLen) + +proc insert*[T](dest: var seq[T], src: openArray[T], pos=0) = + ## Inserts items from `src` into `dest` at position `pos`. This modifies + ## `dest` itself, it does not return a copy. + ## + ## Example: + ## + ##.. code-block:: nimrod + ## var dest = @[1,1,1,1,1,1,1,1] + ## let + ## src = @[2,2,2,2,2,2] + ## outcome = @[1,1,1,2,2,2,2,2,2,1,1,1,1,1] + ## dest.insert(src, 3) + ## assert dest == outcome + + var j = len(dest) - 1 + var i = len(dest) + len(src) - 1 + dest.setLen(i + 1) + + # Move items after `pos` to the end of the sequence. + while j >= pos: + dest[i].shallowCopy(dest[j]) + dec(i) + dec(j) + # Insert items from `dest` into `dest` at `pos` + inc(j) + for item in src: + dest[j] = item + inc(j) + + template filterIt*(seq1, pred: expr): expr {.immediate.} = ## Returns a new sequence with all the items that fulfilled the predicate. ## @@ -312,4 +363,22 @@ when isMainModule: assert multiplication == 495, "Multiplication is (5*(9*(11)))" assert concatenation == "nimrodiscool" + block: # delete tests + let outcome = @[1,1,1,1,1,1,1,1] + var dest = @[1,1,1,2,2,2,2,2,2,1,1,1,1,1] + dest.delete(3, 8) + assert outcome == dest, """\ + Deleting range 3-9 from [1,1,1,2,2,2,2,2,2,1,1,1,1,1] + is [1,1,1,1,1,1,1,1]""" + + block: # insert tests + var dest = @[1,1,1,1,1,1,1,1] + let + src = @[2,2,2,2,2,2] + outcome = @[1,1,1,2,2,2,2,2,2,1,1,1,1,1] + dest.insert(src, 3) + assert dest == outcome, """\ + Inserting [2,2,2,2,2,2] into [1,1,1,1,1,1,1,1] + at 3 is [1,1,1,2,2,2,2,2,2,1,1,1,1,1]""" + echo "Finished doc tests" |