summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
authorJason Livesay <ithkuil@gmail.com>2014-04-17 03:47:44 -0700
committerJason Livesay <ithkuil@gmail.com>2014-04-17 03:47:44 -0700
commit1068022dfbf170dc34bf39e45a44451851fcb06f (patch)
tree6ca193296be5078626e5b97bfb161d225491a675 /lib
parentebe174c8687e02404a986e17dffc773378dc98e7 (diff)
downloadNim-1068022dfbf170dc34bf39e45a44451851fcb06f.tar.gz
Allow QUEUED reply only if pipelined; don't return status replies from flushPipeline; Rewrite someTests
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/redis.nim186
1 files changed, 95 insertions, 91 deletions
diff --git a/lib/pure/redis.nim b/lib/pure/redis.nim
index 147a7a82a..c7fcc033f 100644
--- a/lib/pure/redis.nim
+++ b/lib/pure/redis.nim
@@ -58,8 +58,10 @@ proc raiseInvalidReply(expected, got: char) =
           "Expected '$1' at the beginning of a status reply got '$2'" %
           [$expected, $got])
 
-proc raiseNoOK(status: string) =
-  if status != "QUEUED" and status != "OK":
+proc raiseNoOK(status: string, pipelineEnabled:bool) =
+  if pipelineEnabled and not (status == "QUEUED" or status == "PIPELINED"):
+    raise newException(EInvalidReply, "Expected \"QUEUED\" or \"PIPELINED\" got \"$1\"" % status)
+  elif not pipelineEnabled and status != "OK":
     raise newException(EInvalidReply, "Expected \"OK\" got \"$1\"" % status)
 
 template readSocket(r: TRedis, dummyVal:expr): stmt =
@@ -71,7 +73,7 @@ template readSocket(r: TRedis, dummyVal:expr): stmt =
 
 proc parseStatus(r: TRedis, line: string = ""): TRedisStatus =
   if r.pipeline.enabled:
-    return "OK"
+    return "PIPELINED"
 
   if line == "":
     raise newException(ERedis, "Server closed connection prematurely")
@@ -84,14 +86,14 @@ proc parseStatus(r: TRedis, line: string = ""): TRedisStatus =
   return line.substr(1) # Strip '+'
 
 proc readStatus(r:TRedis): TRedisStatus =
-  r.readSocket("OK")
+  r.readSocket("PIPELINED")
   return r.parseStatus(line)
  
 proc parseInteger(r: TRedis, line: string = ""): TRedisInteger =
   if r.pipeline.enabled: return -1
   
-  if line == "+QUEUED":  # inside of multi
-    return -1
+  #if line == "+QUEUED":  # inside of multi
+  #  return -1
 
   if line == "":
     raise newException(ERedis, "Server closed connection prematurely")
@@ -193,7 +195,6 @@ proc flushPipeline*(r: TRedis, wasMulti = false): TRedisList =
     r.socket.send(r.pipeline.buffer)
   r.pipeline.buffer = ""
   
-  var prevState = r.pipeline.enabled
   r.pipeline.enabled = false
   result = @[]
   
@@ -201,23 +202,21 @@ proc flushPipeline*(r: TRedis, wasMulti = false): TRedisList =
 
   for i in 0..tot-1:
     var ret = r.readNext()
-    if ret.len == 1 and (ret[0] == "OK" or ret[0] == "QUEUED"):
-      # Skip acknowledgement replies in multi
-      if not wasMulti: result.add(ret)
-    else:    
-      result.add(ret)
+    for item in ret:
+     var isOK = item.contains("OK")
+     if not (item.contains("OK") or item.contains("QUEUED")):
+       result.add(item)
 
   r.pipeline.expected = 0
-  r.pipeline.enabled = prevState
 
-proc setPipeline*(r: TRedis, state: bool) =
-  ## Enable or disable command pipelining (reduces network roundtrips).
+proc startPipelining*(r: TRedis) =
+  ## Enable command pipelining (reduces network roundtrips).
   ## Note that when enabled, you must call flushPipeline to actually send commands, except
   ## for multi/exec() which enable and flush the pipeline automatically.
   ## Commands return immediately with dummy values; actual results returned from
   ## flushPipeline() or exec()
   r.pipeline.expected = 0
-  r.pipeline.enabled = state
+  r.pipeline.enabled = true
 
 proc sendCommand(r: TRedis, cmd: string, args: varargs[string]) =
   var request = "*" & $(1 + args.len()) & "\c\L"
@@ -300,7 +299,7 @@ proc rename*(r: TRedis, key, newkey: string): TRedisStatus =
   ## 
   ## **WARNING:** Overwrites `newkey` if it exists!
   r.sendCommand("RENAME", key, newkey)
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
   
 proc renameNX*(r: TRedis, key, newkey: string): bool =
   ## Same as ``rename`` but doesn't continue if `newkey` exists.
