KISS: Use the built in sum() instead of reduce to aggregate over a list comprehension

This post is the beginning of a KISS tag, a place where I will put all the "over complications" I find on codes that I work on, or even to comment on my own mistakes.


Today I was working on a Django reports app and I saw this code:

result = reduce(lambda x, y: x + y, \
        [i.thing.price for i in \

At the first look, specially because of the use of reduce I thought it was a complicated issue to solve.

2 seconds after I realized.

Why using reduce for sum when Python already has the built in sum function?

result = sum([i.thing.price for i in ModelObject.objects.filter(created_at__gte=date)])

Well Python gives us powerful builtins so just use this!

The other problem here is the memory usage of the above solution, it will first get the objects list from filter() and after that it will iterate one by one, doing a field lookup to take the price and return a new list with values to sum.

It can kill your server!

On this case things can be done in a better way! we are talking about Django! and even I am being a Django ORM hater I know that it has some cool things like this one:

Django ORM aggregations

from django.db.models import Count
queryset = ModelObject.objects.filter(created_at__gte=date)
aggregation = queryset.aggregate(price=Sum('thing__price'))
result = aggregation.get('price', 0)

On the above code, the aggregation Sum will translate in to a SQL command and the sum will be performed on the database side! much better

I really do not like the Django ORM syntax, also I hate the way I bind the objects, maybe because I am used to use the wonderful DAL I prefer to refer to data as data, I mean, data as Rows not data as objects. But in cases I am working with Django, I think the best is to use its powerful tools!

Keep It Simple Stupid!

Django ListField e SeparetedValuesField

Revisiting this with a ListField type you can use. But it makes a few of assumptions, such as the fact that you're not storing complex types in your list. For this reason I used ast.literal_eval() to enforce that only simple, built-in types can be stored as members in a ListField:

from django.db import models
import ast

class ListField(models.TextField):
    __metaclass__ = models.SubfieldBase
    description = "Stores a python list"

    def __init__(self, *args, **kwargs):
        super(ListField, self).__init__(*args, **kwargs)

    def to_python(self, value):
        if not value:
            value = []

        if isinstance(value, list):
            return value

        return ast.literal_eval(value)

    def get_prep_value(self, value):
        if value is None:
            return value

        return unicode(value)

    def value_to_string(self, obj):
        value = self._get_val_from_obj(obj)
        return self.get_db_prep_value(value)

class Dummy(models.Model):
    mylist = ListField()

Taking it for a spin:

>>> from foo.models import Dummy, ListField
>>> d = Dummy()
>>> d.mylist
>>> d.mylist = [3,4,5,6,7,8]
>>> d.mylist
[3, 4, 5, 6, 7, 8]
>>> f = ListField()
>>> f.get_prep_value(d.numbers)
u'[3, 4, 5, 6, 7, 8]'

There you have it that a list is stored in the database as a unicode string, and when pulled back out it is run through ast.literal_eval().

Previously I suggested this solution from this blog post about Custom Fields in Django:

An alternative to the CommaSeparatedIntegerField, it allows you to store any separated values. You can also optionally specify a token parameter.

from django.db import models

class SeparatedValuesField(models.TextField):
    __metaclass__ = models.SubfieldBase

    def __init__(self, *args, **kwargs):
        self.token = kwargs.pop('token', ',')
        super(SeparatedValuesField, self).__init__(*args, **kwargs)

    def to_python(self, value):
        if not value: return
        if isinstance(value, list):
            return value
        return value.split(self.token)

    def get_db_prep_value(self, value):
        if not value: return
        assert(isinstance(value, list) or isinstance(value, tuple))
        return self.token.join([unicode(s) for s in value])

    def value_to_string(self, obj):
        value = self._get_val_from_obj(obj)
        return self.get_db_prep_value(value)

Add a counter on Django admin home page

Recently I tried many ways to add a simple record counter on Django admin home page, I needed it to look like this:


I have tried django admin tools, overwriting the _meta on but the probleam with admin tools is that it installed a lot of aditional stuff I did not like to use, and the problem with other approaches was because I needed it to be dynamic. Overwriting the __meta seemed to be the right way but is binded only one time, and no updates done until the app restarts.

My friend Fernando Macedo did it the right way!

specialize the string type to add your desired dynamic behavior

from django.db import models

class VerboseName(str):
    def __init__(self, func):
        self.func = func

    def decode(self, encoding, erros):
        return self.func().decode(encoding, erros)

class UsedCoupons(models.Model):
    name = models.CharField(max_length=10)

    class Meta:
        verbose_name_plural = VerboseName(lambda: u"Used Coupons (%d)" % UsedCoupons.objects.count())

And this gives us a lesson, try to solve your problems in pure Python before looking for tricks or ready solutions. (wow it is a dynamic language!)

Programmatically check if Django South has migrations to run

Programmatically check if South has migrations to run.

from south import migration
from south.models import MigrationHistory

apps  = list(migration.all_migrations())

applied_migrations = MigrationHistory.objects.filter(app_name__in=[app.app_label() for app in apps])
applied_migrations = ['%s.%s' % (mi.app_name,mi.migration) for mi in applied_migrations]

num_new_migrations = 0
for app in apps:
    for migration in app:
        if migration.app_label() + "." + not in applied_migrations:
            num_new_migrations = num_new_migrations + 1

return num_new_migrations

It can be wrapped in to a function and can be used to monitor South state in admin.

Based on and some C/P from stack overflow

Automatically start the debugger on an exception

When Python runs a script and an uncatched exception is raised, a traceback is printed and the script is terminated. Python2.1 has introduced sys.excepthook, which can be used to override the handling of uncaught exceptions. This allows to automatically start the debugger on an unexpected exception, even if python is not running in interactive mode.

# code snippet, to be included in ''
import sys

def info(type, value, tb):
   if hasattr(sys, 'ps1') or not sys.stderr.isatty():
      # we are in interactive mode or we don't have a tty-like
      # device, so we call the default hook
      sys.__excepthook__(type, value, tb)
      import traceback, pdb
      # we are NOT in interactive mode, print the exception...
      traceback.print_exception(type, value, tb)
      # ...then start the debugger in post-mortem mode.

sys.excepthook = info

The above snipper can be easily included in your editor snippets and you can set it on top of your files, or even better you can include on your

NOTE: Use: import pywin.debugger and if you want a gui

Based on this stacj overflow thread.

DO.PY - CLI tool to manage your to-do list, based on docopt and DAL


 ____                     _
|  _ \  ___   _ __  _   _| |
| | | |/ _ \ | '_ \| | | | |
| |_| | (_) || |_) | |_| |_|
|____/ \___(_) .__/ \__, (_)
             |_|    |___/

To Do list on Command Line Interface

Manage to-do list on a shell based simple interface and stores your to-do locally on a sqlite database

optionally use your Dropbox to store the database



pip install dopy


git clone

cd dopy

python install


git clone

chmod +x dopy/dopy/

sudo ln -s path/to/dopy/dopy/ /bin/dopy

Maybe the pip option will not be working for a while


____                     _
|  _ \  ___   _ __  _   _| |
| | | |/ _ \ | '_ \| | | | |
| |_| | (_) || |_) | |_| |_|
|____/ \___(_) .__/ \__, (_)
             |_|    |___/

