The Fifth Elephant 2013

An Event on Big Data and Cloud Computing

Build a Queue Based Concurrent Task Processor (using Python)

Submitted by Piyush Verma (@meson10) on Saturday, 13 April 2013

Section: Workshops Technical level: Advanced


Learn how to develop a persistent-queue-based task processor using simple tools like MongoDB and Python.


At our company we call it "Party"; it is responsible for handling all the non-real-time work, from video encoding, indexing search documents, and updating caches to tracking participant progress, sending emails, handling payments, releasing Node.js sockets, and so on.

Targets to be achieved:

  • Evaluate the need for a task processor and the use cases where it fits
  • Build a queue-based task processor using Python & MongoDB
  • Scale up the number of processes/workers
  • Implement cleanup; keep the queue fast and small
  • Scale up the number of nodes
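The core of a queue-based task processor can be sketched in a few lines. The version below is a minimal in-memory stand-in (the names `enqueue`, `claim_next`, and `complete` are illustrative, not from the talk); with MongoDB, the task documents would live in a collection and the claim step would use an atomic findAndModify to flip a task's status so no two workers pick up the same task.

```python
def enqueue(queue, payload):
    """Append a new task document in the 'pending' state."""
    queue.append({"payload": payload, "status": "pending"})

def claim_next(queue):
    """Claim the oldest pending task, marking it 'processing'.

    Mirrors the semantics of MongoDB's findAndModify: find the
    first document with status 'pending' and atomically flip it.
    """
    for task in queue:
        if task["status"] == "pending":
            task["status"] = "processing"
            return task
    return None  # queue is drained

def complete(task):
    """Mark a claimed task as finished."""
    task["status"] = "done"
```

A worker loop is then just `claim_next` → process → `complete`, repeated; keeping status on the document is also what makes cleanup (dropping old "done" tasks) a simple query.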


Requirements:

  • Familiarity with MongoDB
  • Good understanding of Python
  • Prior experience with, or knowledge of, multiprocessing
  • Chips to munch on whenever you get restless or bored

Speaker bio

Loves coding. Otherwise, loves cycling, coffee, and cartoons.



  • Srinivasan Seshadri (@sesh) 6 years ago

    how does this compare with a message queue and many good public domain implementations of message queues?

  • Piyush Verma (@meson10) Proposer 6 years ago

    I started out evaluating RabbitMQ, which was too heavy for my requirement and doesn't guarantee persistence. Tasks are just written as JSON documents to a MongoDB database and consumed using a tailable cursor or periodic polling.
    If I remember correctly, Kushal Das was also working on a similar implementation, but using Redis. Though the publisher/subscriber model of Redis would be better suited to such distributed problems, Mongo inches ahead since it supports nested-document querying.
    I could have collaborated with Kushal better on this project, but I learnt about his implementation much later, and we had already started using ours in production.
    Among other things, I also intend to improve response times by moving to Unix domain sockets (optionally, when the consumer is on the same node) and to implement priority queues (which might require some sharding).
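    The nested-document querying Piyush mentions is easy to picture with a small sketch. The helper names below (`get_path`, `matches`, `consume`) are assumptions for illustration; with pymongo you would pass the same dotted-path query straight to `find()` on a capped collection opened with a tailable cursor, rather than filtering in Python.

    ```python
    def get_path(doc, path):
        """Resolve a MongoDB-style dotted path like 'task.type' in a nested dict."""
        current = doc
        for key in path.split("."):
            if not isinstance(current, dict) or key not in current:
                return None
            current = current[key]
        return current

    def matches(doc, query):
        """True if every dotted-path key in `query` equals its value in `doc`."""
        return all(get_path(doc, key) == value for key, value in query.items())

    def consume(cursor, query, handler):
        """Apply `handler` to each matching document from `cursor`.

        `cursor` stands in for a tailable cursor on a capped collection;
        here any iterable of task documents works.
        """
        return [handler(doc) for doc in cursor if matches(doc, query)]
    ```

    Redis pub/sub delivers opaque messages, so a consumer interested only in, say, `{"task.type": "email"}` would have to deserialize and filter everything itself; Mongo can do that selection server-side.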

  • t3rmin4t0r (@t3rmin4t0r) 6 years ago

    Gearman has been the crowd favourite for this, especially its multi-master HA mode.

    Have you put up the code somewhere? I’d like to see how this works with poisoned tasks (like someone uploading a gigapixel compressed monochrome jpg to resize).

  • Piyush Verma (@meson10) Proposer 6 years ago

    Re OSS: No, the code has been too tightly coupled with the consumer logic so far. But that is one of my goals for this conference: collect feedback, improve on it, and make it open source.

    Re heavy tasks: I use signals to monitor timeouts, so if a task is too heavy and time-consuming, it gets timed out. Obviously one could alter the timeout, raising it to any value or none at all (never times out). I don't fork/spawn a process at the onset of each task; instead I keep a bunch of pre-forked workers (usually a factor of 2x the number of cores available). If all the workers are busy, the queue stalls and no more tasks are fetched until a worker slot becomes available.
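    A signal-based timeout guard of the kind described can be sketched as below (Unix-only, since it relies on SIGALRM; the function name and return shape are assumptions, not the actual implementation). A pre-forked pool would wrap calls like this around each task it picks up.

    ```python
    import multiprocessing
    import signal
    import time

    # Pre-fork sizing heuristic from the comment: ~2x the available cores.
    WORKER_COUNT = 2 * multiprocessing.cpu_count()

    def run_with_timeout(fn, timeout, *args):
        """Run fn(*args), aborting via SIGALRM after `timeout` seconds.

        Returns (True, result) on success, (False, None) on timeout.
        """
        def _handler(signum, frame):
            raise TimeoutError

        previous = signal.signal(signal.SIGALRM, _handler)
        signal.setitimer(signal.ITIMER_REAL, timeout)  # arm the alarm
        try:
            return (True, fn(*args))
        except TimeoutError:
            return (False, None)  # task was too heavy; it gets timed out
        finally:
            signal.setitimer(signal.ITIMER_REAL, 0)    # disarm
            signal.signal(signal.SIGALRM, previous)
    ```

    Setting the timeout to 0 disarms the timer, which corresponds to the "never times out" option mentioned above.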

    Re Gearman: Currently the code does not support multi-master mode. In fact, that is one major drawback that hurts the achievable scale, and it is P1 on the list of things to address. I am going to check out Gearman for that.