@@ -372,7 +371,7 @@ proc setk*(r: TRedis, key, value: string) =
   ##
   ## NOTE: This function had to be renamed due to a clash with the `set` type.
   r.sendCommand("SET", key, value)
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc setNX*(r: TRedis, key, value: string): bool =
   ## Set the value of a key, only if the key does not exist. Returns `true`
@@ -389,7 +388,7 @@ proc setBit*(r: TRedis, key: string, offset: int,
 proc setEx*(r: TRedis, key: string, seconds: int, value: string): TRedisStatus =
   ## Set the value and expiration of a key
   r.sendCommand("SETEX", key, $seconds, value)
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc setRange*(r: TRedis, key: string, offset: int, 
                value: string): TRedisInteger =
@@ -452,7 +451,7 @@ proc hMSet*(r: TRedis, key: string,
     args.add(field)
     args.add(value)
   r.sendCommand("HMSET", args)
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc hSet*(r: TRedis, key, field, value: string): TRedisInteger =
   ## Set the string value of a hash field
@@ -546,12 +545,12 @@ proc lRem*(r: TRedis, key: string, value: string, count: int = 0): TRedisInteger
 proc lSet*(r: TRedis, key: string, index: int, value: string) =
   ## Set the value of an element in a list by its index
   r.sendCommand("LSET", key, $index, value)
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
-proc lTrim*(r: TRedis, key: string, start, stop: int) =
+proc lTrim*(r: TRedis, key: string, start, stop: int)  =
   ## Trim a list to the specified range
   r.sendCommand("LTRIM", key, $start, $stop)
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc rPop*(r: TRedis, key: string): TRedisString =
   ## Remove and get the last element in a list
@@ -829,7 +828,7 @@ proc unsubscribe*(r: TRedis, [channel: openarray[string], : string): ???? =
 proc discardMulti*(r: TRedis) =
   ## Discard all commands issued after MULTI
   r.sendCommand("DISCARD")
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc exec*(r: TRedis): TRedisList =
   ## Execute all commands issued after MULTI
@@ -842,26 +841,26 @@ proc exec*(r: TRedis): TRedisList =
 
 proc multi*(r: TRedis) =
   ## Mark the start of a transaction block
-  r.setPipeline(true)
+  r.startPipelining()
   r.sendCommand("MULTI")
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc unwatch*(r: TRedis) =
   ## Forget about all watched keys
   r.sendCommand("UNWATCH")
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc watch*(r: TRedis, key: varargs[string]) =
   ## Watch the given keys to determine execution of the MULTI/EXEC block 
   r.sendCommand("WATCH", key)
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 # Connection
 
 proc auth*(r: TRedis, password: string) =
   ## Authenticate to the server
   r.sendCommand("AUTH", password)
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc echoServ*(r: TRedis, message: string): TRedisString =
   ## Echo the given string
@@ -876,7 +875,7 @@ proc ping*(r: TRedis): TRedisStatus =
 proc quit*(r: TRedis) =
   ## Close the connection
   r.sendCommand("QUIT")
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc select*(r: TRedis, index: int): TRedisStatus =
   ## Change the selected database for the current connection 
@@ -888,12 +887,12 @@ proc select*(r: TRedis, index: int): TRedisStatus =
 proc bgrewriteaof*(r: TRedis) =
   ## Asynchronously rewrite the append-only file
   r.sendCommand("BGREWRITEAOF")
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc bgsave*(r: TRedis) =
   ## Asynchronously save the dataset to disk
   r.sendCommand("BGSAVE")
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc configGet*(r: TRedis, parameter: string): TRedisList =
   ## Get the value of a configuration parameter
@@ -903,12 +902,12 @@ proc configGet*(r: TRedis, parameter: string): TRedisList =
 proc configSet*(r: TRedis, parameter: string, value: string) =
   ## Set a configuration parameter to the given value
   r.sendCommand("CONFIG", "SET", parameter, value)
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc configResetStat*(r: TRedis) =
   ## Reset the stats returned by INFO
   r.sendCommand("CONFIG", "RESETSTAT")
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc dbsize*(r: TRedis): TRedisInteger =
   ## Return the number of keys in the selected database
@@ -927,12 +926,12 @@ proc debugSegfault*(r: TRedis) =
 proc flushall*(r: TRedis): TRedisStatus =
   ## Remove all keys from all databases
   r.sendCommand("FLUSHALL")
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc flushdb*(r: TRedis): TRedisStatus =
   ## Remove all keys from the current database
   r.sendCommand("FLUSHDB")
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc info*(r: TRedis): TRedisString =
   ## Get information and statistics about the server
@@ -948,13 +947,13 @@ discard """
 proc monitor*(r: TRedis) =
   ## Listen for all requests received by the server in real time
   r.socket.send("MONITOR\c\L")
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 """
 
 proc save*(r: TRedis) =
   ## Synchronously save the dataset to disk
   r.sendCommand("SAVE")
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 proc shutdown*(r: TRedis) =
   ## Synchronously save the dataset to disk and then shut down the server
@@ -966,7 +965,7 @@ proc shutdown*(r: TRedis) =
 proc slaveof*(r: TRedis, host: string, port: string) =
   ## Make the server a slave of another instance, or promote it as master
   r.sendCommand("SLAVEOF", host, port)
-  raiseNoOK(r.readStatus())
+  raiseNoOK(r.readStatus(), r.pipeline.enabled)
 
 iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] =
   ## Iterator for keys and values in a hash.
@@ -979,68 +978,73 @@ iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] =
     else:
       yield (k, i)
       k = ""
-      
-proc someTests(r: TRedis) =
-  #r.auth("pass")
+
+proc someTests(r: TRedis, how: string):seq[string] =
+  var list:seq[string] = @[]
+
+  case how
+  of "pipelined":
+    r.startPipelining()
+  of "multi": 
+    r.multi()
 
   r.setk("nim:test", "Testing something.")
   r.setk("nim:utf8", "こんにちは")
   r.setk("nim:esc", "\\ths ągt\\")
   r.setk("nim:int", "1")
-  echo(r.get("nim:esc"))
-  echo(r.incr("nim:int"))
-  echo r.get("nim:int")
-  echo r.get("nim:utf8")
-  echo r.hSet("test1", "name", "A Test")
+  list.add(r.get("nim:esc"))
+  list.add($(r.incr("nim:int")))
+  list.add(r.get("nim:int"))
+  list.add(r.get("nim:utf8"))
+  list.add($(r.hSet("test1", "name", "A Test")))
   var res = r.hGetAll("test1")
-  echo repr(r.get("blahasha"))
-  echo r.randomKey()
-  discard r.lpush("mylist","itema")
-  discard r.lpush("mylist","itemb")
+  for r in res:
+    list.add(r)
+  list.add(r.get("invalid_key"))
+  list.add($(r.lpush("mylist","itema")))
+  list.add($(r.lpush("mylist","itemb")))
   r.ltrim("mylist",0,1)
   var p = r.lrange("mylist", 0, -1)
- 
+
   for i in items(p):
     if not isNil(i):
-      echo("  ", i) 
-  
-  echo(r.debugObject("mylist"))
+      list.add(i) 
 
-  r.configSet("timeout", "299")
-  for i in items(r.configGet("timeout")): echo ">> ", i
+  list.add(r.debugObject("mylist"))
 
-  echo r.echoServ("BLAH")
-  
-  
-when false:
-  var r = open()
+  r.configSet("timeout", "299")
+  var g = r.configGet("timeout")
+  for i in items(g):
+    list.add(i)
+
+  list.add(r.echoServ("BLAH"))
+
+  case how
+  of "normal":
+    return list
+  of "pipelined":
+    return r.flushPipeline()
+  of "multi":
+    return r.exec()
+
+proc assertListsIdentical(listA, listB: seq[string]) =
+  assert(listA.len == listB.len)
+  var i = 0
+  for item in listA:
+    assert(item == listB[i])
+    i = i + 1
   
-  # Test with no pipelining
-  echo("----------------------------------------------")
-  echo("Testing without pipelining.")
-  r.someTests()
-
-  # Test with pipelining enabled
-  echo("//////////////////////////////////////////////")
-  echo()
-  echo("Testing with pipelining.")
-  echo()
-  r.setPipeline(true)
-  r.someTests()
-  var list = r.flushPipeline()
-  r.setPipeline(false)
-  echo("-- list length is " & $list.len & " --")
-  for item in list:
-    if not isNil(item):
-      echo item
- 
-  # Test with multi/exec() (automatic pipelining)
-  echo("************************************************")
-  echo("Testing with transaction (automatic pipelining)")
-  r.multi()
-  r.someTests()
-  list = r.exec()
-  echo("-- list length is " & $list.len & " --")
-  for item in list:
-    if not isNil(item):
-      echo item
+when isMainModule:
+  when false:
+    var r = open()
+
+    # Test with no pipelining
+    var listNormal = r.someTests("normal")
+
+    # Test with pipelining enabled
+    var listPipelined = r.someTests("pipelined")
+    assertListsIdentical(listNormal, listPipelined)
+
+    # Test with multi/exec() (automatic pipelining)
+    var listMulti = r.someTests("multi")
+    assertListsIdentical(listNormal, listMulti)