Common messaging patterns implemented with Redis

December 29, 2016 at 03:44 PM | Redis

Redis can make a fantastic lightweight messaging system for situations where the complexity of larger systems (like RabbitMQ, for example) isn't necessary.

Pub-sub: multi-consumer broadcast messages

One-to-many broadcast messages (ex, a chat room, where every message is sent to every connected client) can be implemented with the PUBLISH and SUBSCRIBE commands, which do exactly what they say on the box: publish messages to a channel, and subscribe to messages published to a channel: https://redis.io/topics/pubsub

For example, using redis-cli:

producer                      | consumer 1                 | consumer 2
------------------------------+----------------------------+----------------------------
                              | > SUBSCRIBE "#dogfacts"    | > SUBSCRIBE "#dogfacts"
                              | Reading messages...        | Reading messages...
                              | 1) "subscribe"             | 1) "subscribe"
                              | 2) "#dogfacts"             | 2) "#dogfacts"
                              | 3) (integer) 1             | 3) (integer) 1
                              |                            |
> PUBLISH "#dogfacts" "hello" | 1) "message"               | 1) "message"
(integer) 2                   | 2) "#dogfacts"             | 2) "#dogfacts"
>                             | 3) "hello"                 | 3) "hello"
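
In Python, the same exchange might look something like this (a minimal sketch using the redis-py client; note that listen() also yields subscribe confirmations, which are skipped here):

import redis

r = redis.StrictRedis(decode_responses=True)

def consume(channel):
    # Block forever, printing each message published to the channel
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message["type"] == "message":
            print(message["data"])

def produce(channel, message):
    # Returns the number of subscribers which received the message
    return r.publish(channel, message)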

This blog post covers building a very simple chat room with Python + Redis: http://programeveryday.com/post/create-a-simple-chat-room-with-redis-pubsub/

The biggest thing to note about PUBLISH/SUBSCRIBE messages is that they don't have any history: a consumer has no way of knowing what messages were sent before it subscribed.

In my experience, PUBLISH/SUBSCRIBE are often combined with some form of persistent storage of older messages. For example, a chat application which stores messages in an SQL database might look something like this:

import json
import time

def send_message(channel, message):
    timestamp = time.time()
    db.execute("""
        INSERT INTO chat_messages (timestamp, channel, message)
        VALUES (?, ?, ?)
    """, [timestamp, channel, message])
    # Redis messages must be strings, so the payload is JSON-encoded
    redis.publish(channel, json.dumps({
        "message": message,
        "timestamp": timestamp,
    }))

def read_messages(channel):
    # Subscribe before querying the database so that messages published
    # during the query aren't missed; any overlap between the two sources
    # is filtered out by timestamp below.
    pubsub = redis.pubsub()
    pubsub.subscribe(channel)
    last_timestamp = 0
    query = """
        SELECT *
        FROM chat_messages
        WHERE channel = ?
        ORDER BY timestamp ASC
    """
    for message in db.execute(query, [channel]):
        yield message
        last_timestamp = message["timestamp"]

    for event in pubsub.listen():
        if event["type"] != "message":
            continue
        message = json.loads(event["data"])
        if message["timestamp"] <= last_timestamp:
            continue
        yield message

Job queues: single-consumer durable messages

A multi-producer multi-consumer job queue (ie, where each message represents a job which is assigned to exactly one consumer) can be implemented with just two commands: LPUSH, which enqueues jobs, and BRPOP, Blocking Right POP, which dequeues them: https://redis.io/commands/brpop

BRPOP removes and returns the last item in a list, blocking until an item becomes available if the list is empty; its final argument is a timeout, where 0 means "block forever". LPUSH pushes an item onto the left side of a list. For example, using redis-cli (the output here is abbreviated: BRPOP actually returns both the list name and the popped value):

producer                      | consumer 1                 | consumer 2
------------------------------+----------------------------+----------------------------
> LPUSH "work-queue" "job1"   |                            |
(integer) 1                   | > BRPOP "work-queue"       | > BRPOP "work-queue"
>                             | "job1"                     |
> LPUSH "work-queue" "job2"   | >                          | "job2"
(integer) 1                   | > BRPOP "work-queue"       | > BRPOP "work-queue"
>                             |                            |
> LPUSH "work-queue" "job3"   |                            | "job3"
(integer) 1                   |                            | >
> LPUSH "work-queue" "job4"   | "job4"                     | >
(integer) 1                   | >                          | >

And, in Python, a client might look something like this:

def run_worker(queue):
    while True:
        # brpop returns a (list_name, value) pair
        _, job = redis.brpop(queue)
        run_job(job)

The task queue can even be made somewhat resilient to worker failures by replacing BRPOP with BRPOPLPUSH, which pops an item off one list and atomically adds it to another:

> LPUSH "job-queue" "1"
> LPUSH "job-queue" "2"
> LPUSH "job-queue" "3"
> BRPOPLPUSH "job-queue" "pending-jobs"
> LRANGE "job-queue" -1 0
1) "2"
2) "3"
> LRANGE "pending-jobs" -1 0
1) "1"

Using BRPOPLPUSH, each worker can be given its own pending job list, and if a failure is detected in a worker, the jobs left on its pending list can be re-added to the main job queue, as sketched below.
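
For example, a recovery routine might look something like this (a sketch; it assumes each worker's pending jobs live in a list named after that worker):

def requeue_dead_worker(pending_list, queue="work-queue"):
    # Atomically move each orphaned job from the dead worker's pending
    # list back onto the main queue. rpoplpush returns None once the
    # pending list is empty.
    while redis.rpoplpush(pending_list, queue) is not None:
        pass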

Note, though, that it's probably a bad idea to write your own job queueing framework. There are a lot of subtleties which can be time-consuming to implement (process management, result storage, timeouts, dead worker detection, delayed tasks, and task progress tracking, just to name a few that come immediately to mind). The existing solutions (at least in Python) are feature-rich and straightforward to use. I recommend rq or Celery with the Redis backend.
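
For example, the run_worker loop above shrinks to a few lines with rq (a sketch; it assumes run_job is importable by the worker processes, which are started with the "rq worker" command):

from redis import Redis
from rq import Queue

queue = Queue(connection=Redis())
queue.enqueue(run_job, "job1")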

Notice boards: pub-sub with persistence

In pub-sub systems, it's often useful to make the last posted message available to consumers while they are waiting for a new message to be posted. I call this the notice board pattern.

For example, consider an application which uses a progress bar to show the status of a long-running backend task. The backend will periodically publish a progress update, and the front-end will subscribe to those updates, showing first the current value, then updating as progress is made.

The implementation might look something like this:

def nb_notify(board, value):
    timestamp = time.time()
    ts_value = "%s:%s" %(timestamp, value)
    redis.set(board, ts_value)
    redis.publish(board, ts_value)

def nb_watch(board, last_timestamp=0):
    pubsub = redis.pubsub()
    pubsub.subscribe(board)
    ts_value = redis.get(board)

    while True:
        if ts_value is not None:
            ts_str, _, value = ts_value.partition(":")
            ts = float(ts_str)
            if ts > last_timestamp:
                return (ts, value)
        # Wait for the next value to be published, skipping over the
        # subscribe confirmations that listen() also yields
        for message in pubsub.listen():
            if message["type"] == "message":
                ts_value = message["data"]
                break

And it could be used with a web application like this:

def progress_view(request):
    last_ts = float(request.GET.get("ts", 0))
    next_ts, value = nb_watch("some-board", last_ts)
    return JSONResponse({
        "next_update_url": request.path + "?ts=%s" %(next_ts, ),
        "value": value,
    })

And the JavaScript on the front-end would look like this:

var nextUrl = "https://example.com/api/progress";
function watchProgress(callback) {
    http.get(nextUrl).then(function(result) {
        nextUrl = result.next_update_url;
        callback(result.value);
        watchProgress(callback);
    });
}

watchProgress(function(value) {
    console.log("Progress:", value);
});

I spent $4 on Mechanical Turk and all I got were these Turing machine jokes

August 19, 2016 at 12:19 PM | Play

I logged in to my Amazon Mechanical Turk account for the first time in years the other day:

[Image: mechanical-turk.png, a screenshot of my Mechanical Turk account]

And was reminded of the best $4 I spent in 2008: a collection of terrible Turing machine related jokes.

Below are the raw and unedited results of asking workers on Mechanical Turk to help me find Turing machine jokes. Each respondent was paid $0.25 if their response was accepted, and I was pretty liberal accepting responses. Each response below is prefixed with the time the worker took, an approval ratio, and whether the response was accepted:

1m 30s | 0% (0/6) | Rejected
What did Snoop Dogg say to the Turing Machine? You better recognize!.

22s | 0% (0/6) | Rejected
Q: Why did the Turing Machine cross the road?

A: To go to Office Depot to buy some more tape!

Because it had run out!!

HAHAHAHAHA!!!

14s | 100% (1/1) | Approved
Q: How many light bulbs does it take to change a light bulb?
A: One, if it knows its own Goedel number.

10s | 100% (3/3) | Approved
I think some people miss the point of the Turing test. The point is not to determine how intelligent someone or something is, but to determine how good a computer is at imitating a human.

1m 29s | 0% (0/6) | Rejected
A Turing Machine walks into a bar. The bartender asks him what he would like. The Turing Machine sits there thinking for what seems like forever. Finally, the impatient bartender asks "Well, what's it going to be?!?". The Turing Machine, embarrassed, admits: "I can't decide."

31s | 100% (1/1) | Approved
How many Turing Machines does it take to change a lightbulb?
None--that's a hardware problem.

12m 12s | 100% (1/1) | Approved
I couldn't find any either. Not so familiar with the domain, but did give some originals a shot.
-------
Q. Why isn't the poet Turing Machine married?
A. The peculiarity is that it prefers it's own singularity.

Knock knock
Who's there
Turing machine
Turing machine who
One extra 'o' and I've could've been a cool motorbike (Touring machine) instead of responding to formalist jokes like I'm not faking this nihilism to pretend I have a personality.

What did the solipsophist and the Turing Machine say to each other.
Nothing, they both thought the other did not exist outside of memory.

What's the difference between a Turing Machine and a tax collector?
At least the Turing Machine is pretending to be human.

4s | 100% (3/3) | Approved
Q: Why did the Turing Machine cross the road?

A: To go to Office Depot to buy some more tape!

23s | 100% (3/3) | Approved
"In order that the machine should have a chance of finding things out for itself it should be allowed to roam the country side, and the danger to the ordinary citizen would be serious." (Turing 1947)

28s | 100% (11/11) | Approved
Maybe you'll appreciate this

http://www.getacoder.com/projects/bug_finder_92913.html

(perhaps not directly relevant, but I hope you'll like it anyway).

50s | 100% (1/1) | Approved
To emulate a Turing machine, all you need is a roll of toilet paper and some pebbles.

6s | 0% (0/6) | Rejected
The Turing Machine... The basis for all modern computation... Yet only a handful of people have thought it worth while to make jokes about them!

What a travesty -- this must be changed.

Please post your favorite Turing Machine jokes here, in hopes that they will be immortalized for generations to come.

38s | 0% (0/6) | Rejected
What did AI1 say to AI2?

"You may be Turing complete, put I convinced some guy I can converse intelligently in Mandarin just by running an algorithm I can never understand because I am neither sentient nor alive."

hold for laughs

19m 37s | 100% (1/1) | Approved
OK, I have been thinking since you posted these hits, and I can't come up with a dam thing and I am one of the few old school that actually know what a turing machine actual is without looking it up on Wikipedia. I had an awesome idea for getting your jokes, but it requires a cell phone, and my wife has mine tonight because hers died a couple days ago. Anyway here is what you do because I'm sure these hits will be gone before she gets back. Have you heard of ChaCha? I'm a guide and work for them, we answer questions by searching the net, and one of the things we get is a ton of joke request, which are easy we just pull one from the net. UNLESS, and we are required to do this, someone ask us to make up a joke about a subject.

ChaCha is free, just text or call in your request several times, as many as you like, it is per use, so you just can ask to send 20 jokes at a time. Once you get one, you can just text the word 'more', and the guide will look at last request history and research and make up a new joke. Make sure to request they don't just pull something for the internet.

Text chacha-242242 , or you can call 1-800-2chacha - 1-800-224-2242, if you call you have to do it from a cell phone because the answer will come back via test message. If you don't pay me for this, no big deal, hope it helps!

.ssh/config every day

August 18, 2014 at 01:52 PM | Shell, OpenSSH

I'd like to take a moment to share a few ways I use the ~/.ssh/config file to make my life happier every day.

With these options I never need to remember host names, usernames, or port numbers, and the vast majority of my SSH commands look like:

$ ssh myapp
$ ssh myclient-prod-db
$ rsync -a app-backup:backups/jan01 .

Every time I get ssh access to a server I add an entry to my config file giving the host a name that's meaningful to me (for example, "someclient-server" or "myproj-backup") and setting the default username and port:

Host someclient-dev
    Hostname 11.22.33.44
    User dev

Host someclient-prod-app
    Hostname redbunny.myclient.com
    Port 4242
    User prod

Host someclient-prod-db
    Hostname bluefish.myclient.com
    Port 4242
    User db

These host aliases can be used just about anywhere a hostname is passed to SSH, including:

  • SSH from the command line:

    $ ssh someclient-dev
    ...
    dev@11.22.33.44 $
    
  • git, mercurial, or other version control systems:

    $ git remote add dev someclient-dev:repo/
    
  • rsync:

    $ rsync -a media/ someclient-dev:media/
    

Not only does this mean I never need to remember weird hostnames or arbitrary usernames, but I can also open the file to see a list of all the machines I've ever had access to (which can be very useful when an old machine needs work done).

The bash-completion package is even .ssh/config aware, so tab completion will work as expected:

$ ssh someclient-<tab>
someclient-dev someclient-prod-app someclient-prod-db

Amazon EC2 key management also becomes a huge convenience. Each time I get access to an Amazon EC2 instance I add the IdentityFile to the Host definition:

Host *.amazonaws.com
    User ec2-user

Host myapp
    Hostname ec2-1-2-3-4.compute-1.amazonaws.com
    IdentityFile ~/.ssh/aws-myapp.pem

As above, this will create the host alias myapp, and the identity file ~/.ssh/aws-myapp.pem will be used to connect (no more -i flag on the command line).

Finally, there are a few options that are useful to set for all hosts:

Host *
    # Instead of just printing the host key fingerprint as an opaque hex
    # string, print some pretty ASCII art. Ostensibly this is for
    # security, but mostly it's pretty:
    #     +--[ RSA 2048]----+
    #     | oE    ..        |
    #     |  ..   ...       |
    #     |   .  ooo        |
    #     |   oooooo        |
    #     |  . =+.+S+       |
    #     |   o.+o.o..      |
    #     |    o..          |
    #     +-----------------+
    VisualHostKey yes

    # Send explicit keepalive packets. This isn't often a problem, but I've
    # run into a few combinations of network and machine that will drop
    # inactive connections.
    TCPKeepAlive yes
    ServerAliveInterval 60

    # SSH Agent Forwarding is described here:
    # http://www.unixwiz.net/techtips/ssh-agent-forwarding.html
    ForwardAgent yes

    # SSH Control Channels allow multiple SSH sessions to share one
    # connection. For example, the first time I run "ssh myapp", ssh will
    # create a new connection to the server (creating a TCP connection,
    # authenticating, etc). As long as that connection
    # is active, though, running "ssh myapp" from another terminal will
    # re-use the same TCP connection, authentication, etc, making the
    # command virtually instant.
    # Note that the ControlPersist option is important, otherwise all the
    # sessions will be disconnected when the master session closes.
    # (note: the ~/.ssh/control/ directory must already exist; create it
    # with "mkdir -p ~/.ssh/control")
    ControlPath ~/.ssh/control/master-%l-%r@%h:%p
    ControlMaster auto
    ControlPersist 60

The Sadness of Python's super()

April 02, 2014 at 07:26 PM | Python

The dangers of Python's super have been documented... but, in my humble opinion, not well enough.

A major problem with Python's super() is that it is not straightforward to figure out which methods need to call it, even when it doesn't seem like the method's parent class should need to.

Consider this example, where mixins are used to update a dictionary with some context (similar to, but less correct than, Django's TemplateView):

class FirstMixin(object):
    def get_context(self):
        return {"first": True}

class BaseClass(FirstMixin):
    def get_context(self):
        ctx = super(BaseClass, self).get_context()
        ctx.update({"base": True})
        return ctx

class SecondMixin(object):
    def get_context(self):
        ctx = super(SecondMixin, self).get_context()
        ctx.update({"second": True})
        return ctx

class ConcreteClass(BaseClass, SecondMixin):
    pass

This looks correct... but it isn't! Because FirstMixin doesn't call super(), SecondMixin.get_context is never called:

>>> c = ConcreteClass()
>>> c.get_context()
{"base": True, "first": True} # Note that ``"second": True`` is missing!

Alternatively, imagine that FirstMixin.get_context() does call super():

class FirstMixin(object):
    def get_context(self):
        ctx = super(FirstMixin, self).get_context()
        ctx.update({"first": True})
        return ctx

This will also be incorrect: now the call to super() in SecondMixin will trigger an error, because the final class in the MRO, object, does not have a get_context() method:

>>> c = ConcreteClass()
>>> c.get_context()
...
AttributeError: 'super' object has no attribute 'get_context'

What is a poor Pythonista to do?

There are three reasonably simple rules to follow when dealing with this kind of multiple inheritance:

  1. Mixins should always call super().
  2. The base class should not call super().
  3. The base class (or one of its superclasses) needs to be at the far right of the subclass's list of base classes.

Note that this will often mean introducing an otherwise unnecessary *Base class.

To correct the example above:

# Following rule (1), every mixin calls `super()`
class FirstMixin(object):
    def get_context(self):
        ctx = super(FirstMixin, self).get_context()
        ctx.update({"first": True})
        return ctx

# Following rule (2), the base class does *not* call super.
class BaseClassBase(object):
    def get_context(self):
        return {"base": True}

# Notice that, to follow rule (3), an otherwise unnecessary base class has
# been introduced to make sure that the "real" base class (the one without
# the call to super) can be at the very right of the list of base classes.
class BaseClass(FirstMixin, BaseClassBase):
    pass

# Following rule (3), the base class comes at the right end of the list
# of base classes.
class ConcreteClass(SecondMixin, BaseClass):
    pass

This will guarantee that the mixins are always called before the base class, which doesn't call super() in get_context().
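
With these rules in place, all three get_context() implementations run, mixins first:

>>> c = ConcreteClass()
>>> c.get_context()
{'base': True, 'first': True, 'second': True}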

Note that this will still cause problems in the event that multiple base classes are used (ie, "true" multiple inheritance)... and there isn't much which can be done about that, at least in the general case.

It is also worth noting that in many cases the best solution is to avoid inheritance altogether, opting instead for a pattern better suited to the requirements of the specific problem at hand.

For example, in the situation from the example above - where many different "things" (in the above example: mixins and the base class) need to contribute to the "context" dictionary - one option which might be more appropriate is an explicit set of "context providers":

class FirstContextProvider(object):
    def __call__(self):
        return {"first": True}

class BaseClass(object):
    context_providers = [
        FirstContextProvider(),
        lambda: {"base": True},
    ]

    def get_context(self):
        ctx = {}
        for provider in self.context_providers:
            ctx.update(provider())
        return ctx

class SecondContextProvider(object):
    def __call__(self):
        return {"second": True}

class ConcreteClass(BaseClass):
    context_providers = BaseClass.context_providers + [
        SecondContextProvider(),
    ]

(recall that the __call__ method is used to make instances of a class callable)
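
The result is the same, but the order of contributions is now explicit in the context_providers list instead of buried in the MRO:

>>> ConcreteClass().get_context()
{'first': True, 'base': True, 'second': True}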

Edit: I was corrected by @lambacck, who pointed out the "base class on the right" rule: https://twitter.com/lambacck/status/451528854507905024


Atomic Bank Balance Transfer with CouchDB

March 13, 2014 at 10:03 PM | Uncategorized

Googling around the other day I was disappointed to find that the internet has a few incorrect examples of how atomic bank account transfers can be implemented with CouchDB... but I wasn't able to find any correct examples.

So here it is: the internet's first 100% complete and correct implementation of the classic "atomic bank balance transfer problem" in CouchDB.

First, a brief recap of the problem: how can a banking system which allows money to be transferred between accounts be designed so that there are no race conditions which might leave invalid or nonsensical balances?

There are a few parts to this problem:

First: the transaction log. Instead of storing an account's balance in a single record or document — {"account": "Dave", "balance": 100} — the account's balance is calculated by summing up all the credits and debits to that account. These credits and debits are stored in a transaction log, which might look something like this:

{"from": "Dave", "to": "Alex", "amount": 50}
{"from": "Alex", "to": "Jane", "amount": 25}

And the CouchDB map-reduce functions to calculate the balance could look something like this:

POST /transactions/balances
{
    "map": function(txn) {
        emit(txn.from, txn.amount * -1);
        emit(txn.to, txn.amount);
    },
    "reduce": function(keys, values) {
        return sum(values);
    }
}

For completeness, here is the list of balances:

GET /transactions/balances
{
    "rows": [
        {
            "key" : "Alex",
            "value" : 25
        },
        {
            "key" : "Dave",
            "value" : -50
        },
        {
            "key" : "Jane",
            "value" : 25
        }
    ],
    ...
}

But this leaves the obvious question: how are errors handled? What happens if someone tries to make a transfer larger than their balance?

With CouchDB (and similar databases) this sort of business logic and error handling must be implemented at the application level. Naively, such a function might look like this:

def transfer(from_acct, to_acct, amount):
    txn_id = db.post("transactions", {"from": from_acct, "to": to_acct, "amount": amount})
    # Check the sender's balance after inserting the transaction
    if db.get("transactions/balances", key=from_acct) < 0:
        db.delete("transactions/" + txn_id)
        raise InsufficientFunds()

But notice that if the application crashes between inserting the transaction and checking the updated balances the database will be left in an inconsistent state: the sender may be left with a negative balance, and the recipient with money that didn't previously exist:

// Initial balances: Alex: 25, Jane: 25
db.post("transactions", {"from": "Alex", "To": "Jane", "amount": 50}
// Current balances: Alex: -25, Jane: 75

How can this be fixed?

To make sure the system is never in an inconsistent state, two pieces of information need to be added to each transaction:

  1. The time the transaction was created (to ensure that there is a strict total ordering of transactions), and
  2. A status — whether or not the transaction was successful.

There will also need to be two views — one which returns an account's available balance (ie, the sum of all the "successful" transactions), and another which returns the oldest "pending" transaction:

POST /transactions/balance-available
{
    "map": function(txn) {
        if (txn.status == "successful") {
            emit(txn.from, txn.amount * -1);
            emit(txn.to, txn.amount);
        }
    },
    "reduce": function(keys, values) {
        return sum(values);
    }
}

POST /transactions/oldest-pending
{
    "map": function(txn) {
        if (txn.status == "pending") {
            emit(txn._id, txn);
        }
    },
    "reduce": function(keys, values) {
        var oldest = values[0];
        values.forEach(function(txn) {
            if (txn.timestamp < oldest) {
                oldest = txn;
            }
        });
        return oldest;
    }

}

The list of transfers might now look something like this:

{"from": "Alex", "to": "Dave", "amount": 100, "timestamp": 50, "status": "successful"}
{"from": "Dave", "to": "Jane", "amount": 200, "timestamp": 60, "status": "pending"}

Next, the application will need to have a function which can resolve transactions by checking each pending transaction in order to verify that it is valid, then updating its status from "pending" to either "successful" or "rejected":

def resolve_transactions(target_timestamp):
    """ Resolves all transactions up to and including the transaction
        with timestamp ``target_timestamp``. """
    while True:
        # Get the oldest transaction which is still pending
        txn = db.get("transactions/oldest-pending")
        if txn is None or txn["timestamp"] > target_timestamp:
            # Stop once all of the transactions up until the one we're
            # interested in have been resolved.
            break

        # Then check to see if that transaction is valid, by comparing the
        # sender's available balance against the transaction's amount
        if db.get("transactions/balance-available", key=txn["from"]) >= txn["amount"]:
            status = "successful"
        else:
            status = "rejected"

        # Then update the status of that transaction. Note that CouchDB
        # will check the "_rev" field, only performing the update if the
        # transaction hasn't already been updated.
        txn["status"] = status
        db.put(txn)
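
Because of that "_rev" check, two processes can safely race to resolve the same transaction: one of the two updates will fail with a conflict, and the loser can simply move on. A sketch of that update step, assuming a couchdb-python style client where a "_rev" mismatch raises ResourceConflict:

from couchdb.http import ResourceConflict

def try_resolve(txn, status):
    txn["status"] = status
    try:
        db.put(txn)
    except ResourceConflict:
        # Another process already resolved this transaction; its
        # resolution is just as valid, so there is nothing to do.
        pass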

Finally, the application code for correctly performing a transfer:

import time

def transfer(from_acct, to_acct, amount):
    timestamp = time.time()
    txn_id = db.post("transactions", {
        "from": from_acct,
        "to": to_acct,
        "amount": amount,
        "status": "pending",
        "timestamp": timestamp,
    })
    resolve_transactions(timestamp)
    txn = db.get("transactions/" + txn_id)
    if txn["status"] == "rejected":
        raise InsufficientFunds()
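
From the caller's point of view, an invalid transfer now surfaces as an exception rather than a nonsensical balance:

try:
    transfer("Alex", "Jane", 50)
except InsufficientFunds:
    print("Alex's available balance is too low")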

A couple of notes:

  • For the sake of brevity, this specific implementation assumes some amount of atomicity in CouchDB's map-reduce. Updating the code so it does not rely on that assumption is left as an exercise to the reader.
  • Master/master replication and CouchDB's document sync have not been taken into consideration; they make this problem significantly more difficult.
  • In a real system, using time() might result in collisions, so using something with a bit more entropy might be a good idea; maybe "%s-%s" %(time(), uuid()), or using the document's _id in the ordering (see the sketch below). Including the time is not strictly necessary, but it helps maintain a logical ordering if multiple requests come in at about the same time.
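
A sketch of such an ordering key (a hypothetical helper; any scheme that sorts by time and breaks ties uniquely would do):

import time
import uuid

def txn_ordering_key():
    # Wall-clock time gives a rough total ordering; the UUID breaks
    # ties between transactions created at the same instant.
    return "%s-%s" %(time.time(), uuid.uuid4())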