diff options
-rw-r--r-- | lib/pure/redis.nim | 154 |
1 files changed, 77 insertions, 77 deletions
diff --git a/lib/pure/redis.nim b/lib/pure/redis.nim index e4d47e43d..51e8880dc 100644 --- a/lib/pure/redis.nim +++ b/lib/pure/redis.nim @@ -19,18 +19,18 @@ import sockets, os, strutils, parseutils const redisNil* = "\0\0" -type - TPipeline = object +type + PPipeline = ref object enabled: bool - buffer: ref string + buffer: string expected: int ## number of replies expected if pipelined type TRedis* {.pure, final.} = object socket: TSocket connected: bool - pipeline: ref TPipeline - + pipeline: PPipeline + TRedisStatus* = string TRedisInteger* = biggestInt TRedisString* = string ## Bulk reply @@ -39,10 +39,9 @@ type EInvalidReply* = object of ESynch ## Invalid reply from redis ERedis* = object of ESynch ## Error in redis -proc newPipeline(): ref TPipeLine = +proc newPipeline(): PPipeline = new(result) - result.buffer = new string - result.buffer[] = "" + result.buffer = "" result.enabled = false result.expected = 0 @@ -52,10 +51,10 @@ proc open*(host = "localhost", port = 6379.TPort): TRedis = if result.socket == InvalidSocket: OSError(OSLastError()) result.socket.connect(host, port) - result.pipeline = newPipeline() + result.pipeline = newPipeline() proc raiseInvalidReply(expected, got: char) = - raise newException(EInvalidReply, + raise newException(EInvalidReply, "Expected '$1' at the beginning of a status reply got '$2'" % [$expected, $got]) @@ -77,12 +76,12 @@ proc parseStatus(r: TRedis, lineIn: string = ""): TRedisStatus = raise newException(ERedis, strip(line)) if line[0] != '+': raiseInvalidReply('+', line[0]) - + return line.substr(1) # Strip '+' - + proc parseInteger(r: TRedis, lineIn: string = ""): TRedisInteger = if r.pipeline.enabled: return -1 - + var line = lineIn if line == "": r.socket.readLine(line) @@ -97,10 +96,10 @@ proc parseInteger(r: TRedis, lineIn: string = ""): TRedisInteger = raise newException(ERedis, strip(line)) if line[0] != ':': raiseInvalidReply(':', line[0]) - + # Strip ':' if parseBiggestInt(line, result, 1) == 0: - raise newException(EInvalidReply, "Unable to parse integer.") + raise newException(EInvalidReply, "Unable to parse integer.") proc recv(sock: TSocket, size: int): TaintedString = result = newString(size).TaintedString @@ -109,19 +108,19 @@ proc recv(sock: TSocket, size: int): TaintedString = proc parseSingleString(r: TRedis, line:string, allowMBNil = False): TRedisString = if r.pipeline.enabled: return "" - + # Error. if line[0] == '-': raise newException(ERedis, strip(line)) - + # Some commands return a /bulk/ value or a /multi-bulk/ nil. Odd. if allowMBNil: if line == "*-1": return RedisNil - + if line[0] != '$': raiseInvalidReply('$', line[0]) - + var numBytes = parseInt(line.substr(1)) if numBytes == -1: return RedisNil @@ -144,7 +143,7 @@ proc parseArrayLines(r: TRedis, countLine:string): TRedisList = if not isNil(parsed): for item in parsed: result.add(item) - + proc parseBulkString(r: TRedis, allowMBNil = False, lineIn:string = ""): TRedisString = if r.pipeline.enabled: return "" @@ -158,7 +157,7 @@ proc parseArray(r: TRedis): TRedisList = if r.pipeline.enabled: return @[] var line = TaintedString"" r.socket.readLine(line) - + return r.parseArrayLines(line) proc parseNext(r: TRedis): TRedisList = @@ -172,7 +171,7 @@ proc parseNext(r: TRedis): TRedisList = of ':': @[$(r.parseInteger(line))] of '$': @[r.parseBulkString(true,line)] of '*': r.parseArrayLines(line) - else: + else: raise newException(EInvalidReply, "parseNext failed on line: " & line) nil r.pipeline.expected -= 1 @@ -180,14 +179,14 @@ proc parseNext(r: TRedis): TRedisList = proc flushPipeline*(r: TRedis, wasMulti = false): TRedisList = ## Send buffered commands, clear buffer, return results - if r.pipeline.buffer[].len > 0: - r.socket.send(r.pipeline.buffer[]) - r.pipeline.buffer[] = "" - + if r.pipeline.buffer.len > 0: + r.socket.send(r.pipeline.buffer) + r.pipeline.buffer = "" + var prevState = r.pipeline.enabled r.pipeline.enabled = false result = @[] - + var tot = r.pipeline.expected for i in 0..tot-1: @@ -195,13 +194,13 @@ proc flushPipeline*(r: TRedis, wasMulti = false): TRedisList = if ret.len == 1 and (ret[0] == "OK" or ret[0] == "QUEUED"): # Skip acknowledgement replies in multi if not wasMulti: result.add(ret) - else: + else: result.add(ret) r.pipeline.expected = 0 r.pipeline.enabled = prevState -proc setPipeline*(r: TRedis, state: bool) = +proc sePPipeline*(r: TRedis, state: bool) = ## Enable or disable command pipelining (reduces network roundtrips). ## Note that when enabled, you must call flushPipeline to actually send commands, except ## for multi/exec() which enable and flush the pipeline automatically. @@ -217,9 +216,9 @@ proc sendCommand(r: TRedis, cmd: string, args: varargs[string]) = for i in items(args): request.add("$" & $i.len() & "\c\L") request.add(i & "\c\L") - + if r.pipeline.enabled: - r.pipeline.buffer[].add(request) + r.pipeline.buffer.add(request) r.pipeline.expected += 1 else: r.socket.send(request) @@ -234,10 +233,10 @@ proc sendCommand(r: TRedis, cmd: string, arg1: string, for i in items(args): request.add("$" & $i.len() & "\c\L") request.add(i & "\c\L") - + if r.pipeline.enabled: r.pipeline.expected += 1 - r.pipeline.buffer[].add(request) + r.pipeline.buffer.add(request) else: r.socket.send(request) @@ -280,7 +279,7 @@ proc persist*(r: TRedis, key: string): bool = ## Returns `true` when the timeout was removed. r.sendCommand("PERSIST", key) return r.parseInteger() == 1 - + proc randomKey*(r: TRedis): TRedisString = ## Return a random key from the keyspace r.sendCommand("RANDOMKEY") @@ -292,7 +291,7 @@ proc rename*(r: TRedis, key, newkey: string): TRedisStatus = ## **WARNING:** Overwrites `newkey` if it exists! r.sendCommand("RENAME", key, newkey) raiseNoOK(r.parseStatus()) - + proc renameNX*(r: TRedis, key, newkey: string): bool = ## Same as ``rename`` but doesn't continue if `newkey` exists. ## Returns `true` if key was renamed. @@ -303,12 +302,12 @@ proc ttl*(r: TRedis, key: string): TRedisInteger = ## Get the time to live for a key r.sendCommand("TTL", key) return r.parseInteger() - + proc keyType*(r: TRedis, key: string): TRedisStatus = ## Determine the type stored at key r.sendCommand("TYPE", key) return r.parseStatus() - + # Strings @@ -321,12 +320,12 @@ proc decr*(r: TRedis, key: string): TRedisInteger = ## Decrement the integer value of a key by one r.sendCommand("DECR", key) return r.parseInteger() - + proc decrBy*(r: TRedis, key: string, decrement: int): TRedisInteger = ## Decrement the integer value of a key by the given number r.sendCommand("DECRBY", key, $decrement) return r.parseInteger() - + proc get*(r: TRedis, key: string): TRedisString = ## Get the value of a key. Returns `redisNil` when `key` doesn't exist. r.sendCommand("GET", key) @@ -358,7 +357,7 @@ proc incrBy*(r: TRedis, key: string, increment: int): TRedisInteger = r.sendCommand("INCRBY", key, $increment) return r.parseInteger() -proc setk*(r: TRedis, key, value: string) = +proc setk*(r: TRedis, key, value: string) = ## Set the string value of a key. ## ## NOTE: This function had to be renamed due to a clash with the `set` type. @@ -371,18 +370,18 @@ proc setNX*(r: TRedis, key, value: string): bool = r.sendCommand("SETNX", key, value) return r.parseInteger() == 1 -proc setBit*(r: TRedis, key: string, offset: int, +proc setBit*(r: TRedis, key: string, offset: int, value: string): TRedisInteger = ## Sets or clears the bit at offset in the string value stored at key r.sendCommand("SETBIT", key, $offset, value) return r.parseInteger() - + proc setEx*(r: TRedis, key: string, seconds: int, value: string): TRedisStatus = ## Set the value and expiration of a key r.sendCommand("SETEX", key, $seconds, value) raiseNoOK(r.parseStatus()) -proc setRange*(r: TRedis, key: string, offset: int, +proc setRange*(r: TRedis, key: string, offset: int, value: string): TRedisInteger = ## Overwrite part of a string at key starting at the specified offset r.sendCommand("SETRANGE", key, $offset, value) @@ -435,7 +434,7 @@ proc hMGet*(r: TRedis, key: string, fields: varargs[string]): TRedisList = r.sendCommand("HMGET", key, fields) return r.parseArray() -proc hMSet*(r: TRedis, key: string, +proc hMSet*(r: TRedis, key: string, fieldValues: openarray[tuple[field, value: string]]) = ## Set multiple hash fields to multiple values var args = @[key] @@ -449,7 +448,7 @@ proc hSet*(r: TRedis, key, field, value: string): TRedisInteger = ## Set the string value of a hash field r.sendCommand("HSET", key, field, value) return r.parseInteger() - + proc hSetNX*(r: TRedis, key, field, value: string): TRedisInteger = ## Set the value of a hash field, only if the field does **not** exist r.sendCommand("HSETNX", key, field, value) @@ -459,7 +458,7 @@ proc hVals*(r: TRedis, key: string): TRedisList = ## Get all the values in a hash r.sendCommand("HVALS", key) return r.parseArray() - + # Lists proc bLPop*(r: TRedis, keys: varargs[string], timeout: int): TRedisList = @@ -500,7 +499,7 @@ proc lInsert*(r: TRedis, key: string, before: bool, pivot, value: string): var pos = if before: "BEFORE" else: "AFTER" r.sendCommand("LINSERT", key, pos, pivot, value) return r.parseInteger() - + proc lLen*(r: TRedis, key: string): TRedisInteger = ## Get the length of a list r.sendCommand("LLEN", key) @@ -548,12 +547,12 @@ proc rPop*(r: TRedis, key: string): TRedisString = ## Remove and get the last element in a list r.sendCommand("RPOP", key) return r.parseBulkString() - + proc rPopLPush*(r: TRedis, source, destination: string): TRedisString = ## Remove the last element in a list, append it to another list and return it r.sendCommand("RPOPLPUSH", source, destination) return r.parseBulkString() - + proc rPush*(r: TRedis, key, value: string, create: bool = True): TRedisInteger = ## Append a value to a list. Returns the length of the list after the push. ## The ``create`` param specifies whether a list should be created if it @@ -671,16 +670,16 @@ proc zinterstore*(r: TRedis, destination: string, numkeys: string, ## a new key var args = @[destination, numkeys] for i in items(keys): args.add(i) - + if weights.len != 0: args.add("WITHSCORE") for i in items(weights): args.add(i) if aggregate.len != 0: args.add("AGGREGATE") args.add(aggregate) - + r.sendCommand("ZINTERSTORE", args) - + return r.parseInteger() proc zrange*(r: TRedis, key: string, start: string, stop: string, @@ -692,18 +691,18 @@ proc zrange*(r: TRedis, key: string, start: string, stop: string, r.sendCommand("ZRANGE", "WITHSCORES", key, start, stop) return r.parseArray() -proc zrangebyscore*(r: TRedis, key: string, min: string, max: string, +proc zrangebyscore*(r: TRedis, key: string, min: string, max: string, withScore: bool = false, limit: bool = False, limitOffset: int = 0, limitCount: int = 0): TRedisList = ## Return a range of members in a sorted set, by score var args = @[key, min, max] - + if withScore: args.add("WITHSCORE") - if limit: + if limit: args.add("LIMIT") args.add($limitOffset) args.add($limitCount) - + r.sendCommand("ZRANGEBYSCORE", args) return r.parseArray() @@ -738,19 +737,19 @@ proc zrevrange*(r: TRedis, key: string, start: string, stop: string, else: r.sendCommand("ZREVRANGE", key, start, stop) return r.parseArray() -proc zrevrangebyscore*(r: TRedis, key: string, min: string, max: string, +proc zrevrangebyscore*(r: TRedis, key: string, min: string, max: string, withScore: bool = false, limit: bool = False, limitOffset: int = 0, limitCount: int = 0): TRedisList = ## Return a range of members in a sorted set, by score, with ## scores ordered from high to low var args = @[key, min, max] - + if withScore: args.add("WITHSCORE") - if limit: + if limit: args.add("LIMIT") args.add($limitOffset) args.add($limitCount) - + r.sendCommand("ZREVRANGEBYSCORE", args) return r.parseArray() @@ -771,16 +770,16 @@ proc zunionstore*(r: TRedis, destination: string, numkeys: string, ## Add multiple sorted sets and store the resulting sorted set in a new key var args = @[destination, numkeys] for i in items(keys): args.add(i) - + if weights.len != 0: args.add("WEIGHTS") for i in items(weights): args.add(i) if aggregate.len != 0: args.add("AGGREGATE") args.add(aggregate) - + r.sendCommand("ZUNIONSTORE", args) - + return r.parseInteger() @@ -809,7 +808,7 @@ proc subscribe*(r: TRedis, channel: openarray[string]): ???? = return ??? proc unsubscribe*(r: TRedis, [channel: openarray[string], : string): ???? = - ## Stop listening for messages posted to the given channels + ## Stop listening for messages posted to the given channels r.socket.send("UNSUBSCRIBE $# $#\c\L" % [[channel.join(), ]) return ??? @@ -824,15 +823,16 @@ proc discardMulti*(r: TRedis) = proc exec*(r: TRedis): TRedisList = ## Execute all commands issued after MULTI - r.sendCommand("EXEC") + r.sendCommand("EXEC") r.pipeline.enabled = false # Will reply with +OK for MULTI/EXEC and +QUEUED for every command # between, then with the results return r.flushPipeline(true) + proc multi*(r: TRedis) = ## Mark the start of a transaction block - r.setPipeline(true) + r.sePPipeline(true) r.sendCommand("MULTI") raiseNoOK(r.parseStatus()) @@ -960,7 +960,7 @@ proc slaveof*(r: TRedis, host: string, port: string) = iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] = ## Iterator for keys and values in a hash. - var + var contents = r.hGetAll(key) k = "" for i in items(contents): @@ -969,7 +969,7 @@ iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] = else: yield (k, i) k = "" - + proc someTests(r: TRedis) = #r.auth("pass") @@ -989,22 +989,22 @@ proc someTests(r: TRedis) = discard r.lpush("mylist","itemb") r.ltrim("mylist",0,1) var p = r.lrange("mylist", 0, -1) - + for i in items(p): if not isNil(i): - echo(" ", i) - + echo(" ", i) + echo(r.debugObject("mylist")) r.configSet("timeout", "299") for i in items(r.configGet("timeout")): echo ">> ", i echo r.echoServ("BLAH") - - + + when false: var r = open() - + # Test with no pipelining echo("----------------------------------------------") echo("Testing without pipelining.") @@ -1015,15 +1015,15 @@ when false: echo() echo("Testing with pipelining.") echo() - r.setPipeline(true) + r.sePPipeline(true) r.someTests() var list = r.flushPipeline() - r.setPipeline(false) + r.sePPipeline(false) echo("-- list length is " & $list.len & " --") for item in list: if not isNil(item): echo item - + # Test with multi/exec() (automatic pipelining) echo("************************************************") echo("Testing with transaction (automatic pipelining)") |