summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--lib/system/alloc.nim51
-rw-r--r--tests/alloc/tmembug.nim54
-rw-r--r--tests/alloc/tmembug2.nim58
-rw-r--r--tests/threads/tmembug.nim51
4 files changed, 187 insertions, 27 deletions
diff --git a/lib/system/alloc.nim b/lib/system/alloc.nim
index 9c7c83aab..441dc1719 100644
--- a/lib/system/alloc.nim
+++ b/lib/system/alloc.nim
@@ -93,8 +93,6 @@ type
     freeList: ptr FreeCell
     free: int            # how many bytes remain
     acc: int             # accumulator for small object allocation
-    when defined(gcDestructors):
-      sharedFreeList: ptr FreeCell # make no attempt at avoiding false sharing for now for this object field
     data {.align: MemAlign.}: UncheckedArray[byte]      # start of usable memory
 
   BigChunk = object of BaseChunk # not necessarily > PageSize!
@@ -109,7 +107,9 @@ type
   MemRegion = object
     when not defined(gcDestructors):
       minLargeObj, maxLargeObj: int
-    freeSmallChunks: array[0..max(1,SmallChunkSize div MemAlign-1), PSmallChunk]
+    freeSmallChunks: array[0..max(1, SmallChunkSize div MemAlign-1), PSmallChunk]
+    when defined(gcDestructors):
+      sharedFreeLists: array[0..max(1, SmallChunkSize div MemAlign-1), ptr FreeCell]
     flBitmap: uint32
     slBitmap: array[RealFli, uint32]
     matrix: array[RealFli, array[MaxSli, PBigChunk]]
@@ -777,8 +777,10 @@ when defined(gcDestructors):
     sysAssert c.next == nil, "c.next pointer must be nil"
     atomicPrepend a.sharedFreeListBigChunks, c
 
-  proc addToSharedFreeList(c: PSmallChunk; f: ptr FreeCell) {.inline.} =
-    atomicPrepend c.sharedFreeList, f
+  proc addToSharedFreeList(c: PSmallChunk; f: ptr FreeCell; size: int) {.inline.} =
+    atomicPrepend c.owner.sharedFreeLists[size], f
+
+  const MaxSteps = 20
 
   proc compensateCounters(a: var MemRegion; c: PSmallChunk; size: int) =
     # rawDealloc did NOT do the usual:
@@ -788,30 +790,26 @@ when defined(gcDestructors):
     # we split the list in order to achieve bounded response times.
     var it = c.freeList
     var x = 0
-    var maxIters = 20 # make it time-bounded
     while it != nil:
-      if maxIters == 0:
-        let rest = it.next.loada
-        if rest != nil:
-          it.next.storea nil
-          addToSharedFreeList(c, rest)
-        break
       inc x, size
-      it = it.next.loada
-      dec maxIters
-    inc(c.free, x)
+      let chunk = cast[PSmallChunk](pageAddr(it))
+      inc(chunk.free, x)
+      it = it.next
     dec(a.occ, x)
 
   proc freeDeferredObjects(a: var MemRegion; root: PBigChunk) =
     var it = root
-    var maxIters = 20 # make it time-bounded
+    var maxIters = MaxSteps # make it time-bounded
     while true:
+      let rest = it.next.loada
+      it.next.storea nil
+      deallocBigChunk(a, cast[PBigChunk](it))
       if maxIters == 0:
-        let rest = it.next.loada
-        it.next.storea nil
-        addToSharedFreeListBigChunks(a, rest)
+        if rest != nil:
+          addToSharedFreeListBigChunks(a, rest)
+          sysAssert a.sharedFreeListBigChunks != nil, "re-enqueing failed"
         break
-      it = it.next.loada
+      it = rest
       dec maxIters
       if it == nil: break
 
@@ -835,8 +833,6 @@ proc rawAlloc(a: var MemRegion, requestedSize: int): pointer =
       sysAssert c.size == PageSize, "rawAlloc 3"
       c.size = size
       c.acc = size
-      when defined(gcDestructors):
-        c.sharedFreeList = nil
       c.free = SmallChunkSize - smallChunkOverhead() - size
       sysAssert c.owner == addr(a), "rawAlloc: No owner set!"
       c.next = nil
@@ -853,10 +849,11 @@ proc rawAlloc(a: var MemRegion, requestedSize: int): pointer =
       when defined(gcDestructors):
         if c.freeList == nil:
           when hasThreadSupport:
-            c.freeList = atomicExchangeN(addr c.sharedFreeList, nil, ATOMIC_RELAXED)
+            # Steal the entire list from `sharedFreeList`:
+            c.freeList = atomicExchangeN(addr a.sharedFreeLists[s], nil, ATOMIC_RELAXED)
           else:
