Common messaging patterns implemented with Redis

December 29, 2016 at 03:44 PM | Redis

Redis can make a fantastic lightweight messaging system for situations where the complexity of a larger system (RabbitMQ, for example) isn't necessary.

Pub-sub: multi-consumer broadcast messages

One-to-many broadcast messages (ex, a chat room, where every message is sent to every connected client) can be implemented with the PUBLISH and SUBSCRIBE commands, which do exactly what they say on the box: publish messages to a channel, and subscribe to messages published to a channel: https://redis.io/topics/pubsub

For example, using redis-cli:

producer                      | consumer 1                 | consumer 2
------------------------------+----------------------------+----------------------------
                              | > SUBSCRIBE "#dogfacts"    | > SUBSCRIBE "#dogfacts"
                              | Reading messages...        | Reading messages...
                              | 1) "subscribe"             | 1) "subscribe"
                              | 2) "#dogfacts"             | 2) "#dogfacts"
                              | 3) (integer) 1             | 3) (integer) 1
                              |                            |
> PUBLISH "#dogfacts" "hello" | 1) "message"               | 1) "message"
(integer) 2                   | 2) "#dogfacts"             | 2) "#dogfacts"
>                             | 3) "hello"                 | 3) "hello"

This blog post covers building a very simple chat room with Python + Redis: http://programeveryday.com/post/create-a-simple-chat-room-with-redis-pubsub/
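
The same exchange in Python, using the redis-py client, might look something like this rough sketch (the function names here are just for illustration):

import redis

r = redis.StrictRedis(decode_responses=True)

def follow(channel):
    # Consumer: block and print every message published to the channel
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for item in pubsub.listen():
        if item["type"] == "message":
            print(item["data"])

def announce(channel, message):
    # Producer: returns the number of subscribers the message reached
    return r.publish(channel, message)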

The biggest thing to note about PUBLISH/SUBSCRIBE messages is that they don't have any history: a consumer has no way of knowing what messages were sent before it subscribed.

In my experience, PUBLISH/SUBSCRIBE are often combined with some form of persistent storage of older messages. For example, a chat application which stores messages in an SQL database might look something like this:

import json

def send_message(channel, message):
    timestamp = now()
    db.execute("""
        INSERT INTO chat_messages (timestamp, channel, message)
        VALUES (?, ?, ?)
    """, [timestamp, channel, message])
    # PUBLISH only carries strings, so serialize the payload
    redis.publish(channel, json.dumps({
        "message": message,
        "timestamp": timestamp,
    }))

def read_messages(channel):
    # Subscribe before querying the database so messages sent in
    # between aren't missed
    pubsub = redis.pubsub()
    pubsub.subscribe(channel)
    last_timestamp = 0
    query = """
        SELECT *
        FROM chat_messages
        WHERE channel = ?
        ORDER BY timestamp ASC
    """
    for message in db.execute(query, [channel]):
        yield message
        last_timestamp = message["timestamp"]

    for item in pubsub.listen():
        if item["type"] != "message":
            continue
        message = json.loads(item["data"])
        if message["timestamp"] <= last_timestamp:
            continue
        yield message
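
A consumer would then just iterate over read_messages, getting the stored history first and live messages afterwards (assuming, as above, that database rows come back as dicts):

for message in read_messages("#dogfacts"):
    print(message["timestamp"], message["message"])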

Job queues: single-consumer durable messages

A multi-producer, multi-consumer job queue (ie, where each message represents a job which is assigned to exactly one consumer) can be implemented with a pair of list commands, LPUSH and BRPOP (Blocking Right POP): https://redis.io/commands/brpop

LPUSH pushes an item onto the left side of a list, and BRPOP returns the last item in a list, blocking until an item becomes available if the list is empty (a timeout of 0 means "block forever"). For example, using redis-cli:

producer                      | consumer 1                 | consumer 2
------------------------------+----------------------------+----------------------------
> LPUSH "work-queue" "job1"   |                            |
(integer) 1                   | > BRPOP "work-queue"       | > BRPOP "work-queue"
>                             | "job1"                     |
> LPUSH "work-queue" "job2"   | >                          | "job2"
(integer) 1                   | > BRPOP "work-queue"       | > BRPOP "work-queue"
>                             |                            |
> LPUSH "work-queue" "job3"   |                            | "job3"
(integer) 1                   |                            | >
> LPUSH "work-queue" "job4"   | "job4"                     | >
(integer) 1                   | >                          | >

And, in Python, a worker might look something like this:

def run_worker(queue):
    while True:
        # brpop blocks until a job is available, then returns a
        # (queue name, item) pair
        _, job = redis.brpop(queue)
        run_job(job)
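
On the producer side, enqueueing a job is just an LPUSH (this sketch assumes the job is already a string; a real application would probably serialize a richer payload, for example with json.dumps):

def enqueue_job(queue, job):
    # Returns the length of the queue after the push
    return redis.lpush(queue, job)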

The task queue can even be made somewhat resilient to worker failures by replacing BRPOP with BRPOPLPUSH, which pops an item off one list and atomically adds it to another:

> LPUSH "job-queue" "1"
> LPUSH "job-queue" "2"
> LPUSH "job-queue" "3"
> BRPOPLPUSH "job-queue" "pending-jobs"
> LRANGE "job-queue" -1 0
1) "2"
2) "3"
> LRANGE "pending-jobs" -1 0
1) "1"

Using BRPOPLPUSH, each worker can be given its own pending-jobs list, and if a failure is detected in a worker, the jobs on its pending list can be re-added to the main job queue.
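
A rough sketch of that pattern (the per-worker pending list naming and the dead-worker detection are assumptions left to the application):

def run_reliable_worker(queue, worker_id):
    pending = "pending:%s" %(worker_id, )
    while True:
        # Atomically move a job from the main queue onto this worker's
        # pending list (a timeout of 0 means "block forever")
        job = redis.brpoplpush(queue, pending, 0)
        run_job(job)
        # The job finished, so remove it from the pending list
        redis.lrem(pending, 1, job)

def requeue_dead_worker(queue, worker_id):
    # Called when worker_id is detected to have died: move any jobs it
    # had claimed back onto the main queue
    pending = "pending:%s" %(worker_id, )
    while redis.rpoplpush(pending, queue) is not None:
        pass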

Note, though, that it's probably a bad idea to write your own job queueing framework. There are a lot of subtleties which can be time-consuming to implement (process management, result storage, timeouts, dead worker detection, delayed tasks, and task progress tracking, just to name a few that come immediately to mind). The existing solutions (at least in Python) are feature-rich and straightforward to use. I recommend rq or Celery with the Redis backend.
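
For example, enqueueing a background job with rq only takes a few lines (count_dog_facts is a hypothetical function defined elsewhere in the application; workers are started separately with rq's worker command):

from redis import Redis
from rq import Queue

queue = Queue(connection=Redis())

# The job runs in a worker process; its return value is stored in Redis
job = queue.enqueue(count_dog_facts, "#dogfacts")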

Notice boards: pub-sub with persistence

In pub-sub systems, it's often useful to make the last posted message available to consumers while they are waiting for a new message to be posted. I call this the notice board pattern.

For example, consider an application which uses a progress bar to show the status of a long-running backend task. The backend will periodically publish a progress update, and the front-end will subscribe to those updates, showing first the current value, then updating as progress is made.

The implementation might look something like this:

def nb_notify(board, value):
    # Store the latest value on the board, then publish it so that
    # anyone blocked in nb_watch wakes up
    timestamp = now()
    ts_value = "%s:%s" %(timestamp, value)
    redis.set(board, ts_value)
    redis.publish(board, ts_value)

def nb_watch(board, last_timestamp=0):
    # Subscribe before reading the current value so updates published
    # in between aren't missed
    pubsub = redis.pubsub()
    pubsub.subscribe(board)
    ts_value = redis.get(board)

    while True:
        if ts_value is not None:
            ts_str, _, value = ts_value.partition(":")
            ts = float(ts_str)
            if ts > last_timestamp:
                return (ts, value)
        # Wait for the next value to be published to the board
        message = next(m for m in pubsub.listen() if m["type"] == "message")
        ts_value = message["data"]

And it could be used with a web application like this:

def progress_view(request):
    last_ts = float(request.GET.get("ts", 0))
    next_ts, value = nb_watch("some-board", last_ts)
    return JSONResponse({
        "next_update_url": request.path + "?ts=%s" %(next_ts, ),
        "value": value,
    })

And the JavaScript on the front-end would look like this:

// Long-poll: each response includes the URL to request for the next update
var nextUrl = "https://example.com/api/progress";
function watchProgress(callback) {
    http.get(nextUrl).then(function(result) {
        nextUrl = result.next_update_url;
        callback(result.value);
        watchProgress(callback);
    });
}

watchProgress(function(value) {
    console.log("Progress:", value);
});