Usage: [--use=<db>] [--args] add <name> [<tag>] [<status>] [--reminder=<reminder>] [--use=<db>] [--args] done <id> [--use=<db>] [--args] ls [--all] [--tag=<tag>] [--status=<status>] [--search=<term>] [--date=<date>] [--month=<month>] [--day=<day>] [--year=<year>] [--use=<db>] [--args] rm <id> [--use=<db>] [--args] get <id> [--use=<db>] [--args] note <id> [--use=<db>] [--rm=<noteindex>] [--args] show <id> [--use=<db>] [--args] note <id> <note> [--use=<db>] [--args] export <path> [--format=<format>] [--use=<db>] [--args] setpath <path> [--args] use <db> [--args] -h | --help [--args] --version [--args] --args

  -h --help      Show this screen.
  --version     Show version.
  --args          Show args.
  1. to enter in SHELL mode python or simply dopy if installed

  2. Add a new task dopy <name> <tag> <reminder>

dopy add "Pay the telephone bill" personal new --reminder=today

with default values for tag, status and reminder

dopy add "Implement new features on my project"

  1. List taks

List all open tasks

dopy ls

$ python ls
$ dopy ls
|ID|                         Name|     Tag|Status|Reminder|            Created|
| 3|           Pay telephone bill|personal|   new|   today|2012-12-31 08:03:15|
| 4|Implement features on project| default|   new|    None|2012-12-31 08:03:41|
TOTAL:2 tasks

By tag

