Skip to content

Distributed synchronization primitives buit on top of Redis

License

Notifications You must be signed in to change notification settings

asynq-io/redguard

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Tests Build License Python Format PyPi Mypy Ruff security: bandit

redguard

Distributed synchronization primitives buit on top of Redis

Installation

pip install redguard

Available primitives

  • Lock - distributed mutex
  • Semaphore - distributed semaphore
  • RateLimiter - distributed rate limiter

Helpers

  • RedGuard - factory for creating primitives
  • SharedResourcePool - factory for creating shared resources

Usage

The api is similar to built-in asyncio module primitives. Each primitive (except for RateLimiter) has ttl parameter which defines how long the lock is held in case of unexpected failure.

from redguard import RedGuard
from redguard.lock import Lock

guard = RedGuard.from_url("redis://localhost:6379", namespace="examples")

async def lock_example():
    lock = guard.new(Lock, "my-lock", ttl=10)

    async with lock:
        print("Locked")

async def semaphore_example():
    semaphore = guard.new(Semaphore, "my-semaphore", capacity=2, ttl=10)

    async with semaphore:
        print("Acquired")

async def rate_limiter_example():
    rate_limiter = guard.new(RateLimiter, "my-rate-limiter", limit=2, window=1)

    async with rate_limiter:
        print("Rate limited")

async def object_pool_example():
    pool = guard.pool(Semaphore, "my-pool", factory=dict, capacity=3, ttl=10)
    # this will create new dictionary limited to 3 instances globally
    async with pool as resource:
        resource["key"] = "value"
        print(resource)

Lower level api

Each primitve can be used as async context manager, but also provides acquire and release methods.

semaphore = guard.new(Semaphore, "my-semaphore", capacity=2, ttl=10)

acquired = await semaphore.acquire(blocking=True, timeout=None) # returns True if acquired (useful for blocking=False)

if acquired:
    await semaphore.release()