-            c.freeList = c.sharedFreeList
-            c.sharedFreeList = nil
+            c.freeList = a.sharedFreeLists[s]
+            a.sharedFreeLists[s] = nil
           compensateCounters(a, c, size)
       if c.freeList == nil:
         sysAssert(c.acc + smallChunkOverhead() + size <= SmallChunkSize,
@@ -923,7 +920,7 @@ proc rawDealloc(a: var MemRegion, p: pointer) =
   if isSmallChunk(c):
     # `p` is within a small chunk:
     var c = cast[PSmallChunk](c)
-    var s = c.size
+    let s = c.size
     #       ^ We might access thread foreign storage here.
     # The other thread cannot possibly free this block as it's still alive.
     var f = cast[ptr FreeCell](p)
@@ -957,7 +954,7 @@ proc rawDealloc(a: var MemRegion, p: pointer) =
           freeBigChunk(a, cast[PBigChunk](c))
     else:
       when defined(gcDestructors):
-        addToSharedFreeList(c, f)
+        addToSharedFreeList(c, f, s div MemAlign)
     sysAssert(((cast[int](p) and PageMask) - smallChunkOverhead()) %%
                s == 0, "rawDealloc 2")
   else:
diff --git a/tests/alloc/tmembug.nim b/tests/alloc/tmembug.nim
new file mode 100644
index 000000000..63b51ec5d
--- /dev/null
+++ b/tests/alloc/tmembug.nim
@@ -0,0 +1,54 @@
+discard """
+  joinable: false
+"""
+
+import std / [atomics, strutils, sequtils]
+
+type
+  BackendMessage* = object
+    field*: seq[int]
+
+var
+  chan1: Channel[BackendMessage]
+  chan2: Channel[BackendMessage]
+
+chan1.open()
+chan2.open()
+
+proc routeMessage*(msg: BackendMessage) =
+  discard chan2.trySend(msg)
+
+var
+  recv: Thread[void]
+  stopToken: Atomic[bool]
+
+proc recvMsg() =
+  while not stopToken.load(moRelaxed):
+    let resp = chan1.tryRecv()
+    if resp.dataAvailable:
+      routeMessage(resp.msg)
+      echo "child consumes ", formatSize getOccupiedMem()
+
+createThread[void](recv, recvMsg)
+
+const MESSAGE_COUNT = 100
+
+proc main() =
+  let msg: BackendMessage = BackendMessage(field: (0..500).toSeq())
+  for j in 0..0: #100:
+    echo "New iteration"
+
+    for _ in 1..MESSAGE_COUNT:
+      chan1.send(msg)
+    echo "After sending"
+
+    var counter = 0
+    while counter < MESSAGE_COUNT:
+      let resp = recv(chan2)
+      counter.inc
+    echo "After receiving ", formatSize getOccupiedMem()
+
+  stopToken.store true, moRelaxed
+  joinThreads(recv)
+
+main()
diff --git a/tests/alloc/tmembug2.nim b/tests/alloc/tmembug2.nim
new file mode 100644
index 000000000..01bce6f14
--- /dev/null
+++ b/tests/alloc/tmembug2.nim
@@ -0,0 +1,58 @@
+discard """
+  disabled: "true"
+"""
+
+import std / [atomics, strutils, sequtils, isolation]
+
+import threading / channels
+
+type
+  BackendMessage* = object
+    field*: seq[int]
+
+const MESSAGE_COUNT = 100
+
+var
+  chan1 = newChan[BackendMessage](MESSAGE_COUNT*2)
+  chan2 = newChan[BackendMessage](MESSAGE_COUNT*2)
+
+#chan1.open()
+#chan2.open()
+
+proc routeMessage*(msg: BackendMessage) =
+  var m = isolate(msg)
+  discard chan2.trySend(m)
+
+var
+  thr: Thread[void]
+  stopToken: Atomic[bool]
+
+proc recvMsg() =
+  while not stopToken.load(moRelaxed):
+    var resp: BackendMessage
+    if chan1.tryRecv(resp):
+      #if resp.dataAvailable:
+      routeMessage(resp)
+      echo "child consumes ", formatSize getOccupiedMem()
+
+createThread[void](thr, recvMsg)
+
+proc main() =
+  let msg: BackendMessage = BackendMessage(field: (0..5).toSeq())
+  for j in 0..100:
+    echo "New iteration"
+
+    for _ in 1..MESSAGE_COUNT:
+      chan1.send(msg)
+    echo "After sending"
+
+    var counter = 0
+    while counter < MESSAGE_COUNT:
+      let resp = recv(chan2)
+      counter.inc
+    echo "After receiving ", formatSize getOccupiedMem()
+
+  stopToken.store true, moRelaxed
+  joinThreads(thr)
+
+main()
diff --git a/tests/threads/tmembug.nim b/tests/threads/tmembug.nim
new file mode 100644
index 000000000..3618f0ecc
--- /dev/null
+++ b/tests/threads/tmembug.nim
@@ -0,0 +1,51 @@
+
+import std / [atomics, strutils, sequtils]
+
+type
+  BackendMessage* = object
+    field*: seq[int]
+
+var
+  chan1: Channel[BackendMessage]
+  chan2: Channel[BackendMessage]
+
+chan1.open()
+chan2.open()
+
+proc routeMessage*(msg: BackendMessage) =
+  discard chan2.trySend(msg)
+
+var
+  recv: Thread[void]
+  stopToken: Atomic[bool]
+
+proc recvMsg() =
+  while not stopToken.load(moRelaxed):
+    let resp = chan1.tryRecv()
+    if resp.dataAvailable:
+      routeMessage(resp.msg)
+      echo "child consumes ", formatSize getOccupiedMem()
+
+createThread[void](recv, recvMsg)
+
+const MESSAGE_COUNT = 100
+
+proc main() =
+  let msg: BackendMessage = BackendMessage(field: (0..500).toSeq())
+  for j in 0..0: #100:
+    echo "New iteration"
+
+    for _ in 1..MESSAGE_COUNT:
+      chan1.send(msg)
+    echo "After sending"
+
+    var counter = 0
+    while counter < MESSAGE_COUNT:
+      let resp = recv(chan2)
+      counter.inc
+    echo "After receiving ", formatSize getOccupiedMem()
+
+  stopToken.store true, moRelaxed
+  joinThreads(recv)
+
+main()