`dopy ls --tag=personal

By name

dopy ls --search=phone

By status

dopy ls --status=done


dopy ls --all

  1. Mark as done

dopy done

dopy done 2

  1. Remove a task

dopy rm 2

  1. Get a task in shell mode for editing

dopy get 3

$ dopy get 3
To show the task
>>> print task
To show a field (available name, tag, status, reminder)
To edit the task assign to a field
>>> = "Other name"
To delete a task
>>> task.delete()
To exit
>>> quit()

>>> print task
<Row {'status': 'new', 'name': 'Pay telephone bill', 'deleted': False, 'created_on': datetime.datetime(2012, 12, 31, 8, 3, 15), 'tag': 'personal', 'reminder': 'today', 'id': 3}>
>>> task.status
>>> task.status = "working"
>>> task.status


Doing a dopy ls you can see the ID of the tasks, using this ID you can assign notes

  1. Including a note

dopy note 1 "This is the note for the task 1"

The above command inserts the note and prints the TASK with notes.

|ID| Name|    Tag|Status|Reminder|    Created|
| 1|teste|default|   new|    None|01/01-02:14|
0 This is the note fot task 1
1 notes
  1. Consulting the notes

You can also show all notes for a task using the show command

dopy show 1

|ID| Name|    Tag|Status|Reminder|    Created|
| 1|teste|default|   new|    None|01/01-02:14|
0 This is the note fot task 1
1 This is another note for task 1
2 notes
  1. Removing a note

Notes can be removed by its index number.

Example: To remove the latest note

dopy note 1 --rm=-1

where -1 is the index for the last element in notes

To remove the first note

dopy note 1 --rm=0

Switching DBS

It is possible to use more than one database by switching using --use argument

dopy add "Including on another db" --use=mynewdb

The above command will use a db called "mynewdb" (it will be created if not exists)

In the same way you have to specify the db for other operations

dopy ls --all --use=mynewdb to list all tasks on the db

Note, you can also change the default db in .dopyrc file


  • Sync with google task
  • Sync with remember the milk
  • Generate HTML and PDF reports on /tmp

web2py and Redis Queue

RQ (Redis Queue)

RQ (Redis Queue) is a simple Python library for queueing jobs and processing them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry. It should be integrated in your web(2py) stack easily.


Free open source full-stack framework for rapid development of fast, scalable, secure and portable database-driven web-based applications. Written and programmable in Python.

Queueing jobs with RQ and web2py

web2py as many other web frameworks works in a request -> response environment, which means that there is a lifetime for things to be done. This lifetime we call "request time", it is the time between the client requests a resource (i.e hits an url of our app or post a form) and the time that the server gives the response back to the client (i.e: The server sends html, json or any other kind of response).

The problem with this is the fact that we have a time-out and the user does not want to wait for tasks to be done, I mean in example for creating image thumbnails, users have to upload a picture and then wait for the thumbnail to be created to have a response from server. Or in the case of sending an email, user fill a contact form and have to wait for the message to be sent. It can take a long time and sometimes it will fail.

The solution is to enqueue that jobs on background and then watch its results to give a response to the user, this response can be given through a websocket or ajax long pooling. (I will not cover this here)

Setting up

  • install Redis
    • In debian based linuxes you can do: sudo apt-get install redis-server
  • Install RQ (redis queue)
    • sudo pip install rq

case 1 : Sending email in background

User will fill our contact form and then click in submit, instead of sending the email we are going to enqueue the email to be sent by the redis queue.

1. In your models create your Queue object (also you need to have the mail settings)

    from import Mail
    mail = Mail()
    mail.settings.server = ""
    mail.settings.sender = ""
    mail.settings.login = "you:yourpassword"

from redis import Redis
from rq import Queue
q = Queue(connection=Redis())

The above will use the default Redis connection port to localhost, take a look at RQ docs if you need to set another redis server.

2. In your controller create the contact action which returns a form.

    def contact():
        form = SQLFORM.factory(Field("name"), Field("message"))
        if form.accepts(request):
            # enqueue the email to be sent!
                      subject="%(name)s contacted you" % form.vars,
            # do whatever you want
            response.flash = "email successfully sent!"

## case 2 : Creating an image thumbnail

User will upload a picture and you are going to create a THUMBNAIL and store the thumbnail in /static/thumbs folder

#### 1. Define some models

Picture = db.define_table("pictures",
    Field("picture", "upload")

    from redis import Redis
    from rq import Queue
    q = Queue(connection=Redis())

#### 2. Create the form

# requires PIL to be installed
# sudo apt-get install python-imaging
from gluon.contrib.imageutils import THUMB

def add_picture():
    form = SQLFORM(Picture, submit_button="send")
    if form.process().accepted:
        #enqueue thumbnail to be created
        q.enqueue(THUMB, form.vars.picture)

Put the workers to work

On the cases above we just enqueued tasks to be executed by the workers, now we need the worker running.

web2py environment

The worker should run under the web2py environment, because we are using web2py modules to send emails and create the thumbnail, so the RQ worker should be started with this script.

1. Create the web2py RQ worker


import sys
from rq import Queue, Connection, Worker

# Provide queue names to listen to as arguments to this script,
# similar to rqworker
with Connection():
    qs = map(Queue, sys.argv[1:]) or [Queue()]
    w = Worker(qs)

Start the above worker under web2py environment

cd /path/to/web2py
python -S yourappname -M -R /some/path/

With the above worker running the enqueued tasks will be executed and then worker will keep listening for new tasks.

You can also put the worker to run in backgroungm for this you shoud use nohup python -S yourappname -M -R /some/path/ & or even better you can put this to run under the supervidord

with the worker running you should see this console:

python web2py/ -S app -M -R /projects/ 
web2py Web Framework
Created by Massimo Di Pierro, Copyright 2007-2012
Version 2.4.1-alpha.2+timestamp.2012.
Database drivers available: SQLite(sqlite3), MySQL(pymysql), PostgreSQL(pg8000), IMAP(imaplib)
[2012-12-31 00:33] DEBUG: worker: Registering birth of worker precise64.15755
[2012-12-31 00:33] INFO: worker: RQ worker started, version 0.3.2
[2012-12-31 00:33] INFO: worker: 
[2012-12-31 00:33] INFO: worker: *** Listening on default...
[2012-12-31 00:34] INFO: worker: default: send(to='', message='blah', subject='testing') (a069b2c6-f908-4806-8534-b00c43996cf4)


RQ has some nice ways for monitoring the jobs by command-line or by its dashboard.

command line:

To see what queues exist and what workers are active, just type rqinfo:

$ rqinfo
high       |██████████████████████████ 20
low        |██████████████ 12
default    |█████████ 8
3 queues, 45 jobs total

Bricktop.19233 idle: low
Bricktop.19232 idle: high, default, low
Bricktop.18349 idle: default
3 workers, 3 queues

As you can see it is possible to start many workers.


The easiest way is probably to use the RQ dashboard, a separately distributed tool, which is a lightweight webbased monitor frontend for RQ, which looks like this:

RQ dashboard


microblog app

microblog app

Este tutorial foi criado para o evento RuPy Brasil em parceria com a ZNC Sistemas.

O download do app pode ser feito em: Download pacote w2p

O tutorial em PDF: Tutorial em PDF

Tutorial: Criando um microblog app

Agora pretendo aproveitar que o blog tem mais espaço que o PDF para detalhar um pouco mais o app de microblog e também implementar algumas funcionalidades extra.


Search form with web2py

Quick and dirty search form example

Considering models/

status_options = {"0": "pending", "1": "confirmed", "3": "canceled"}

    Field("id_buyer", "reference auth_user"),
    Field("order_date", "date"),
          represent= lambda value, row: status_options[value]

And the search function controllers/

import datetime

def index():
    # default values to keep the form when submitted
    # if you do not want defaults set all below to None

    status_default = request.vars.status
    date_initial_default = \
        datetime.datetime.strptime(request.vars.date_initial, "%Y-%m-%d") \
            if request.vars.date_inicial else None
    date_final_default = \
        datetime.datetime.strptime(request.vars.date_final, "%Y-%m-%d") \
            if request.vars.date_final else None
    obs_default = request.vars.obs

    # The search form created with .factory
    form = SQLFORM.factory(
                            IS_IN_SET(status_options, zero="-- All --")
                  Field("date_initial", "date", default=date_initial_default),
                  Field("date_final", "date", default=date_final_default),
                  Field("obs", default=obs_default),

    # The base query to fetch all orders of the current logged user
    query = db.orders.id_buyer == auth.user_id                  

    # testing if the form was accepted              
    if form.process().accepted:
        # gathering form submitted values
        status = form.vars.status
        date_initial = form.vars.date_initial
        date_final = form.vars.date_final
        obs = form.vars.obs

        # more dynamic conditions in to query
        if status:
            query &= db.orders.status == status
        if date_initial:
            query &= db.orders.order_date >= date_initial
        if date_final:
            query &= db.orders.order_date <= date_final
        if obs:
            # A simple text search with %like%
            query &="%%%s%%" % obs)            

    count = db(query).count()
    results = db(query).select(
    msg = T("%s registers" % count )
    return dict(form=form, msg=msg, results=results) 

Optionally you can create a view file in views/default/index.html

{{extend 'layout.html'}}
<hr />

the end result

Download the app:

If you need a better and complex search engine I recommend Whoosh.

App news reading (portuguese)


db = DAL("sqlite://news.sqlite")
    Field("texto", "text"),
    Field("data", "datetime")


def escrever():
    form = SQLFORM(db.noticias)
    if form.process().accepted:
    return dict(form=form)

def listar():
    noticias = db(db.noticias).select(
    return dict(noticias=noticias)

def ler_noticia():
    id_noticia = request.args(0) or redirect(URL("listar"))
    # caso não passe um id retorna para /listar
    noticia = db.noticias[id_noticia]
    return dict(noticia=noticia)


{{extend "layout.html"}}

{{for noticia in noticias:}}
    <a href="{{=URL("default", "ler_noticia",}}"> 


{{extend "layout.html"}}

    <a href="{{=URL("default", "ler_noticia",}}">


{{extend "layout.html"}}

<h1> escreva uma noticia</h1>

WEb2py 2.0