summary refs log tree commit diff stats
path: root/lib/pure
diff options
context:
space:
mode:
authorYuriy Glukhov <yutiy.glukhov@gmail.com>2016-04-27 23:14:01 +0300
committerYuriy Glukhov <yutiy.glukhov@gmail.com>2016-04-28 00:04:32 +0300
commit1a8f7848342e4d195fd89a8048a9eb504280a953 (patch)
tree8ee5d894a6f09e87537cfe37bc80e896850d3092 /lib/pure
parente31ec746b96ef185d9f5fa6276518949fa889e5a (diff)
downloadNim-1a8f7848342e4d195fd89a8048a9eb504280a953.tar.gz
Added heapqueue collection. Fixed timers in asyncdispatch.
Diffstat (limited to 'lib/pure')
-rw-r--r--lib/pure/asyncdispatch.nim44
-rw-r--r--lib/pure/collections/heapqueue.nim107
2 files changed, 133 insertions, 18 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 01b53cb12..64966c6b5 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -9,7 +9,7 @@
 
 include "system/inclrtl"
 
-import os, oids, tables, strutils, macros, times
+import os, oids, tables, strutils, macros, times, heapqueue
 
 import nativesockets, net
 
@@ -354,16 +354,22 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
 
 type
   PDispatcherBase = ref object of RootRef
-    timers: seq[tuple[finishAt: float, fut: Future[void]]]
-
-proc processTimers(p: PDispatcherBase) =
-  var oldTimers = p.timers
-  p.timers = @[]
-  for t in oldTimers:
-    if epochTime() >= t.finishAt:
-      t.fut.complete()
-    else:
-      p.timers.add(t)
+    timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
+
+proc processTimers(p: PDispatcherBase) {.inline.} =
+  while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt:
+    p.timers.pop().fut.complete()
+
+proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
+  # If dispatcher has active timers this proc returns the timeout
+  # of the nearest timer. Returns `timeout` otherwise.
+  result = timeout
+  if p.timers.len > 0:
+    let timerTimeout = p.timers[0].finishAt
+    let curTime = epochTime()
+    if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout:
+      result = int((timerTimeout - curTime) * 1000)
+      if result < 0: result = 0
 
 when defined(windows) or defined(nimdoc):
   import winlean, sets, hashes
@@ -396,7 +402,7 @@ when defined(windows) or defined(nimdoc):
     new result
     result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
     result.handles = initSet[AsyncFD]()
-    result.timers = @[]
+    result.timers.newHeapQueue()
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -427,9 +433,11 @@ when defined(windows) or defined(nimdoc):
       raise newException(ValueError,
         "No handles or timers registered in dispatcher.")
 
-    let llTimeout =
-      if timeout ==  -1: winlean.INFINITE
-      else: timeout.int32
+    let at = p.adjustedTimeout(timeout)
+    var llTimeout =
+      if at == -1: winlean.INFINITE
+      else: at.int32
+
     var lpNumberOfBytesTransferred: Dword
     var lpCompletionKey: ULONG
     var customOverlapped: PCustomOverlapped
@@ -956,7 +964,7 @@ else:
   proc newDispatcher*(): PDispatcher =
     new result
     result.selector = newSelector()
-    result.timers = @[]
+    result.timers.newHeapQueue()
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -1014,7 +1022,7 @@ else:
 
   proc poll*(timeout = 500) =
     let p = getGlobalDispatcher()
-    for info in p.selector.select(timeout):
+    for info in p.selector.select(p.adjustedTimeout(timeout)):
       let data = PData(info.key.data)
       assert data.fd == info.key.fd.AsyncFD
       #echo("In poll ", data.fd.cint)
@@ -1215,7 +1223,7 @@ proc sleepAsync*(ms: int): Future[void] =
   ## ``ms`` milliseconds.
   var retFuture = newFuture[void]("sleepAsync")
   let p = getGlobalDispatcher()
