Topics


Pages


Content

This cheatsheet provides a quick reference for beginners on how to use Redis with Python. It covers installation, basic operations, and common scenarios you might encounter when working with Redis in your Python projects.

Using Redis through a Python application means seamless access to rich data structures for asynchronous programming while managing large datasets efficiently.

  • Redis cheatsheet in the NoSQL section, including installing and running Redis.
  • Python Redis guide on the RealPython website.

Resources

Use cases

  • Integrate Redis with Flask for enhanced sesion management and caching.
  • Use Redis as a cache backend or session store in Django to streamline data access and improve app load times.
  • For async web frameworks like FastAPI or Tornado, use Redis for tasks like backgorund job processing and message queing for responsive apps that can handle concurent requests gracefully.

Install the Python Redis client

$ pip install redis

Basic Redis operations in Python

Connect to Redis

import redis


# Connect to default localhost:6379
r = redis.Redis()

# Or specify host and port
r = redis.Redis(host='localhost', port=6379, db=0)

Basic operations

# Set a key.
r.set('mykey', 'Hello, Redis!')
# Get a key.
value = r.get('mykey')
# b'Hello, Redis!'
# Delete a key.
r.delete('mykey')
# Check if a key exists.
exists = r.exists('mykey')
# Set expiration in seconds.
r.setex('mykey', 10, 'This will expire in 10 seconds')

Batching with the pipeline method

How you can insert a list of 3 items (dictionaries) with random IDs into a Redis database using a batch request in Python. Using r.pipeline(), we can queue multiple commands and execute them as a batch, which is more efficient than executing them individually.

with r.pipeline() as pipe:
    pipe.set('key1', 'value1')
    pipe.rpush('list1', 'item1', 'item2')
    pipe.sadd('set1', 'member1', 'member2')

    pipe.execute()

A more realistic example. When you run this code, it will insert the 3 items into Redis with random IDs and print the inserted items with their respective IDs.

import redis
import uuid

r = redis.Redis(host='localhost', port=6379, db=0)

items = [
    {'name': 'Item 1', 'price': 10.99, 'category': 'Electronics'},
    {'name': 'Item 2', 'price': 5.75, 'category': 'Books'},
    {'name': 'Item 3', 'price': 22.50, 'category': 'Clothing'}
]

# Use a pipeline to execute a batch of commands
pipe = r.pipeline()

# Insert each item with a random ID
for item in items:
    item_id = str(uuid.uuid4())
    pipe.hmset(f'item:{item_id}', item)

# Execute the batch of commands
pipe.execute()

# Print the inserted items
for item in items:
    item_id = str(uuid.uuid4())
    inserted_item = r.hgetall(f'item:{item_id}')
    print(f'Item ID: {item_id}, Item: {inserted_item}')

Code for common use cases

Queue

import time


def enqueue_task(task_id: str, task_data: dict) -> None:
    r.lpush('task_queue', task_id)
    r.hmset(f'task:{task_id}', task_data)


def process_queue():
    while True:
        # Wait for available task
        task = r.brpop('task_queue', timeout=0)

        if task:
            task_id = task[1].decode('utf-8')
            task_data = r.hgetall(f'task:{task_id}')
            
            # Process the task (simulated)
            print(f"Processing task {task_id}: {task_data}")
            time.sleep(1)  # Simulate processing time
            
            # Clean up
            r.delete(f'task:{task_id}')
        else:
            print("No tasks available. Waiting...")


# Usage
enqueue_task('task1', {'type': 'email', 'recipient': 'user@example.com'})
enqueue_task('task2', {'type': 'report', 'format': 'pdf'})

# In a separate process or thread
process_queue()

Logged-in users

import time

def log_user_in(user_id: str, session_id: str):
    # Store session info
    r.hset(f'session:{session_id}', 'user_id', user_id)
    r.hset(f'session:{session_id}', 'login_time', int(time.time()))
    
    # Add to set of logged-in users
    r.sadd('logged_in_users', user_id)

def log_user_out(session_id: str):
    user_id = r.hget(f'session:{session_id}', 'user_id')
    if user_id:
        r.srem('logged_in_users', user_id)
        r.delete(f'session:{session_id}')

def get_logged_in_users():
    return r.smembers('logged_in_users')

def is_user_logged_in(user_id: str):
    return r.sismember('logged_in_users', user_id)

# Usage
log_user_in('user123', 'session_abc123')
log_user_in('user456', 'session_def456')

print(get_logged_in_users())  # Set of logged-in user IDs
print(is_user_logged_in('user123'))  # True

log_user_out('session_abc123')
print(is_user_logged_in('user123'))  # False

Counter

def increment_page_views(page_url: str):
    return r.incr(f'pageviews:{page_url}')


def get_page_views(page_url: str):
    views = r.get(f'pageviews:{page_url}')
    return int(views) if views else 0


def increment_daily_active_users(date: str):
    return r.pfadd(f'daily_active_users:{date}', *get_logged_in_users())


def get_daily_active_users(date: str):
    return r.pfcount(f'daily_active_users:{date}')


# Usage
print(increment_page_views('/home'))  # 1
print(increment_page_views('/home'))  # 2
print(get_page_views('/home'))  # 2

# Assuming some users are logged in
increment_daily_active_users('2024-08-07')
print(get_daily_active_users('2024-08-07'))  # Number of unique active users

Caching

def get_user(user_id: str) -> dict:
    """
    Check cache first, otherwise get user from the cache and cache the data for an hour.
    """
    cached_user = r.get(f'user:{user_id}')
    if cached_user:
        return json.loads(cached_user)
    
    user = get_user_from_db(user_id)
    # {'id': user_id, 'name': 'John Doe', 'email': 'john@example.com'}
    
    r.setex(f'user:{user_id}', 3600, json.dumps(user))
    
    return user

Rate limiting

def rate_limit(user_id: str, limit: int = 5, period: int = 60) -> bool:
    key = f'rate:{user_id}'
    current = r.get(key)
    
    if current and int(current) >= limit:
        return False
    
    pipe = r.pipeline()
    pipe.incr(key)
    pipe.expire(key, period)
    pipe.execute()
    
    return True

Pub/Sub messaging

# Publisher
def publish_message(channel: str, message: str) -> None:
    r.publish(channel, message)


# Subscriber
pubsub = r.pubsub()
pubsub.subscribe('mychannel')

for message in pubsub.listen():
    if message['type'] == 'message':
        print(f"Received: {message['data']}")