Common messaging patterns implemented with Redis
December 29, 2016 at 03:44 PM | Redis

Redis can make a fantastic lightweight messaging system for situations where the complexity of larger systems (like, for example, RabbitMQ) isn't necessary.
Pub-sub: multi-consumer broadcast messages
One-to-many broadcast messages (ex, a chat room, where every message is sent to every connected client) can be implemented with the PUBLISH and SUBSCRIBE commands, which do exactly what they say on the box: publish messages to a channel, and subscribe to messages published to a channel: https://redis.io/topics/pubsub
For example, using redis-cli:
producer                      | consumer 1                 | consumer 2
------------------------------+----------------------------+----------------------------
                              | > SUBSCRIBE "#dogfacts"    | > SUBSCRIBE "#dogfacts"
                              | Reading messages...        | Reading messages...
                              | 1) "subscribe"             | 1) "subscribe"
                              | 2) "#dogfacts"             | 2) "#dogfacts"
                              | 3) (integer) 1             | 3) (integer) 1
                              |                            |
> PUBLISH "#dogfacts" "hello" | 1) "message"               | 1) "message"
(integer) 2                   | 2) "#dogfacts"             | 2) "#dogfacts"
>                             | 3) "hello"                 | 3) "hello"
This blog post covers building a very simple chat room with Python + Redis: http://programeveryday.com/post/create-a-simple-chat-room-with-redis-pubsub/
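For comparison, a minimal redis-py sketch of the same exchange (the bare redis.Redis() connection and the "#dogfacts" channel are just illustrative):

import redis

r = redis.Redis()

def consume():
    # Subscribe and block, printing each message as it is published
    pubsub = r.pubsub()
    pubsub.subscribe("#dogfacts")
    for message in pubsub.listen():
        if message["type"] == "message":
            print("got:", message["data"])

def produce():
    # PUBLISH returns the number of subscribers that received the message
    count = r.publish("#dogfacts", "hello")
    print("delivered to", count, "subscribers")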
The biggest thing to note about PUBLISH/SUBSCRIBE messages is that they don't have any history: a consumer has no way of knowing what messages were sent before it subscribed.
In my experience, PUBLISH/SUBSCRIBE are often combined with some form of persistent storage of older messages. For example, a chat application which stores messages in an SQL database might look something like this:
def send_message(channel, message):
    timestamp = now()
    # Persist the message so future consumers can replay history...
    db.execute("""
        INSERT INTO chat_messages (timestamp, channel, message)
        VALUES (?, ?, ?)
    """, [timestamp, channel, message])
    # ... and publish it for consumers that are already listening.
    # (Published payloads must be strings, hence the JSON encoding.)
    redis.publish(channel, json.dumps({
        "message": message,
        "timestamp": timestamp,
    }))

def read_messages(channel):
    # Subscribe before reading history so nothing is missed between
    # the database read and the subscription.
    pubsub = redis.pubsub()
    pubsub.subscribe(channel)
    last_timestamp = 0
    query = """
        SELECT * FROM chat_messages
        WHERE channel = ?
        ORDER BY timestamp ASC
    """
    for message in db.execute(query, [channel]):
        yield message
        last_timestamp = message["timestamp"]
    for item in pubsub.listen():
        if item["type"] != "message":
            continue
        message = json.loads(item["data"])
        # Skip anything already seen during the history replay.
        if message["timestamp"] <= last_timestamp:
            continue
        yield message
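In use, a producer calls send_message and a consumer simply iterates over read_messages (the channel name here is made up):

# Producer side
send_message("#dogfacts", "hello")

# Consumer side: replays history from the database, then streams new
# messages as they are published
for message in read_messages("#dogfacts"):
    print(message["timestamp"], message["message"])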
Job queues: single-consumer durable messages
A multi-producer, multi-consumer job queue (ie, where each message represents a job which is assigned to exactly one consumer) can be built around a single command, BRPOP (Blocking Right POP): https://redis.io/commands/brpop
BRPOP pops and returns the last item in a list, blocking until an item becomes available, and LPUSH pushes an item onto the left side of a list. For example, using redis-cli:
producer                      | consumer 1                 | consumer 2
------------------------------+----------------------------+----------------------------
> LPUSH "work-queue" "job1"   |                            |
(integer) 1                   | > BRPOP "work-queue"       | > BRPOP "work-queue"
>                             | "job1"                     |
> LPUSH "work-queue" "job2"   |                            |
(integer) 1                   |                            | "job2"
>                             | > BRPOP "work-queue"       | > BRPOP "work-queue"
> LPUSH "work-queue" "job3"   |                            |
(integer) 1                   | "job3"                     |
> LPUSH "work-queue" "job4"   |                            | "job4"
(integer) 1                   | >                          | >
>                             |                            |
And, in Python, a client might look something like this:
def run_worker(queue):
    while True:
        # brpop blocks until a job is available and returns a
        # (list_name, item) tuple
        _, job = redis.brpop(queue)
        run_job(job)
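The producer side is just an LPUSH (the enqueue_job helper and the queue name are illustrative):

def enqueue_job(queue, job):
    # LPUSH adds to the left of the list; workers BRPOP from the right,
    # so jobs are handed out in FIFO order.
    redis.lpush(queue, job)

enqueue_job("work-queue", "job1")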
The task queue can even be made somewhat resilient to worker failures by replacing BRPOP with BRPOPLPUSH, which pops an item off one list and atomically adds it to another:
> LPUSH "job-queue" "1" > LPUSH "job-queue" "2" > LPUSH "job-queue" "3" > BRPOPLPUSH "job-queue" "pending-jobs" > LRANGE "job-queue" -1 0 1) "2" 2) "3" > LRANGE "pending-jobs" -1 0 1) "1"
Using BRPOPLPUSH, each worker can be given its own pending-jobs list, and if a worker failure is detected, the jobs on that list can be pushed back onto the main job queue.
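A rough sketch of that pattern, assuming each worker gets a pending list named after its worker id, and leaving dead-worker detection itself out of scope:

def run_reliable_worker(queue, worker_id):
    pending = "pending:%s" %(worker_id, )
    while True:
        # Atomically move the next job onto this worker's pending list
        # (a timeout of 0 means "block forever")
        job = redis.brpoplpush(queue, pending, 0)
        run_job(job)
        # The job finished, so stop tracking it
        redis.lrem(pending, 1, job)

def requeue_dead_worker(queue, worker_id):
    # Called when a worker is detected as dead: push its unfinished
    # jobs back onto the main queue
    pending = "pending:%s" %(worker_id, )
    while redis.rpoplpush(pending, queue):
        pass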
Note, though, that it's probably a bad idea to write your own job queueing framework. There are a lot of subtleties which can be time-consuming to implement (process management, result storage, timeouts, dead worker detection, delayed tasks, and task progress tracking, just to name a few that come immediately to mind). The existing solutions (at least in Python) are feature-rich and straightforward to use. I recommend rq or Celery with the Redis backend.
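For example, a minimal sketch of enqueueing work with rq (the send_report task is hypothetical, and workers are started separately with rq's worker command):

from redis import Redis
from rq import Queue

from myapp.tasks import send_report  # any importable function

# Jobs are pushed onto a Redis-backed queue and picked up by rq workers
q = Queue(connection=Redis())
q.enqueue(send_report, "user@example.com")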
Notice boards: pub-sub with persistence
In pub-sub systems, it's often useful to make the last posted message available to consumers while they are waiting for a new message to be posted. I call this the notice board pattern.
For example, consider an application which uses a progress bar to show the status of a long-running backend task. The backend will periodically publish a progress update, and the front-end will subscribe to those updates, showing first the current value, then updating as progress is made.
The implementation might look something like this:
def nb_notify(board, value):
    timestamp = now()
    ts_value = "%s:%s" %(timestamp, value)
    # Persist the latest value so future watchers see it immediately...
    redis.set(board, ts_value)
    # ... and publish it so current watchers are woken up right away.
    redis.publish(board, ts_value)

def nb_watch(board, last_timestamp=0):
    pubsub = redis.pubsub()
    pubsub.subscribe(board)
    messages = (m["data"] for m in pubsub.listen() if m["type"] == "message")
    # Start with the persisted value, then fall back to published updates.
    ts_value = redis.get(board)
    while True:
        if ts_value is not None:
            ts_str, _, value = ts_value.partition(":")
            ts = float(ts_str)
            if ts > last_timestamp:
                return (ts, value)
        # Block until the next update is published.
        ts_value = next(messages)
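On the backend, the long-running task just calls nb_notify as it makes progress (do_one_step and the step count are made up; "some-board" matches the board watched below):

def long_running_task():
    total_steps = 100
    for step in range(total_steps):
        do_one_step()
        # Post the latest progress; watchers see it whether they were
        # already subscribed or connect later
        nb_notify("some-board", step + 1)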
And it could be used with a web application like this:
def progress_view(request):
    last_ts = float(request.GET.get("ts", 0))
    next_ts, value = nb_watch("some-board", last_ts)
    return JSONResponse({
        "next_update_url": request.path + "?ts=%s" %(next_ts, ),
        "value": value,
    })
And the JavaScript on the front-end would look like this:
var nextUrl = "https://example.com/api/progress";

function watchProgress(callback) {
    http.get(nextUrl).then(function(result) {
        nextUrl = result.next_update_url;
        callback(result.value);
        watchProgress(callback);
    });
}

watchProgress(function(value) {
    console.log("Progress:", value);
});