summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
authorJason Livesay <ithkuil@gmail.com>2014-04-09 20:43:45 -0700
committerJason Livesay <ithkuil@gmail.com>2014-04-09 20:43:45 -0700
commit5da463e1f730a284679369fa3049168f3bf3df48 (patch)
tree5d794143c255a01268a63923e3b3b56d0fdfe75c /lib
parent8b82004359b8d852fa0107d79cc78b21eb35c028 (diff)
downloadNim-5da463e1f730a284679369fa3049168f3bf3df48.tar.gz
Redis: optional pipelining and better tested transactions
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/redis.nim352
1 files changed, 233 insertions, 119 deletions
diff --git a/lib/pure/redis.nim b/lib/pure/redis.nim
index f4c45b99c..e4d47e43d 100644
--- a/lib/pure/redis.nim
+++ b/lib/pure/redis.nim
@@ -20,10 +20,17 @@ const
   redisNil* = "\0\0"
 
 type
+  TPipeline = object
+    enabled: bool
+    buffer: ref string
+    expected: int ## number of replies expected if pipelined
+
+type
   TRedis* {.pure, final.} = object
     socket: TSocket
     connected: bool
-  
+    pipeline: ref TPipeline
+
   TRedisStatus* = string
   TRedisInteger* = biggestInt
   TRedisString* = string ## Bulk reply
@@ -32,15 +39,23 @@ type
   EInvalidReply* = object of ESynch ## Invalid reply from redis
   ERedis* = object of ESynch        ## Error in redis
 
+proc newPipeline(): ref TPipeLine =
+  new(result)
+  result.buffer = new string
+  result.buffer[] = ""
+  result.enabled = false
+  result.expected = 0
+
 proc open*(host = "localhost", port = 6379.TPort): TRedis =
   ## Opens a connection to the redis server.
   result.socket = socket(buffered = false)
   if result.socket == InvalidSocket:
     OSError(OSLastError())
   result.socket.connect(host, port)
+  result.pipeline = newPipeline()
 
 proc raiseInvalidReply(expected, got: char) =
