From 5c14b2cfba746c781ba19649efdbbe3d8f7309a6 Mon Sep 17 00:00:00 2001 From: Ben Burnett Date: Sun, 22 May 2016 20:07:10 -0600 Subject: [PATCH] Add expiring queue --- qr.py | 93 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/qr.py b/qr.py index 486c043..9a1662a 100644 --- a/qr.py +++ b/qr.py @@ -6,8 +6,10 @@ __version__ = '0.6.0' __license__ = 'MIT' +import time import redis import logging +import datetime try: import json @@ -369,3 +371,94 @@ def pop(self, block=False): queue, popped = self.redis.blpop(self.key) log.debug('Popped ** %s ** from key ** %s **' % (popped, self.key)) return self._unpack(popped) + +def _now(): + return time.mktime(datetime.datetime.now().timetuple()) + +class ExpiringPriorityQueue(PriorityQueue): + + def __init__(self, key, ttl, **kwargs): + PriorityQueue.__init__(self, key, **kwargs) + self.ttl = ttl + + def __len__(self): + """Return the length of the queue""" + with self.redis.pipeline() as pipe: + pipe.zremrangebyscore(self.key, 0, _now() - self.ttl) + pipe.zcard(self.key) + results = pipe.execute() + if results: + _, card = results + return card + return 0 + + def __getitem__(self, val): + """Get a slice or a particular index.""" + self.redis.zremrangebyscore(self.key, 0, _now() - self.ttl) + try: + return [self._unpack(i) for i in self.redis.zrange(self.key, val.start, val.stop - 1)] + except AttributeError: + value = self.redis.zrange(self.key, val, val) + if value: + return self._unpack(value[0]) + return None + except Exception as e: + log.error('Get item failed ** %s' % repr(e)) + return None + + def extend(self, vals, unique=False): + """Extends the elements in the queue.""" + with self.redis.pipeline(transaction=False) as pipe: + pipe.zremrangebyscore(self.key, 0, _now() - self.ttl) + for val, score in vals: + if unique: + pipe.zremrangebyscore(self.key, score, score) + pipe.zadd(self.key, self._pack(val), score) + return pipe.execute() + + def peek(self, withscores=False): + """Look at the next item in the queue""" + self.redis.zremrangebyscore(self.key, 0, _now() - self.ttl) + results = self.redis.zrange(self.key, 0, 0, withscores=True) + if results: + value, score = results[0] + value = self._unpack(value) + if withscores: + return (value, score) + return value + elif withscores: + return (None, 0.0) + return None + + def elements(self): + """Return all elements as a Python list""" + self.redis.zremrangebyscore(self.key, 0, _now() - self.ttl) + return [self._unpack(i) for i in self.redis.zrange(self.key, 0, -1)] + + def pop(self, withscores=False): + """Get the element with the lowest score, and pop it off""" + with self.redis.pipeline() as pipe: + pipe.zremrangebyscore(self.key, 0, _now() - self.ttl) + pipe.zrange(self.key, 0, 0, withscores=True) + pipe.zremrangebyrank(self.key, 0, 0) + _, results, _ = pipe.execute() + if results: + value, score = results[0] + value = self._unpack(value) + if withscores: + return (value, score) + return value + elif withscores: + return (None, 0.0) + return None + + def push(self, value, score=None, unique=False): + '''Add an element with a given score''' + score = _now() if not score else score + with self.redis.pipeline() as pipe: + pipe.zremrangebyscore(self.key, 0, _now() - self.ttl) + if unique: + pipe.zremrangebyscore(self.key, score, score) + pipe.zadd(self.key, self._pack(value), score) + results = pipe.execute() + return results