diff options
author | Jason Livesay <ithkuil@gmail.com> | 2014-04-17 03:47:44 -0700 |
---|---|---|
committer | Jason Livesay <ithkuil@gmail.com> | 2014-04-17 03:47:44 -0700 |
commit | 1068022dfbf170dc34bf39e45a44451851fcb06f (patch) | |
tree | 6ca193296be5078626e5b97bfb161d225491a675 /lib | |
parent | ebe174c8687e02404a986e17dffc773378dc98e7 (diff) | |
download | Nim-1068022dfbf170dc34bf39e45a44451851fcb06f.tar.gz |
Allow QUEUED reply only if pipelined; don't return status replies from flushPipeline; Rewrite someTests
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pure/redis.nim | 186 |
1 files changed, 95 insertions, 91 deletions
diff --git a/lib/pure/redis.nim b/lib/pure/redis.nim index 147a7a82a..c7fcc033f 100644 --- a/lib/pure/redis.nim +++ b/lib/pure/redis.nim @@ -58,8 +58,10 @@ proc raiseInvalidReply(expected, got: char) = "Expected '$1' at the beginning of a status reply got '$2'" % [$expected, $got]) -proc raiseNoOK(status: string) = - if status != "QUEUED" and status != "OK": +proc raiseNoOK(status: string, pipelineEnabled:bool) = + if pipelineEnabled and not (status == "QUEUED" or status == "PIPELINED"): + raise newException(EInvalidReply, "Expected \"QUEUED\" or \"PIPELINED\" got \"$1\"" % status) + elif not pipelineEnabled and status != "OK": raise newException(EInvalidReply, "Expected \"OK\" got \"$1\"" % status) template readSocket(r: TRedis, dummyVal:expr): stmt = @@ -71,7 +73,7 @@ template readSocket(r: TRedis, dummyVal:expr): stmt = proc parseStatus(r: TRedis, line: string = ""): TRedisStatus = if r.pipeline.enabled: - return "OK" + return "PIPELINED" if line == "": raise newException(ERedis, "Server closed connection prematurely") @@ -84,14 +86,14 @@ proc parseStatus(r: TRedis, line: string = ""): TRedisStatus = return line.substr(1) # Strip '+' proc readStatus(r:TRedis): TRedisStatus = - r.readSocket("OK") + r.readSocket("PIPELINED") return r.parseStatus(line) proc parseInteger(r: TRedis, line: string = ""): TRedisInteger = if r.pipeline.enabled: return -1 - if line == "+QUEUED": # inside of multi - return -1 + #if line == "+QUEUED": # inside of multi + # return -1 if line == "": raise newException(ERedis, "Server closed connection prematurely") @@ -193,7 +195,6 @@ proc flushPipeline*(r: TRedis, wasMulti = false): TRedisList = r.socket.send(r.pipeline.buffer) r.pipeline.buffer = "" - var prevState = r.pipeline.enabled r.pipeline.enabled = false result = @[] @@ -201,23 +202,21 @@ proc flushPipeline*(r: TRedis, wasMulti = false): TRedisList = for i in 0..tot-1: var ret = r.readNext() - if ret.len == 1 and (ret[0] == "OK" or ret[0] == "QUEUED"): - # Skip acknowledgement replies in multi - if not wasMulti: result.add(ret) - else: - result.add(ret) + for item in ret: + var isOK = item.contains("OK") + if not (item.contains("OK") or item.contains("QUEUED")): + result.add(item) r.pipeline.expected = 0 - r.pipeline.enabled = prevState -proc setPipeline*(r: TRedis, state: bool) = - ## Enable or disable command pipelining (reduces network roundtrips). +proc startPipelining*(r: TRedis) = + ## Enable command pipelining (reduces network roundtrips). ## Note that when enabled, you must call flushPipeline to actually send commands, except ## for multi/exec() which enable and flush the pipeline automatically. ## Commands return immediately with dummy values; actual results returned from ## flushPipeline() or exec() r.pipeline.expected = 0 - r.pipeline.enabled = state + r.pipeline.enabled = true proc sendCommand(r: TRedis, cmd: string, args: varargs[string]) = var request = "*" & $(1 + args.len()) & "\c\L" @@ -300,7 +299,7 @@ proc rename*(r: TRedis, key, newkey: string): TRedisStatus = ## ## **WARNING:** Overwrites `newkey` if it exists! r.sendCommand("RENAME", key, newkey) - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc renameNX*(r: TRedis, key, newkey: string): bool = ## Same as ``rename`` but doesn't continue if `newkey` exists. @@ -372,7 +371,7 @@ proc setk*(r: TRedis, key, value: string) = ## ## NOTE: This function had to be renamed due to a clash with the `set` type. r.sendCommand("SET", key, value) - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc setNX*(r: TRedis, key, value: string): bool = ## Set the value of a key, only if the key does not exist. Returns `true` @@ -389,7 +388,7 @@ proc setBit*(r: TRedis, key: string, offset: int, proc setEx*(r: TRedis, key: string, seconds: int, value: string): TRedisStatus = ## Set the value and expiration of a key r.sendCommand("SETEX", key, $seconds, value) - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc setRange*(r: TRedis, key: string, offset: int, value: string): TRedisInteger = @@ -452,7 +451,7 @@ proc hMSet*(r: TRedis, key: string, args.add(field) args.add(value) r.sendCommand("HMSET", args) - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc hSet*(r: TRedis, key, field, value: string): TRedisInteger = ## Set the string value of a hash field @@ -546,12 +545,12 @@ proc lRem*(r: TRedis, key: string, value: string, count: int = 0): TRedisInteger proc lSet*(r: TRedis, key: string, index: int, value: string) = ## Set the value of an element in a list by its index r.sendCommand("LSET", key, $index, value) - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) -proc lTrim*(r: TRedis, key: string, start, stop: int) = +proc lTrim*(r: TRedis, key: string, start, stop: int) = ## Trim a list to the specified range r.sendCommand("LTRIM", key, $start, $stop) - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc rPop*(r: TRedis, key: string): TRedisString = ## Remove and get the last element in a list @@ -829,7 +828,7 @@ proc unsubscribe*(r: TRedis, [channel: openarray[string], : string): ???? = proc discardMulti*(r: TRedis) = ## Discard all commands issued after MULTI r.sendCommand("DISCARD") - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc exec*(r: TRedis): TRedisList = ## Execute all commands issued after MULTI @@ -842,26 +841,26 @@ proc exec*(r: TRedis): TRedisList = proc multi*(r: TRedis) = ## Mark the start of a transaction block - r.setPipeline(true) + r.startPipelining() r.sendCommand("MULTI") - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc unwatch*(r: TRedis) = ## Forget about all watched keys r.sendCommand("UNWATCH") - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc watch*(r: TRedis, key: varargs[string]) = ## Watch the given keys to determine execution of the MULTI/EXEC block r.sendCommand("WATCH", key) - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) # Connection proc auth*(r: TRedis, password: string) = ## Authenticate to the server r.sendCommand("AUTH", password) - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc echoServ*(r: TRedis, message: string): TRedisString = ## Echo the given string @@ -876,7 +875,7 @@ proc ping*(r: TRedis): TRedisStatus = proc quit*(r: TRedis) = ## Close the connection r.sendCommand("QUIT") - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc select*(r: TRedis, index: int): TRedisStatus = ## Change the selected database for the current connection @@ -888,12 +887,12 @@ proc select*(r: TRedis, index: int): TRedisStatus = proc bgrewriteaof*(r: TRedis) = ## Asynchronously rewrite the append-only file r.sendCommand("BGREWRITEAOF") - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc bgsave*(r: TRedis) = ## Asynchronously save the dataset to disk r.sendCommand("BGSAVE") - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc configGet*(r: TRedis, parameter: string): TRedisList = ## Get the value of a configuration parameter @@ -903,12 +902,12 @@ proc configGet*(r: TRedis, parameter: string): TRedisList = proc configSet*(r: TRedis, parameter: string, value: string) = ## Set a configuration parameter to the given value r.sendCommand("CONFIG", "SET", parameter, value) - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc configResetStat*(r: TRedis) = ## Reset the stats returned by INFO r.sendCommand("CONFIG", "RESETSTAT") - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc dbsize*(r: TRedis): TRedisInteger = ## Return the number of keys in the selected database @@ -927,12 +926,12 @@ proc debugSegfault*(r: TRedis) = proc flushall*(r: TRedis): TRedisStatus = ## Remove all keys from all databases r.sendCommand("FLUSHALL") - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc flushdb*(r: TRedis): TRedisStatus = ## Remove all keys from the current database r.sendCommand("FLUSHDB") - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc info*(r: TRedis): TRedisString = ## Get information and statistics about the server @@ -948,13 +947,13 @@ discard """ proc monitor*(r: TRedis) = ## Listen for all requests received by the server in real time r.socket.send("MONITOR\c\L") - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) """ proc save*(r: TRedis) = ## Synchronously save the dataset to disk r.sendCommand("SAVE") - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) proc shutdown*(r: TRedis) = ## Synchronously save the dataset to disk and then shut down the server @@ -966,7 +965,7 @@ proc shutdown*(r: TRedis) = proc slaveof*(r: TRedis, host: string, port: string) = ## Make the server a slave of another instance, or promote it as master r.sendCommand("SLAVEOF", host, port) - raiseNoOK(r.readStatus()) + raiseNoOK(r.readStatus(), r.pipeline.enabled) iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] = ## Iterator for keys and values in a hash. @@ -979,68 +978,73 @@ iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] = else: yield (k, i) k = "" - -proc someTests(r: TRedis) = - #r.auth("pass") + +proc someTests(r: TRedis, how: string):seq[string] = + var list:seq[string] = @[] + + case how + of "pipelined": + r.startPipelining() + of "multi": + r.multi() r.setk("nim:test", "Testing something.") r.setk("nim:utf8", "こんにちは") r.setk("nim:esc", "\\ths ągt\\") r.setk("nim:int", "1") - echo(r.get("nim:esc")) - echo(r.incr("nim:int")) - echo r.get("nim:int") - echo r.get("nim:utf8") - echo r.hSet("test1", "name", "A Test") + list.add(r.get("nim:esc")) + list.add($(r.incr("nim:int"))) + list.add(r.get("nim:int")) + list.add(r.get("nim:utf8")) + list.add($(r.hSet("test1", "name", "A Test"))) var res = r.hGetAll("test1") - echo repr(r.get("blahasha")) - echo r.randomKey() - discard r.lpush("mylist","itema") - discard r.lpush("mylist","itemb") + for r in res: + list.add(r) + list.add(r.get("invalid_key")) + list.add($(r.lpush("mylist","itema"))) + list.add($(r.lpush("mylist","itemb"))) r.ltrim("mylist",0,1) var p = r.lrange("mylist", 0, -1) - + for i in items(p): if not isNil(i): - echo(" ", i) - - echo(r.debugObject("mylist")) + list.add(i) - r.configSet("timeout", "299") - for i in items(r.configGet("timeout")): echo ">> ", i + list.add(r.debugObject("mylist")) - echo r.echoServ("BLAH") - - -when false: - var r = open() + r.configSet("timeout", "299") + var g = r.configGet("timeout") + for i in items(g): + list.add(i) + + list.add(r.echoServ("BLAH")) + + case how + of "normal": + return list + of "pipelined": + return r.flushPipeline() + of "multi": + return r.exec() + +proc assertListsIdentical(listA, listB: seq[string]) = + assert(listA.len == listB.len) + var i = 0 + for item in listA: + assert(item == listB[i]) + i = i + 1 - # Test with no pipelining - echo("----------------------------------------------") - echo("Testing without pipelining.") - r.someTests() - - # Test with pipelining enabled - echo("//////////////////////////////////////////////") - echo() - echo("Testing with pipelining.") - echo() - r.setPipeline(true) - r.someTests() - var list = r.flushPipeline() - r.setPipeline(false) - echo("-- list length is " & $list.len & " --") - for item in list: - if not isNil(item): - echo item - - # Test with multi/exec() (automatic pipelining) - echo("************************************************") - echo("Testing with transaction (automatic pipelining)") - r.multi() - r.someTests() - list = r.exec() - echo("-- list length is " & $list.len & " --") - for item in list: - if not isNil(item): - echo item +when isMainModule: + when false: + var r = open() + + # Test with no pipelining + var listNormal = r.someTests("normal") + + # Test with pipelining enabled + var listPipelined = r.someTests("pipelined") + assertListsIdentical(listNormal, listPipelined) + + # Test with multi/exec() (automatic pipelining) + var listMulti = r.someTests("multi") + assertListsIdentical(listNormal, listMulti) |