-  raise newException(EInvalidReply, 
+  raise newException(EInvalidReply,
           "Expected '$1' at the beginning of a status reply got '$2'" %
           [$expected, $got])
 
@@ -48,9 +63,13 @@ proc raiseNoOK(status: string) =
   if status != "QUEUED" and status != "OK":
     raise newException(EInvalidReply, "Expected \"OK\" got \"$1\"" % status)
 
-proc parseStatus(r: TRedis): TRedisStatus =
-  var line = ""
-  r.socket.readLine(line)
+proc parseStatus(r: TRedis, lineIn: string = ""): TRedisStatus =
+  if r.pipeline.enabled:
+    return "OK"
+
+  var line = lineIn
+  if line == "":
+    r.socket.readLine(line)
   if line == "":
     raise newException(ERedis, "Server closed connection prematurely")
 
@@ -58,12 +77,15 @@ proc parseStatus(r: TRedis): TRedisStatus =
     raise newException(ERedis, strip(line))
   if line[0] != '+':
     raiseInvalidReply('+', line[0])
-  
+
   return line.substr(1) # Strip '+'
-  
-proc parseInteger(r: TRedis): TRedisInteger =
-  var line = ""
-  r.socket.readLine(line)
+
+proc parseInteger(r: TRedis, lineIn: string = ""): TRedisInteger =
+  if r.pipeline.enabled: return -1
+
+  var line = lineIn
+  if line == "":
+    r.socket.readLine(line)
 
   if line == "+QUEUED":  # inside of multi
     return -1
@@ -75,32 +97,31 @@ proc parseInteger(r: TRedis): TRedisInteger =
     raise newException(ERedis, strip(line))
   if line[0] != ':':
     raiseInvalidReply(':', line[0])
-  
+
   # Strip ':'
   if parseBiggestInt(line, result, 1) == 0:
-    raise newException(EInvalidReply, "Unable to parse integer.") 
+    raise newException(EInvalidReply, "Unable to parse integer.")
 
 proc recv(sock: TSocket, size: int): TaintedString =
   result = newString(size).TaintedString
   if sock.recv(cstring(result), size) != size:
     raise newException(EInvalidReply, "recv failed")
 
-proc parseSingle(r: TRedis, line:string, allowMBNil = False): TRedisString =
+proc parseSingleString(r: TRedis, line:string, allowMBNil = False): TRedisString =
+  if r.pipeline.enabled: return ""
+
   # Error.
   if line[0] == '-':
     raise newException(ERedis, strip(line))
-  
+
   # Some commands return a /bulk/ value or a /multi-bulk/ nil. Odd.
   if allowMBNil:
     if line == "*-1":
        return RedisNil
-  
-  if line == "+QUEUED" or line == "+OK" : # inside of a transaction (multi)
-    return nil
 
   if line[0] != '$':
     raiseInvalidReply('$', line[0])
-  
+
   var numBytes = parseInt(line.substr(1))
   if numBytes == -1:
     return RedisNil
@@ -108,41 +129,86 @@ proc parseSingle(r: TRedis, line:string, allowMBNil = False): TRedisString =
   var s = r.socket.recv(numBytes+2)
   result = strip(s.string)
 
-proc parseMultiLines(r: TRedis, countLine:string): TRedisList =
+proc parseNext(r: TRedis): TRedisList
+
+proc parseArrayLines(r: TRedis, countLine:string): TRedisList =
   if countLine.string[0] != '*':
     raiseInvalidReply('*', countLine.string[0])
 
   var numElems = parseInt(countLine.string.substr(1))
   if numElems == -1: return nil
   result = @[]
+
   for i in 1..numElems:
-    var line = ""
-    r.socket.readLine(line.TaintedString)
-    if line[0] == '*':  # after exec() may contain more multi-bulk replies
-      var parsed = r.parseMultiLines(line)
+    var parsed = r.parseNext()
+    if not isNil(parsed):
       for item in parsed:
         result.add(item)
-    else:
-     result.add(r.parseSingle(line))
 
-proc parseBulk(r: TRedis, allowMBNil = False): TRedisString =
-  var line = ""
-  r.socket.readLine(line.TaintedString)
+proc parseBulkString(r: TRedis, allowMBNil = False, lineIn:string = ""): TRedisString =
+  if r.pipeline.enabled: return ""
+
+  var line = lineIn
+  if line == "":
+    r.socket.readLine(line.TaintedString)
 
-  if line == "+QUEUED" or line == "+OK": # inside of a transaction (multi)
-    return nil
+  return r.parseSingleString(line, allowMBNil)
+
+proc parseArray(r: TRedis): TRedisList =
+  if r.pipeline.enabled: return @[]
+  var line = TaintedString""
+  r.socket.readLine(line)
 
-  return r.parseSingle(line, allowMBNil)
+  return r.parseArrayLines(line)
 
-proc parseMultiBulk(r: TRedis): TRedisList =
+proc parseNext(r: TRedis): TRedisList =
+  if r.pipeline.enabled: return @[]
   var line = TaintedString""
   r.socket.readLine(line)
 
-  if line == "+QUEUED": # inside of a transaction (multi)
-    return nil
-    
-  return r.parseMultiLines(line)
+  var res = case line[0]
+    of '+': @[r.parseStatus(line)]
+    of '-': @[r.parseStatus(line)]
+    of ':': @[$(r.parseInteger(line))]
+    of '$': @[r.parseBulkString(true,line)]
+    of '*': r.parseArrayLines(line)
+    else:
+      raise newException(EInvalidReply, "parseNext failed on line: " & line)
+      nil
+  r.pipeline.expected -= 1
+  return res
+
+proc flushPipeline*(r: TRedis, wasMulti = false): TRedisList =
+  ## Send buffered commands, clear buffer, return results
+  if r.pipeline.buffer[].len > 0:
+    r.socket.send(r.pipeline.buffer[])
+  r.pipeline.buffer[] = ""
+
+  var prevState = r.pipeline.enabled
+  r.pipeline.enabled = false
+  result = @[]
+
+  var tot = r.pipeline.expected
 
+  for i in 0..tot-1:
+    var ret = r.parseNext()
+    if ret.len == 1 and (ret[0] == "OK" or ret[0] == "QUEUED"):
+      # Skip acknowledgement replies in multi
+      if not wasMulti: result.add(ret)
+    else:
+      result.add(ret)
+
+  r.pipeline.expected = 0
+  r.pipeline.enabled = prevState
+
+proc setPipeline*(r: TRedis, state: bool) =
+  ## Enable or disable command pipelining (reduces network roundtrips).
+  ## Note that when enabled, you must call flushPipeline to actually send commands, except
+  ## for multi/exec() which enable and flush the pipeline automatically.
+  ## Commands return immediately with dummy values; actual results returned from
+  ## flushPipeline() or exec()
+  r.pipeline.expected = 0
+  r.pipeline.enabled = state
 
 proc sendCommand(r: TRedis, cmd: string, args: varargs[string]) =
   var request = "*" & $(1 + args.len()) & "\c\L"
@@ -151,7 +217,12 @@ proc sendCommand(r: TRedis, cmd: string, args: varargs[string]) =
   for i in items(args):
     request.add("$" & $i.len() & "\c\L")
     request.add(i & "\c\L")
-  r.socket.send(request)
+
+  if r.pipeline.enabled:
+    r.pipeline.buffer[].add(request)
+    r.pipeline.expected += 1
+  else:
+    r.socket.send(request)
 
 proc sendCommand(r: TRedis, cmd: string, arg1: string,
                  args: varargs[string]) =
@@ -163,7 +234,12 @@ proc sendCommand(r: TRedis, cmd: string, arg1: string,
   for i in items(args):
     request.add("$" & $i.len() & "\c\L")
     request.add(i & "\c\L")
-  r.socket.send(request)
+
+  if r.pipeline.enabled:
+    r.pipeline.expected += 1
+    r.pipeline.buffer[].add(request)
+  else:
+    r.socket.send(request)
 
 # Keys
 
@@ -192,7 +268,7 @@ proc expireAt*(r: TRedis, key: string, timestamp: int): bool =
 proc keys*(r: TRedis, pattern: string): TRedisList =
   ## Find all keys matching the given pattern
   r.sendCommand("KEYS", pattern)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc move*(r: TRedis, key: string, db: int): bool =
   ## Move a key to another database. Returns `true` on a successful move.
@@ -204,11 +280,11 @@ proc persist*(r: TRedis, key: string): bool =
   ## Returns `true` when the timeout was removed.
   r.sendCommand("PERSIST", key)
   return r.parseInteger() == 1
-  
+
 proc randomKey*(r: TRedis): TRedisString =
   ## Return a random key from the keyspace
   r.sendCommand("RANDOMKEY")
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc rename*(r: TRedis, key, newkey: string): TRedisStatus =
   ## Rename a key.
@@ -216,7 +292,7 @@ proc rename*(r: TRedis, key, newkey: string): TRedisStatus =
   ## **WARNING:** Overwrites `newkey` if it exists!
   r.sendCommand("RENAME", key, newkey)
   raiseNoOK(r.parseStatus())
-  
+
 proc renameNX*(r: TRedis, key, newkey: string): bool =
   ## Same as ``rename`` but doesn't continue if `newkey` exists.
   ## Returns `true` if key was renamed.
@@ -227,12 +303,12 @@ proc ttl*(r: TRedis, key: string): TRedisInteger =
   ## Get the time to live for a key
   r.sendCommand("TTL", key)
   return r.parseInteger()
-  
+
 proc keyType*(r: TRedis, key: string): TRedisStatus =
   ## Determine the type stored at key
   r.sendCommand("TYPE", key)
   return r.parseStatus()
-  
+
 
 # Strings
 
@@ -245,16 +321,16 @@ proc decr*(r: TRedis, key: string): TRedisInteger =
   ## Decrement the integer value of a key by one
   r.sendCommand("DECR", key)
   return r.parseInteger()
-  
+
 proc decrBy*(r: TRedis, key: string, decrement: int): TRedisInteger =
   ## Decrement the integer value of a key by the given number
   r.sendCommand("DECRBY", key, $decrement)
   return r.parseInteger()
-  
+
 proc get*(r: TRedis, key: string): TRedisString =
   ## Get the value of a key. Returns `redisNil` when `key` doesn't exist.
   r.sendCommand("GET", key)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc getBit*(r: TRedis, key: string, offset: int): TRedisInteger =
   ## Returns the bit value at offset in the string value stored at key
@@ -264,13 +340,13 @@ proc getBit*(r: TRedis, key: string, offset: int): TRedisInteger =
 proc getRange*(r: TRedis, key: string, start, stop: int): TRedisString =
   ## Get a substring of the string stored at a key
   r.sendCommand("GETRANGE", key, $start, $stop)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc getSet*(r: TRedis, key: string, value: string): TRedisString =
   ## Set the string value of a key and return its old value. Returns `redisNil`
   ## when key doesn't exist.
   r.sendCommand("GETSET", key, value)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc incr*(r: TRedis, key: string): TRedisInteger =
   ## Increment the integer value of a key by one.
@@ -282,7 +358,7 @@ proc incrBy*(r: TRedis, key: string, increment: int): TRedisInteger =
   r.sendCommand("INCRBY", key, $increment)
   return r.parseInteger()
 
-proc setk*(r: TRedis, key, value: string) = 
+proc setk*(r: TRedis, key, value: string) =
   ## Set the string value of a key.
   ##
   ## NOTE: This function had to be renamed due to a clash with the `set` type.
@@ -295,18 +371,18 @@ proc setNX*(r: TRedis, key, value: string): bool =
   r.sendCommand("SETNX", key, value)
   return r.parseInteger() == 1
 
-proc setBit*(r: TRedis, key: string, offset: int, 
+proc setBit*(r: TRedis, key: string, offset: int,
              value: string): TRedisInteger =
   ## Sets or clears the bit at offset in the string value stored at key
   r.sendCommand("SETBIT", key, $offset, value)
   return r.parseInteger()
-  
+
 proc setEx*(r: TRedis, key: string, seconds: int, value: string): TRedisStatus =
   ## Set the value and expiration of a key
   r.sendCommand("SETEX", key, $seconds, value)
   raiseNoOK(r.parseStatus())
 
-proc setRange*(r: TRedis, key: string, offset: int, 
+proc setRange*(r: TRedis, key: string, offset: int,
                value: string): TRedisInteger =
   ## Overwrite part of a string at key starting at the specified offset
   r.sendCommand("SETRANGE", key, $offset, value)
@@ -332,12 +408,12 @@ proc hExists*(r: TRedis, key, field: string): bool =
 proc hGet*(r: TRedis, key, field: string): TRedisString =
   ## Get the value of a hash field
   r.sendCommand("HGET", key, field)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc hGetAll*(r: TRedis, key: string): TRedisList =
   ## Get all the fields and values in a hash
   r.sendCommand("HGETALL", key)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc hIncrBy*(r: TRedis, key, field: string, incr: int): TRedisInteger =
   ## Increment the integer value of a hash field by the given number
@@ -347,7 +423,7 @@ proc hIncrBy*(r: TRedis, key, field: string, incr: int): TRedisInteger =
 proc hKeys*(r: TRedis, key: string): TRedisList =
   ## Get all the fields in a hash
   r.sendCommand("HKEYS", key)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc hLen*(r: TRedis, key: string): TRedisInteger =
   ## Get the number of fields in a hash
@@ -357,9 +433,9 @@ proc hLen*(r: TRedis, key: string): TRedisInteger =
 proc hMGet*(r: TRedis, key: string, fields: varargs[string]): TRedisList =
   ## Get the values of all the given hash fields
   r.sendCommand("HMGET", key, fields)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
-proc hMSet*(r: TRedis, key: string, 
+proc hMSet*(r: TRedis, key: string,
             fieldValues: openarray[tuple[field, value: string]]) =
   ## Set multiple hash fields to multiple values
   var args = @[key]
@@ -373,7 +449,7 @@ proc hSet*(r: TRedis, key, field, value: string): TRedisInteger =
   ## Set the string value of a hash field
   r.sendCommand("HSET", key, field, value)
   return r.parseInteger()
-  
+
 proc hSetNX*(r: TRedis, key, field, value: string): TRedisInteger =
   ## Set the value of a hash field, only if the field does **not** exist
   r.sendCommand("HSETNX", key, field, value)
@@ -382,8 +458,8 @@ proc hSetNX*(r: TRedis, key, field, value: string): TRedisInteger =
 proc hVals*(r: TRedis, key: string): TRedisList =
   ## Get all the values in a hash
   r.sendCommand("HVALS", key)
-  return r.parseMultiBulk()
-  
+  return r.parseArray()
+
 # Lists
 
 proc bLPop*(r: TRedis, keys: varargs[string], timeout: int): TRedisList =
@@ -393,7 +469,7 @@ proc bLPop*(r: TRedis, keys: varargs[string], timeout: int): TRedisList =
   for i in items(keys): args.add(i)
   args.add($timeout)
   r.sendCommand("BLPOP", args)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc bRPop*(r: TRedis, keys: varargs[string], timeout: int): TRedisList =
   ## Remove and get the *last* element in a list, or block until one 
@@ -402,7 +478,7 @@ proc bRPop*(r: TRedis, keys: varargs[string], timeout: int): TRedisList =
   for i in items(keys): args.add(i)
   args.add($timeout)
   r.sendCommand("BRPOP", args)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc bRPopLPush*(r: TRedis, source, destination: string,
                  timeout: int): TRedisString =
@@ -411,12 +487,12 @@ proc bRPopLPush*(r: TRedis, source, destination: string,
   ##
   ## http://redis.io/commands/brpoplpush
   r.sendCommand("BRPOPLPUSH", source, destination, $timeout)
-  return r.parseBulk(true) # Multi-Bulk nil allowed.
+  return r.parseBulkString(true) # Multi-Bulk nil allowed.
 
 proc lIndex*(r: TRedis, key: string, index: int): TRedisString =
   ## Get an element from a list by its index
   r.sendCommand("LINDEX", key, $index)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc lInsert*(r: TRedis, key: string, before: bool, pivot, value: string):
               TRedisInteger =
@@ -424,7 +500,7 @@ proc lInsert*(r: TRedis, key: string, before: bool, pivot, value: string):
   var pos = if before: "BEFORE" else: "AFTER"
   r.sendCommand("LINSERT", key, pos, pivot, value)
   return r.parseInteger()
-  
+
 proc lLen*(r: TRedis, key: string): TRedisInteger =
   ## Get the length of a list
   r.sendCommand("LLEN", key)
@@ -433,7 +509,7 @@ proc lLen*(r: TRedis, key: string): TRedisInteger =
 proc lPop*(r: TRedis, key: string): TRedisString =
   ## Remove and get the first element in a list
   r.sendCommand("LPOP", key)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc lPush*(r: TRedis, key, value: string, create: bool = True): TRedisInteger =
   ## Prepend a value to a list. Returns the length of the list after the push.
@@ -450,7 +526,7 @@ proc lRange*(r: TRedis, key: string, start, stop: int): TRedisList =
   ## Get a range of elements from a list. Returns `nil` when `key` 
   ## doesn't exist.
   r.sendCommand("LRANGE", key, $start, $stop)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc lRem*(r: TRedis, key: string, value: string, count: int = 0): TRedisInteger =
   ## Remove elements from a list. Returns the number of elements that have been
@@ -471,13 +547,13 @@ proc lTrim*(r: TRedis, key: string, start, stop: int) =
 proc rPop*(r: TRedis, key: string): TRedisString =
   ## Remove and get the last element in a list
   r.sendCommand("RPOP", key)
-  return r.parseBulk()
-  
+  return r.parseBulkString()
+
 proc rPopLPush*(r: TRedis, source, destination: string): TRedisString =
   ## Remove the last element in a list, append it to another list and return it
   r.sendCommand("RPOPLPUSH", source, destination)
-  return r.parseBulk()
-  
+  return r.parseBulkString()
+
 proc rPush*(r: TRedis, key, value: string, create: bool = True): TRedisInteger =
   ## Append a value to a list. Returns the length of the list after the push.
   ## The ``create`` param specifies whether a list should be created if it
@@ -504,7 +580,7 @@ proc scard*(r: TRedis, key: string): TRedisInteger =
 proc sdiff*(r: TRedis, keys: varargs[string]): TRedisList =
   ## Subtract multiple sets
   r.sendCommand("SDIFF", keys)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc sdiffstore*(r: TRedis, destination: string,
                 keys: varargs[string]): TRedisInteger =
@@ -515,7 +591,7 @@ proc sdiffstore*(r: TRedis, destination: string,
 proc sinter*(r: TRedis, keys: varargs[string]): TRedisList =
   ## Intersect multiple sets
   r.sendCommand("SINTER", keys)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc sinterstore*(r: TRedis, destination: string,
                  keys: varargs[string]): TRedisInteger =
@@ -531,7 +607,7 @@ proc sismember*(r: TRedis, key: string, member: string): TRedisInteger =
 proc smembers*(r: TRedis, key: string): TRedisList =
   ## Get all the members in a set
   r.sendCommand("SMEMBERS", key)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc smove*(r: TRedis, source: string, destination: string,
            member: string): TRedisInteger =
@@ -542,12 +618,12 @@ proc smove*(r: TRedis, source: string, destination: string,
 proc spop*(r: TRedis, key: string): TRedisString =
   ## Remove and return a random member from a set
   r.sendCommand("SPOP", key)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc srandmember*(r: TRedis, key: string): TRedisString =
   ## Get a random member from a set
   r.sendCommand("SRANDMEMBER", key)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc srem*(r: TRedis, key: string, member: string): TRedisInteger =
   ## Remove a member from a set
@@ -557,7 +633,7 @@ proc srem*(r: TRedis, key: string, member: string): TRedisInteger =
 proc sunion*(r: TRedis, keys: varargs[string]): TRedisList =
   ## Add multiple sets
   r.sendCommand("SUNION", keys)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc sunionstore*(r: TRedis, destination: string,
                  key: varargs[string]): TRedisInteger =
@@ -586,7 +662,7 @@ proc zincrby*(r: TRedis, key: string, increment: string,
              member: string): TRedisString =
   ## Increment the score of a member in a sorted set
   r.sendCommand("ZINCRBY", key, increment, member)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc zinterstore*(r: TRedis, destination: string, numkeys: string,
                  keys: openarray[string], weights: openarray[string] = [],
@@ -595,16 +671,16 @@ proc zinterstore*(r: TRedis, destination: string, numkeys: string,
   ## a new key
   var args = @[destination, numkeys]
   for i in items(keys): args.add(i)
-  
+
   if weights.len != 0:
     args.add("WITHSCORE")
     for i in items(weights): args.add(i)
   if aggregate.len != 0:
     args.add("AGGREGATE")
     args.add(aggregate)
-    
+
   r.sendCommand("ZINTERSTORE", args)
-  
+
   return r.parseInteger()
 
 proc zrange*(r: TRedis, key: string, start: string, stop: string,
@@ -614,27 +690,27 @@ proc zrange*(r: TRedis, key: string, start: string, stop: string,
     r.sendCommand("ZRANGE", key, start, stop)
   else:
     r.sendCommand("ZRANGE", "WITHSCORES", key, start, stop)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
-proc zrangebyscore*(r: TRedis, key: string, min: string, max: string, 
+proc zrangebyscore*(r: TRedis, key: string, min: string, max: string,
                    withScore: bool = false, limit: bool = False,
                    limitOffset: int = 0, limitCount: int = 0): TRedisList =
   ## Return a range of members in a sorted set, by score
   var args = @[key, min, max]
-  
+
   if withScore: args.add("WITHSCORE")
-  if limit: 
+  if limit:
     args.add("LIMIT")
     args.add($limitOffset)
     args.add($limitCount)
-    
+
   r.sendCommand("ZRANGEBYSCORE", args)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc zrank*(r: TRedis, key: string, member: string): TRedisString =
   ## Determine the index of a member in a sorted set
   r.sendCommand("ZRANK", key, member)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc zrem*(r: TRedis, key: string, member: string): TRedisInteger =
   ## Remove a member from a sorted set
@@ -660,34 +736,34 @@ proc zrevrange*(r: TRedis, key: string, start: string, stop: string,
   if withScore:
     r.sendCommand("ZREVRANGE", "WITHSCORE", key, start, stop)
   else: r.sendCommand("ZREVRANGE", key, start, stop)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
-proc zrevrangebyscore*(r: TRedis, key: string, min: string, max: string, 
+proc zrevrangebyscore*(r: TRedis, key: string, min: string, max: string,
                    withScore: bool = false, limit: bool = False,
                    limitOffset: int = 0, limitCount: int = 0): TRedisList =
   ## Return a range of members in a sorted set, by score, with
   ## scores ordered from high to low
   var args = @[key, min, max]
-  
+
   if withScore: args.add("WITHSCORE")
-  if limit: 
+  if limit:
     args.add("LIMIT")
     args.add($limitOffset)
     args.add($limitCount)
-  
+
   r.sendCommand("ZREVRANGEBYSCORE", args)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc zrevrank*(r: TRedis, key: string, member: string): TRedisString =
   ## Determine the index of a member in a sorted set, with
   ## scores ordered from high to low
   r.sendCommand("ZREVRANK", key, member)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc zscore*(r: TRedis, key: string, member: string): TRedisString =
   ## Get the score associated with the given member in a sorted set
   r.sendCommand("ZSCORE", key, member)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc zunionstore*(r: TRedis, destination: string, numkeys: string,
                  keys: openarray[string], weights: openarray[string] = [],
@@ -695,16 +771,16 @@ proc zunionstore*(r: TRedis, destination: string, numkeys: string,
   ## Add multiple sorted sets and store the resulting sorted set in a new key 
   var args = @[destination, numkeys]
   for i in items(keys): args.add(i)
-  
+
   if weights.len != 0:
     args.add("WEIGHTS")
     for i in items(weights): args.add(i)
   if aggregate.len != 0:
     args.add("AGGREGATE")
     args.add(aggregate)
-    
+
   r.sendCommand("ZUNIONSTORE", args)
-  
+
   return r.parseInteger()
 
 
@@ -733,7 +809,7 @@ proc subscribe*(r: TRedis, channel: openarray[string]): ???? =
   return ???
 
 proc unsubscribe*(r: TRedis, [channel: openarray[string], : string): ???? =
-  ## Stop listening for messages posted to the given channels 
+  ## Stop listening for messages posted to the given channels
   r.socket.send("UNSUBSCRIBE $# $#\c\L" % [[channel.join(), ])
   return ???
 
@@ -749,11 +825,14 @@ proc discardMulti*(r: TRedis) =
 proc exec*(r: TRedis): TRedisList =
   ## Execute all commands issued after MULTI
   r.sendCommand("EXEC")
-
-  return r.parseMultiBulk()
+  r.pipeline.enabled = false
+  # Will reply with +OK for MULTI/EXEC and +QUEUED for every command
+  # between, then with the results
+  return r.flushPipeline(true)
 
 proc multi*(r: TRedis) =
   ## Mark the start of a transaction block
+  r.setPipeline(true)
   r.sendCommand("MULTI")
   raiseNoOK(r.parseStatus())
 
@@ -777,7 +856,7 @@ proc auth*(r: TRedis, password: string) =
 proc echoServ*(r: TRedis, message: string): TRedisString =
   ## Echo the given string
   r.sendCommand("ECHO", message)
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc ping*(r: TRedis): TRedisStatus =
   ## Ping the server
@@ -809,7 +888,7 @@ proc bgsave*(r: TRedis) =
 proc configGet*(r: TRedis, parameter: string): TRedisList =
   ## Get the value of a configuration parameter
   r.sendCommand("CONFIG", "GET", parameter)
-  return r.parseMultiBulk()
+  return r.parseArray()
 
 proc configSet*(r: TRedis, parameter: string, value: string) =
   ## Set a configuration parameter to the given value
@@ -848,7 +927,7 @@ proc flushdb*(r: TRedis): TRedisStatus =
 proc info*(r: TRedis): TRedisString =
   ## Get information and statistics about the server
   r.sendCommand("INFO")
-  return r.parseBulk()
+  return r.parseBulkString()
 
 proc lastsave*(r: TRedis): TRedisInteger =
   ## Get the UNIX time stamp of the last successful save to disk
@@ -881,7 +960,7 @@ proc slaveof*(r: TRedis, host: string, port: string) =
 
 iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] =
   ## Iterator for keys and values in a hash.
-  var 
+  var
     contents = r.hGetAll(key)
     k = ""
   for i in items(contents):
@@ -890,33 +969,68 @@ iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] =
     else:
       yield (k, i)
       k = ""
-      
 
-when false:
-  # sorry, deactivated for the test suite
-  var r = open()
-  r.auth("pass")
+proc someTests(r: TRedis) =
+  #r.auth("pass")
 
   r.setk("nim:test", "Testing something.")
   r.setk("nim:utf8", "こんにちは")
   r.setk("nim:esc", "\\ths ągt\\")
-  
-  echo r.get("nim:esc")
-  echo r.incr("nim:int")
-  echo r.incr("nim:int")
+  r.setk("nim:int", "1")
+  echo(r.get("nim:esc"))
+  echo(r.incr("nim:int"))
   echo r.get("nim:int")
   echo r.get("nim:utf8")
+  echo r.hSet("test1", "name", "A Test")
+  var res = r.hGetAll("test1")
   echo repr(r.get("blahasha"))
   echo r.randomKey()
-  
+  discard r.lpush("mylist","itema")
+  discard r.lpush("mylist","itemb")
+  r.ltrim("mylist",0,1)
   var p = r.lrange("mylist", 0, -1)
+
   for i in items(p):
-    echo("  ", i)
+    if not isNil(i):
+      echo("  ", i)
 
-  echo(r.debugObject("test"))
+  echo(r.debugObject("mylist"))
 
   r.configSet("timeout", "299")
   for i in items(r.configGet("timeout")): echo ">> ", i
 
   echo r.echoServ("BLAH")
 
+
+when false:
+  var r = open()
+
+  # Test with no pipelining
+  echo("----------------------------------------------")
+  echo("Testing without pipelining.")
+  r.someTests()
+
+  # Test with pipelining enabled
+  echo("//////////////////////////////////////////////")
+  echo()
+  echo("Testing with pipelining.")
+  echo()
+  r.setPipeline(true)
+  r.someTests()
+  var list = r.flushPipeline()
+  r.setPipeline(false)
+  echo("-- list length is " & $list.len & " --")
+  for item in list:
+    if not isNil(item):
+      echo item
+
+  # Test with multi/exec() (automatic pipelining)
+  echo("************************************************")
+  echo("Testing with transaction (automatic pipelining)")
+  r.multi()
+  r.someTests()
+  list = r.exec()
+  echo("-- list length is " & $list.len & " --")
+  for item in list:
+    if not isNil(item):
+      echo item