-  p.timers.add((epochTime() + (ms / 1000), retFuture))
+  p.timers.push((epochTime() + (ms / 1000), retFuture))
   return retFuture
 
 proc accept*(socket: AsyncFD,
diff --git a/lib/pure/collections/heapqueue.nim b/lib/pure/collections/heapqueue.nim
new file mode 100644
index 000000000..149a1c9fc
--- /dev/null
+++ b/lib/pure/collections/heapqueue.nim
@@ -0,0 +1,107 @@
+##[ Heap queue algorithm (a.k.a. priority queue). Ported from Python heapq.
+
+Heaps are arrays for which a[k] <= a[2*k+1] and a[k] <= a[2*k+2] for
+all k, counting elements from 0.  For the sake of comparison,
+non-existing elements are considered to be infinite.  The interesting
+property of a heap is that a[0] is always its smallest element.
+
+]##
+
+type HeapQueue*[T] = distinct seq[T]
+
+proc newHeapQueue*[T](): HeapQueue[T] {.inline.} = HeapQueue[T](newSeq[T]())
+proc newHeapQueue*[T](h: var HeapQueue[T]) {.inline.} = h = HeapQueue[T](newSeq[T]())
+
+proc len*[T](h: HeapQueue[T]): int {.inline.} = seq[T](h).len
+proc `[]`*[T](h: HeapQueue[T], i: int): T {.inline.} = seq[T](h)[i]
+proc `[]=`[T](h: var HeapQueue[T], i: int, v: T) {.inline.} = seq[T](h)[i] = v
+proc add[T](h: var HeapQueue[T], v: T) {.inline.} = seq[T](h).add(v)
+
+proc heapCmp[T](x, y: T): bool {.inline.} =
+  return (x < y)
+
+# 'heap' is a heap at all indices >= startpos, except possibly for pos.  pos
+# is the index of a leaf with a possibly out-of-order value.  Restore the
+# heap invariant.
+proc siftdown[T](heap: var HeapQueue[T], startpos, p: int) =
+  var pos = p
+  var newitem = heap[pos]
+  # Follow the path to the root, moving parents down until finding a place
+  # newitem fits.
+  while pos > startpos:
+    let parentpos = (pos - 1) shr 1
+    let parent = heap[parentpos]
+    if heapCmp(newitem, parent):
+      heap[pos] = parent
+      pos = parentpos
+    else:
+      break
+  heap[pos] = newitem
+
+proc siftup[T](heap: var HeapQueue[T], p: int) =
+  let endpos = len(heap)
+  var pos = p
+  let startpos = pos
+  let newitem = heap[pos]
+  # Bubble up the smaller child until hitting a leaf.
+  var childpos = 2*pos + 1    # leftmost child position
+  while childpos < endpos:
+    # Set childpos to index of smaller child.
+    let rightpos = childpos + 1
+    if rightpos < endpos and not heapCmp(heap[childpos], heap[rightpos]):
+      childpos = rightpos
+    # Move the smaller child up.
+    heap[pos] = heap[childpos]
+    pos = childpos
+    childpos = 2*pos + 1
+  # The leaf at pos is empty now.  Put newitem there, and bubble it up
+  # to its final resting place (by sifting its parents down).
+  heap[pos] = newitem
+  siftdown(heap, startpos, pos)
+    
+proc push*[T](heap: var HeapQueue[T], item: T) =
+  ## Push item onto heap, maintaining the heap invariant.
+  (seq[T](heap)).add(item)
+  siftdown(heap, 0, len(heap)-1)
+
+proc pop*[T](heap: var HeapQueue[T]): T =
+  ## Pop the smallest item off the heap, maintaining the heap invariant.
+  let lastelt = seq[T](heap).pop()
+  if heap.len > 0:
+    result = heap[0]
+    heap[0] = lastelt
+    siftup(heap, 0)
+  else:
+    result = lastelt
+
+proc replace*[T](heap: var HeapQueue[T], item: T): T =
+  ## Pop and return the current smallest value, and add the new item.
+  ## This is more efficient than pop() followed by push(), and can be
+  ## more appropriate when using a fixed-size heap.  Note that the value
+  ## returned may be larger than item!  That constrains reasonable uses of
+  ## this routine unless written as part of a conditional replacement:
+
+  ##    if item > heap[0]:
+  ##        item = replace(heap, item)
+  result = heap[0]
+  heap[0] = item
+  siftup(heap, 0)
+
+proc pushpop*[T](heap: var HeapQueue[T], item: T): T =
+  ## Fast version of a push followed by a pop.
+  if heap.len > 0 and heapCmp(heap[0], item):
+    swap(item, heap[0])
+    siftup(heap, 0)
+  return item
+
+when isMainModule:
+  # Simple sanity test
+  var heap = newHeapQueue[int]()
+  let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
+  for item in data:
+    push(heap, item)
+  doAssert(heap[0] == 0)
+  var sort = newSeq[int]()
+  while heap.len > 0:
+    sort.add(pop(heap))
+  doAssert(sort == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9])