AsyncIO - O futuro do Python mudou completamente!

Tradução do artigo original escrito por Yeray Diaz para o hackernoon: AsyncIO for the working Python developer

Eu me lembro do exato momento em que eu pensei, "Uau, isso está lento, aposto que se eu pudesse paralelizar essas chamadas isso voaria!" e então, 3 dias após, eu olhei para o meu código e não pude reconhece-lo, havia se transformado em um misturado de chamadas para threading e funções da biblioteca processing.

Então encontrei o asyncio, e tudo mudou!


Se você não conhece, asyncio é o novo módulo de concorrência introduzido no Python 3.4. É projetado para usar coroutines e futures para simplificar a programação assíncrona e tornar o código tão legível quanto o código síncrono simplesmente por não haver callbacks.

Eu também me lembro que enquanto eu estava naquela busca pelo paralelismo inúmeras opções estavam disponíveis, mas uma se destacou. Era rápida, fácil de aprender e muito bem escrita: A excelente biblioteca gevent. Eu cheguei até ela lendo o encantador tutorial mão na massa: Gevent for the Working Python Developer, escrito por uma sensacional comunidade de usuários, uma ótima introdução não apenas ao gevent mas ao conceito de concorrência em geral, e você também deveria dar uma lida.

Eu gostei tanto do tutorial que decidi usa-lo como template para escrever sobre o AsyncIO.

Aviso Rápido: Isto não é um artigo sobre gevent X asyncio, O Nathan Road escreveu a respeito das diferenças e similaridades entre esses 2 se você estiver interessado.

Uma nota a respeito do código neste tutorial, você deve ter lido que no Python 3.5 uma nova sintaxe foi introduzida, especialmente para coroutines, eu estou intencionalmente não utilizando esta nova sintaxe neste texto pois desta forma acredito que fica mais fácil para assimilar as coroutinas com generators. Mas você pode encontrar versões dos exemplos usando esta nova sintaxe no github.

Eu sei que você já deve estar ansioso mas antes de mergulharmos eu gostaria de primeiramente falar rapidamente sobre alguns conceitos que talvez não lhe sejam familiares.

Threads, loops, coroutines and futures

Threads são uma ferramenta comum e a maioria dos desenvolvedores já ouviu falar ou já usou. Entretanto o asyncio usa de estruturas um pouco diferentes: event loops, coroutines e futures.

  • Um event loop gerencia e distribui a execução de diferentes tarefas. Ele mantém um registro de tarefas (coroutines) e distribui o fluxo de execução entre elas.
  • As coroutines são geradores Python (generators), que quando há a ocorrência do yield libera o controle do fluxo de volta ao event loop. Uma coroutine precisa estar programada para ser executada usando o event loop, para fazer isso criamos uma tarefa do tipo future.
  • E um future é um objeto que representa o resultado de uma tarefa que pode, ou não, ter sido executada. Este resultado pode ser uma Exception.

Entendeu? simples né? vamos mergulhar neste conceito!

Execução síncrona e Execução assíncrona

Em Concorrência não é paralelismo, é melhor! o Rob Pike falou uma coisa que fez um click na minha cabeça: Dividir tarefas em sub-tarefas concorrentes já é o suficiente para permitir o paralelismo. Mas é o fato de programar/agendar a execução dessas sub-tarefas que realmente cria o paralelismo.

O ASyncIO faz exatamente isso, você pode estruturar o seu código em sub-tarefas definidas como coroutines e isso te permite programar a execução da maneira que desejar, incluindo a forma simultânea. As Corountines contém pontos de vazão demarcados com a palavra yield onde definimos onde uma troca de contexto poderia ocorrer caso existam outras tarefas pendentes, mas que não irá ocorrer caso não existam outras tarefas.

Nota do tradutor: Em uma loja de doces há um funcionário empacotando balas, ao finalizar cada pacote ele o lacra e coloca na vitrine (YIELD), então ele dá uma olhada no balcão para ver se tem algum cliente para ser atendido, se tiver um cliente, então ele para de empacotar balas atende o pedido do cliente (troca de contexto). E só depois de terminar de > atender o cliente ele então volta a empacotar as balas, caso não tenha cliente a ser atendido ele simplesmente continua o trabalho de empacotamento. Podemos dizer que é um funcionário fazendo duas tarefas __. (responda nos comentários se é paralelamente ou concorrentemente)

Uma troca de contexto no asyncio representa o event loop passando o fluxo
de controle da coroutine em execução para a próxima na fila de execução, ou seja, (Yielding).

Veja um exemplo básico:

import asyncio

@asyncio.coroutine
def empacotar_bala():
    print("Empacotando balas...")

    # parada para verificar se tem cliente no balcão
    yield from asyncio.sleep(0)

    # troca de contexto
    print("Explicitamente voltando a empacotar balas")


@asyncio.coroutine
def atender_balcao():
    print("Explicitamente verificando se tem cliente no balcão...")

    yield from asyncio.sleep(0)

    print("Voltando a empacotar as balas")


ioloop = asyncio.get_event_loop()  # Event Loop

tasks = [ioloop.create_task(empacotar_bala()),
         ioloop.create_task(atender_balcao())]

wait_tasks = asyncio.wait(tasks)

ioloop.run_until_complete(wait_tasks)

ioloop.close()

Execute:

$ python3 async1.py
Empacotando balas...
Explicitamente verificando se tem cliente no balcão...
Explicitamente voltando a empacotar balas
Voltando a empacotar as balas
  • Primeiramente nós declaramos duas tarefas simples com a intenção de serem executadas de maneira não bloqueante pois usamos a função sleep do asyncio.
  • Coroutines só podem ser chamadas por outras coroutines ou podem ser agrupadas em uma task para então serem enfileiradas, nós usamos a função create_task para fazer isso.
  • Então criamos lista contendo as 2 tasks e nós a combinamos em uma wait que é uma task que irá aguardar até que todas as tarefas
    enfileiradas terminem.
  • E finalmente nós programamos a wait para executar usando o event loop usando a função run_until_complete.

Ao usar yield from na coroutine empacotar_bala nós declaramos que a coroutine pode naquele momento passar o controle do fluxo de execução de volta para o event loop, neste caso o sleep ao terminar (note o sleep(0)) irá devolver o controle ao event loop que irá mudar de contexto, passando o controle de fluxo para a próxima coroutine agendada para execução: atender_balcao

Nota: O tradutor alterou os nomes das funções dos exemplos do artigo original para dar um significado mais fácil de ser interpretado em português mas mantendo a semântica e fluxo de execução dos códigos, todavia os originais estão no github.

Vamos agora simular duas tarefas bloqueantes gr1 e gr2, considere que há dois requests para serviços externos. Enquanto elas executam, uma terceira tarefa pode ser executada assíncronamente, como no seguinte exemplo:

import time
import asyncio

start = time.time()

def tic():
  return 'at %1.1f segundos' % (time.time() - start)


@asyncio.coroutine
def gr1():
  # Demora a ser executada, mas não queremos esperar
  print('gr1 iniciou a execução: {}'.format(tic()))
  yield from asyncio.sleep(2)
  print('gr1 terminou a execução: {}'.format(tic()))


@asyncio.coroutine
def gr2():
  # Demora a ser executada, mas não queremos esperar
  print('gr2 iniciou a execução: {}'.format(tic()))
  yield from asyncio.sleep(2)
  print('gr2 terminou a execução: {}'.format(tic()))


@asyncio.coroutine
def gr3():
  print('Executando enquanto as outras estão bloqueadas: {}'.format(tic()))
  yield from asyncio.sleep(5)
  print('Pronto!')

ioloop = asyncio.get_event_loop()
tasks = [
    ioloop.create_task(gr1()),
    ioloop.create_task(gr2()),
    ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()

Execute:

$ python3 async2.py 
gr1 iniciou a execução: at 0.0 segundos
gr2 iniciou a execução: at 0.0 segundos
Executando enquanto as outras estão bloqueadas: at 0.0 segundos
gr1 terminou a execução: at 2.0 segundos
gr2 terminou a execução: at 2.0 segundos
Pronto!

Perceba que na forma que o I/O loop faz o gerenciamento e programa a execução permite que o seu código, rodando em single thread possa operar de forma concorrente. Enquanto duas tarefas estavam bloqueadas uma terceira pode tomar o controle do fluxo de execução e ser executada de maneira assíncrona.

Ordem de execução

No mundo síncrono estamos acostumados a pensar de maneira linear. Se nós tivermos uma lista de tarefas que consomem diferente quantidade de tempo elas serão executadas na ordem em que foram chamadas.

Porém, quando usamos concorrência nós precisamos estar cientes de que as tarefas terminam em tempos que diferem da ordem em que foram enfileiradas.

import random
from time import sleep
import asyncio


def task(pid):
    """Uma tarefa não deterministica"""
    sleep(random.randint(0, 2) * 0.001)
    print('Task %s terminada' % pid)


@asyncio.coroutine
def task_coro(pid):
    """Uma tarefa deterministica"""
    yield from asyncio.sleep(random.randint(0, 2) * 0.001)
    print('Task %s terminada' % pid)


def synchronous():
    for i in range(1, 10):
        task(i)


@asyncio.coroutine
def asynchronous():
    tasks = [asyncio.async(task_coro(i)) for i in range(1, 10)]
    yield from asyncio.wait(tasks)


print('Síncronamente:')
synchronous()

ioloop = asyncio.get_event_loop()
print('Assíncronamente:')
ioloop.run_until_complete(asynchronous())

ioloop.close()

Execute:

$ python3 async3.py 
Síncronamente:
Task 1 terminada
Task 2 terminada
Task 3 terminada
Task 4 terminada
Task 5 terminada
Task 6 terminada
Task 7 terminada
Task 8 terminada
Task 9 terminada

Assíncronamente:
Task 2 terminada
Task 4 terminada
Task 8 terminada
Task 5 terminada
Task 6 terminada
Task 7 terminada
Task 9 terminada
Task 1 terminada
Task 3 terminada

A saida será com certeza variada, pois cada task espera por uma quantidade randômica de tempo, mas repare que a ordem dos resultados é completamente diferente, mesmo tendo enfileirado em uma lista de tarefas na mesma ordem usando o mesmo range.

Outro detalhe é que tivemos que uma versão em coroutine da nossa simples função. É importante entender que o asyncio não faz com que as coisas se transformarem magicamente em não bloqueantes.

O AsyncIO está por enquanto sozinho na biblioteca padrão do Python 3 enquanto todos outros módulos oferecem apenas funcionalidades bloqueantes.

Você pode usar o módulo concurrent.futures para agrupar tarefas bloqueantes em uma thread ou um processo e então retornar um Future que o asyncio pode utilizar. Os mesmos exemplos utilizando threads podem ser encontrados no github

Esta é provavelmente a maior desvantagem ao usar asyncio neste momento, porém existe uma série de bibliotecas para diferentes tarefas e serviços que já estão disponíveis de maneira não bloqueante.


Uma tarefa bloqueante bastante comum é coletar dados de um serviço HTTP. Para isso vou usar a excelente biblioteca aiohttp que efetua chamadas não bloqueantes a serviços HTTP. Neste exemplo vamos coletar dados da API pública do Github e ler apenas o valor de Date do responde header.

import time
import urllib.request
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


def fetch_sync(pid):
    print('Captura síncrona {} iniciou'.format(pid))
    start = time.time()
    response = urllib.request.urlopen(URL)
    datetime = response.getheader('Date')

    print('Processo {}: {}, demorou: {:.2f} segundos'.format(
        pid, datetime, time.time() - start))

    return datetime


@asyncio.coroutine
def fetch_async(pid):
    print('Captura assíncrona {} iniciou'.format(pid))
    start = time.time()
    response = yield from aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')

    print('Processo {}: {}, demorou: {:.2f} segundos'.format(
        pid, datetime, time.time() - start))

    response.close()
    return datetime


def synchronous():
    start = time.time()
    for i in range(1, MAX_CLIENTS + 1):
        fetch_sync(i)
    print("Processo demorou: {:.2f} segundos".format(time.time() - start))


@asyncio.coroutine
def asynchronous():
    start = time.time()
    tasks = [asyncio.ensure_future(
        fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)]
    yield from asyncio.wait(tasks)
    print("Processo demorou: {:.2f} segundos".format(time.time() - start))


print('Sincrono:')
synchronous()

print('Assíncrono:')
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Execute:

$ python3 -V
Python 3.4.4+

$ pip3 install aiohttp

$ python3 async4.py
Sincrono:
Processo sincrono 1 iniciou
Processo 1: Wed, 17 Feb 2016 13:10:11 GMT, demorou: 0.54 segundos
Processo sincrono 2 iniciou
Processo 2: Wed, 17 Feb 2016 13:10:11 GMT, demorou: 0.50 segundos
Processo sincrono 3 iniciou
Processo 3: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.48 segundos
Process demorou: 1.54 segundos

Assíncrono:
Processo assincrono 1 iniciou
Processo assincrono 2 iniciou
Processo assincrono 3 iniciou
Processo 3: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.50 segundos
Processo 2: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.52 segundos
Processo 1: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.54 segundos
Processo demorou: 0.54 segundos

Nota: requer Python 3.4.4+ caso contrário cairá em exception

Primeiramente, repare na diferença de tempo, usando chamadas assíncronas nós efetuamos as requisições ao serviço HTTP exatamente ao mesmo tempo (13:10:12). Como falado anteriormente, cada requisição passou (yield) o fluxo de controle para a próxima e retornou quando foi completada.

Resultando no fato de que requisitar e capturar os resultados de todos as tarefas demorou o mesmo tempo que a requisição mais lenta! Veja o tempo no log 0.54 segundos para a requisição mais lenta (processo 1) e é exatamente o mesmo tempo que se passou para processar todos os 3 requests, Legal né? (enquanto a parte de I/O daquela tarefa lenta estava bloqueada, as outras puderam ser executadas simultaneamente).

Agora veja como o código é similar ao da versão síncrona! é praticamente o mesmo código! As diferenças principais são por conta das diferenças de implementações das bibliotecas usadas e a parte da criação das tasks e a espera para elas terminarem.

Criando concorrência

Até então estamos usando uma única abordagem de criar e requisitar resultados de coroutines, criar uma lista de tasks e esperar que elas terminem.

Mas as coroutines podem ser programadas para serem executadas e requisitar resultados em maneiras diferentes. Imagine um cenário onde precisamos processar os resultados de uma chamada HTTP GET assim que ela é requisitada, o processo é na verdade similar ao que fizemos no exemplo anterior.

import time
import random
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


@asyncio.coroutine
def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('Processo assincrono {} iniciou, esperando por {} segundos'.format(
        pid, sleepy_time))

    yield from asyncio.sleep(sleepy_time)

    response = yield from aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')

    response.close()
    return 'Processo {}: {}, demorou: {:.2f} segundos'.format(
        pid, datetime, time.time() - start)


@asyncio.coroutine
def asynchronous():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    for i, future in enumerate(asyncio.as_completed(futures)):
        result = yield from future
        print('{} {}'.format(">>" * (i + 1), result))

    print("Processo demorou: {:.2f} segundos".format(time.time() - start))


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Execute

$ python3 async5.py

Processo assincrono 1 iniciou, esperando por 4 segundos
Processo assincrono 3 iniciou, esperando por 5 segundos
Processo assincrono 2 iniciou, esperando por 3 segundos
>> Processo 2: Wed, 17 Feb 2016 13:55:19 GMT, demorou: 3.53 segundos
>>>> Processo 1: Wed, 17 Feb 2016 13:55:20 GMT, demorou: 4.49 segundos
>>>>>> Processo 3: Wed, 17 Feb 2016 13:55:21 GMT, demorou: 5.48 segundos
Processo demorou: 5.48 segundos

Repare no deslocamento >> e no tempo de cada chamada, elas foram programadas ao mesmo tempo, os resultados chegam fora de ordem e são processados assim que chegam.

Este código é um pouco diferente, estamos agrupando as coroutines em uma lista, cada uma para ser agendada e executada. a função as_completed retorna um iterador que irá gerar (YIELD) um future completo assim que a tarefa estiver terminada. Muito legal né? aliás, as funções as_completed e wait são ambas originalmente parte do modulo concurrent.futures


Vamos pegar um outro exemplo, imagine que você está tentando consultar o seu endereço de IP público atual em seu programa. Existem alguns serviços que fornecem essa informação mas você não tem certeza se estarão acessíveis no momento da execução. Você não quer checar cada um deles sequencialmente, então, é preferível efetuar requisições concorrentes para cada um dos serviços e utilizar aquele que responder mais rapidamente, ok? Ok!

Bom, acontece que nosso velho amigo wait recebe o parâmetro return_when para fazer exatamente isso. Estávamos ignorando o dado retornado pelo wait já que estávamos preocupados apenas em paralelizar as tarefas. Mas agora nós queremos pegar os resultados da coroutine, então usaremos dois conjuntos de futures, done e pending.

from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    response = yield from aiohttp.request('GET', service.url)
    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} terminou com resultado: {}, demorou: {:.2f} segundos'.format(
        service.name, ip, time.time() - start)


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = yield from asyncio.wait(
        futures, return_when=FIRST_COMPLETED)
    print(done.pop().result())


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Execute:

$ python3 async6.py
Fetching IP from ip-api
Fetching IP from ipify
ip-api terminou com resultado: 82.34.76.170, demorou: 0.09 segundos
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10f95c6d8>
Task was destroyed but it is pending!
task: <Task pending coro=<fetch_ip() running at 2c-fetch-first-ip-address-response.py:20> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(10)(), Task._wakeup()]>>

Espere, o que aconteceu aqui? O primeiro serviço respondeu com sucesso então o que são esses warnings?

Bem, nós agendamos duas tarefas mas não permitimos que as duas fossem completadas. o AsyncIO considera que é um bug e imprime um warning. Então queremos informar ao event loop que este era um comportamento esperado e não se preocupar com a segunda tarefa que ficou pendente. Como? que bom que perguntou.

Estados do Futuro

(sobre o estado que um future está atualmente, não o estado que ele estará no futuro... você entendeu né!)

Eles são:

  • Pending
  • Running
  • Done
  • Cancelled

Simples assim, quando um future finaliza sua tarefa, esta tarefa irá retornar o resultado de volta para o future, se estiver em pending ou cancelled ele gera o erro InvalidStateError ou CancelledError, e finalmente se a coroutine gera o erro ele será re-gerado, o que significa o mesmo comportamento ao chamar exception. confira aqui

Você também pode usar .done, .cancelled e .running em uma Future para obter um booleano indicando o estado. Note que done significa simplesmente que o result irá retornar ou gerar um erro. Você pode explicitamente cancelar um Future chamando o método cancel, e isso é o que precisamos para resolver o warning anterior.



def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = yield from asyncio.wait(
        futures, return_when=FIRST_COMPLETED)
    print(done.pop().result())

    for future in pending:
        future.cancel()

e então

$ python3 async6.py
Fetching IP from ip-api
Fetching IP from ipify
ip-api terminou com resultado: 82.34.76.170, demorou: 0.09 segundos

Um bom resultado né!

Futures permitem injetar callbacks a serem executados quando entrarem no estado done caso você queira adicionar uma lógica adicional, ou se você não quiser usar o yield from e prefira o callback hell, (sério quer mesmo?)

E você pode, para objetivos de unit-testing manualmente injetar o resultado ou uma exception a um Future.

Gerenciamento de erros

O AsyncIO é sobre fazer código concorrente gerenciável e legível, e isto se torna óbio no que diz respeito ao tratamento de erros. Vamos voltar ao exemplo anterior para ilustrar isso.

Imagine que queremos garantir que os 2 serviços retornaram o mesmo resultado, mas um dos serviços fica offline e não responde, podemos usar apenas o usual try... except

from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
    Service('broken', 'http://este-servico-nao-funciona', 'ip')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        return "{} não está respondendo".format(service.name)

    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} terminou com resultado: {}, demorou: {:.2f} segundos'.format(
        service.name, ip, time.time() - start)


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, _ = yield from asyncio.wait(futures)

    for future in done:
        print(future.result())


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

execute:

$ python3 async7.py
Fetching IP from ip-api
Fetching IP from borken
Fetching IP from ipify
ip-api terminou com o resultado: 85.133.69.250, demorou: 0.75 segundos
ipify terminou com o resultado: 85.133.69.250, demorou: 1.37 segundos
borken não está respondendo

Também podemos tratar os erros enquanto processamos os resultados dos Futures em caso de algo não esperado acontecer (lembra que eu disse que os erros são re-gerados na coroutine).

from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp
import traceback

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
    Service('broken', 'http://este-servico-nao-funciona', 'ip')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        return "{} não está respondendo".format(service.name)

    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} terminou com resultado: {}, demorou: {:.2f} segundos'.format(
        service.name, ip, time.time() - start)


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, _ = yield from asyncio.wait(futures)

    for future in done:
        try:
            print(future.result())
        except:
            print("Erro não esperado: {}".format(traceback.format_exc()))


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Veja:

$ python3 async7.py
Fetching IP from ipify
Fetching IP from borken
Fetching IP from ip-api
ipify terminou com o resultado: 85.133.69.250, demorou: 0.91 segundos
borken não está respondendo
Erro não esperado: Traceback (most recent call last):
 File “3b-fetch-ip-addresses-future-exceptions.py”, line 41, in asynchronous
 print(future.result())
 File “/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py”, line 274, in result
 raise self._exception
 File “/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py”, line 239, in _step
 result = coro.send(value)
 File “3b-fetch-ip-addresses-future-exceptions.py”, line 27, in fetch_ip
 ip = json_response[service.ip_attr]
KeyError: ‘this-is-not-an-attr’

Da mesma forma que agendar uma task e não esperar que ela termine é considerado um bug, agendar uma task e não recuperar possíveis erros também irá gerar um warning.

from collections import namedtuple
import time
import asyncio
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
    Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        print('{} is unresponsive'.format(service.name))
    else:
        json_response = yield from response.json()
        ip = json_response[service.ip_attr]

        response.close()
        print('{} finished with result: {}, took: {:.2f} seconds'.format(
            service.name, ip, time.time() - start))


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    yield from asyncio.wait(futures)  # intentionally ignore results


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

execute:

$ python3 async8.py
Fetching IP from ipify
Fetching IP from borken
Fetching IP from ip-api
borken is unresponsive
ipify finished with result: 85.133.69.250, took: 0.78 seconds
Task exception was never retrieved
future: <Task finished coro=<fetch_ip() done, defined at 3c-fetch-ip-addresses-ignore-exceptions.py:15> exception=KeyError(‘this-is-not-an-attr’,)>
Traceback (most recent call last):
 File “/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py”, line 239, in _step
 result = coro.send(value)
 File “3c-fetch-ip-addresses-ignore-exceptions.py”, line 26, in fetch_ip
 ip = json_response[service.ip_attr]
KeyError: ‘this-is-not-an-attr’

Se parece muito com a saída do exemplo anterior, mas não contém os mínimos detalhes e mensagens do asyncio.

Timeouts

E se nós não nos importarmos muito com o nosso IP? Imagine que é apenas um adicional ao nosso serviço, mas não importante, não queremos que o usuário fique esperando por este dado. Idealmente nós definimos um time-out para nossas tarefas não bloqueantes, e então continuamos nosso programa sem o atributo do IP já que neste exemplo não é tão importante.

De novo descobrimos que wait tem o atributo que precisamos:

import time
import random
import asyncio
import aiohttp
import argparse
from collections import namedtuple
from concurrent.futures import FIRST_COMPLETED

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
)

DEFAULT_TIMEOUT = 0.01


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    yield from asyncio.sleep(random.randint(1, 3) * 0.1)
    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        return '{} is unresponsive'.format(service.name)

    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    print('{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start))
    return ip


@asyncio.coroutine
def asynchronous(timeout):
    response = {
        "message": "Result from asynchronous.",
        "ip": "not available"
    }

    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = yield from asyncio.wait(
        futures, timeout=timeout, return_when=FIRST_COMPLETED)

    for future in pending:
        future.cancel()

    for future in done:
        response["ip"] = future.result()

    print(response)


parser = argparse.ArgumentParser()
parser.add_argument(
    '-t', '--timeout',
    help='Timeout to use, defaults to {}'.format(DEFAULT_TIMEOUT),
    default=DEFAULT_TIMEOUT, type=float)
args = parser.parse_args()

print("Using a {} timeout".format(args.timeout))
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous(args.timeout))
ioloop.close()

Repare no argumento timeout, também adicionamos um parâmetro de linha de comando para testar mais facilmente o que acontecerá se deixarmos a requisição ocorrer algumas vezes. Também adicionei um tempo randomico de slepp só para garantir que as coisas não aconteçam tão rápido que a gente nem perceba.

$ python async8.py

Using a 0.01 timeout
Fetching IP from ipify
Fetching IP from ip-api
{‘message’: ‘Result from asynchronous.’, ‘ip’: ‘not available’}

$ python async8.py -t 5
Using a 5.0 timeout
Fetching IP from ip-api
Fetching IP from ipify
ipify finished with result: 82.34.76.170, took: 1.24 seconds
{'ip': '82.34.76.170', 'message': 'Result from asynchronous.'}

Conclusão

Asyncio aumentou meu já enorme amor por Python. Para ser absolutamente honesto eu me apaixonei pelas coroutines em Python quando conheci o Tornado mas o asyncio conseguiu unir o melhor desta abordagem junto com excelentes bibliotecas de concorrência. E muito foi feito para que outras bibliotecas possam usar o IO loop, então se você está usando o Tornado, você também pode usa-lo com bibliotecas feitas para o asyncio!

E como eu disse anteriormente o maior problema por enquanto é a falta de bibliotecas e módulos que implementam o comportamento não bloqueante. Você pode achar que uma vasta quantidade de tecnologias já estabelecidas ainda não tenham uma versão não bloqueante para interagir com asyncio, ou que as existentes ainda estão jovens ou experimentais. Porém, O numero de bibliotexas está crescendo diariamente

confira este repo: https://github.com/aio-libs

Espero neste tutorial ter passado a idéia de como é prazeroso trabalhar com AsyncIO. Eu honestamente penso que isto é a peça que finalmente nos levará a adoção em massa e adaptação ao Python 3, essas são as coisas que você está perdendo se ficar parado no Python 2.7.

Uma coisa é certa, o Futuro do Python mudou completamente! (trocadilho intencional)

Castálio Podcast Especial Python Brasil Parte 3

Python Brasil parte 3

Fechando a série sobre a Python Brasil 12, neste episódio Eu, Og e Elyézer falamos sobre assuntos abordados em algumas palestras da conferência, comunidade, cervejas e Oktoberfest!

Como ouvir?

Acesse a seguinte URL http://castalio.info/episodio-75-python-brasil-12-parte-3.html para ouvir online e você também pode baixar os arquivos em MP3 ou Ogg.

Acompanhe!

Participe!

Se você tem sugestões, dicas de pessoas para serem entrevistadas, pautas a serem abordadas por favor entre em contato em um dos canais acima ou deixe comentários nos episódios.

Castálio Podcast Especial Python Brasil

Castálio?

Castália é o nome de uma náiade (uma ninfa aquática) que foi transformada por Apolo em nascente de água, perto de Delfos (a Fonte de Castália) e na base do Monte Parnaso.

Castália inspirava o génio poético daqueles que bebessem das suas águas ou ouvissem o movimento das suas águas. A água sagrada também era usada para as limpezas dos templos em Delfos.

Com o objetivo de entrevistar e ao mesmo tempo apresentar pessoas e projetos de tecnologia que sejam fonte de inspiração para os ouvintes, este podcast traz peridicamente uma nova vítima, err figura da comunidade de tecnologia que será sabatinada de todos os ângulos para o seu deleite!

Castálio na Python Brasil 12

Durante a Python Brasil 12 em Florianópolis eu e o Elyézer gravamos algumas entrevistas para o Castálio Podcast e com isso surgiu o convite do Og Maciel para eu integrar a equipe do Castálio, publicaremos 3 episódios especiais sore a Python Brasil 12, dois episódios com entrevistas e um terceiro com detalhes sobre nossa participação e o que fizemos por lá.


Lightning Cast

A idéia surgiu de ultima hora, em uma conversa de IRC durante o trabalho pensamos que seria interessante gravar pequenas entrevistas e ai surgiu o conceito de Lightning Cast! Paramos algumas pessoas no corredor da PyBR e convidamos para falar durante +-5 minutos.

Como foi a primeira vez que fizemos e bastante improvisado conseguimos fazer apenas 4 entrevistas, mas já nos serviu para validar a idéia e agora este novo formato fará parte do Castálio sempre que estivermos em eventos e conferências.

Entrevistas

Entrevistamos na primeira parte o Mário Sérgio que foi o Big Kahuna (organizador) da conferência e também a Naomi Cedar que foi Keynote do evento e faz parte da Python Software Foundation além de ser porta voz da questão de diversidade e inclusão na comunidade Python (esta segunda entrevista foi feita em Inglês).

Na segunda parte falamos com o pessoal do Projeto Serenata de Amor e depois com o Turicas mas não vou dar spoiler, vocês terão que ouvir o episódio na próxima semana para saber mais sobre a conversa.

Na terceira parte que será publicada daqui 2 semanas, Eu, Elyézer e o Og iremos bater um papo sobre a Python Brasil, Palestras que assistimos, pessoas que conhecemos e cervejas que tomamos, além é claro que falar sobre Floripa e a OktoberFest que foi o evento de fechamento da conferência.

Como ouvir?

Acesse a seguinte URL http://castalio.info/episodio-73-python-brasil-12-parte-1.html para ouvir online e você também pode baixar os arquivos em MP3 ou Ogg.

Acompanhe!

Participe!

Se você tem sugestões, dicas de pessoas para serem entrevistadas, pautas a serem abordadas por favor entre em contato em um dos canais acima ou deixe comentários nos episódios.

Castálio Podcast Especial Python Brasil Parte 2

Entrevistas na Python Brasil parte 2

Na segunda parte falamos com o pessoal do Projeto Serenata de Amor e depois com o Turicas mas não vou dar spoiler, vocês terão que ouvir o episódio para saber mais sobre a conversa.

E na semana que vem tem a terceira parte!

Como ouvir?

Acesse a seguinte URL http://castalio.info/episodio-74-python-brasil-12-parte-2.html para ouvir online e você também pode baixar os arquivos em MP3 ou Ogg.

Acompanhe!

Participe!

Se você tem sugestões, dicas de pessoas para serem entrevistadas, pautas a serem abordadas por favor entre em contato em um dos canais acima ou deixe comentários nos episódios.

Microservices with Python, RabbitMQ and Nameko

"Micro-services is the new black" - Splitting the project in to independently scalable services is the currently the best option to ensure the evolution of the code. In Python there is a Framework called "Nameko" which makes it very easy and powerful.

Micro services

The term "Microservice Architecture" has sprung up over the last few years to describe a particular way of designing software applications as suites of independently deployable services. - M. Fowler

I recommend reading the Fowler's posts to understand the theory behind it.

Ok I so what does it mean?

In brief a Micro Service Architecture exists when your system is divided in small (single context bound) responsibilities blocks, those blocks doesn't know each other, they only have a common point of communication, generally a message queue, and does know the communication protocol and interfaces.

Give me a real-life example

The code is available on github: http://github.com/rochacbruno/nameko-example take a look at service and api folders for more info.

Consider you have an REST API, that API has an endpoint receiving some data and you need to perform some kind of computation with that data, instead of blocking the caller you can do it asynchronously, return an status "OK - Your request will be processed" to the caller and do it in a background task.

Also you want to send an email notification when the computation is finished without blocking the main computing process, so it is better to delegate the "email sending" to another service.

Scenario

enter image description here

Show me the code!

Lets create the system to understand it in practice.

Environment

We need an environment with:

  • A running RabbitMQ
  • Python VirtualEnv for services
  • Python VirtualEnv for API

Rabbit

The easiest way to have a RabbitMQ in development environment is running its official docker container, considering you have Docker installed run:

docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Go to the browser and access http://localhost:15672 using credentials guest:guest if you can login to RabbitMQ dashboard it means you have it running locally for development.

enter image description here

The Service environment

Now lets create the Micro Services to consume our tasks. We'll have a service for computing and another for mail, follow the steps.

In a shell create the root project directory

$ mkdir myproject
$ cd myproject

Create and activate a virtualenv (you can also use virtualenv-wrapper)

$ virtualenv service_env
$ source service_env/bin/activate

Install nameko framework and yagmail

(service_env)$ pip install nameko
(service_env)$ pip install yagmail

The service code

Now having that virtualenv prepared (consider you can run service in a server and API in another) lets code the nameko RPC Services.

We are going to put both services in a single python module, but you can also split in separate modules and also run them in separate servers if needed.

In a file called service.py

import yagmail
from nameko.rpc import rpc, RpcProxy


class Mail(object):
    name = "mail"

    @rpc
    def send(self, to, subject, contents):
        yag = yagmail.SMTP('myname@gmail.com', 'mypassword')
        # read the above credentials from a safe place.
        # Tip: take a look at Dynaconf setting module
        yag.send(to=to.encode('utf-8'), 
                 subject=subject.encode('utf-8'), 
                 contents=[contents.encode('utf-8')])


class Compute(object):
    name = "compute"
    mail = RpcProxy('mail')    

    @rpc
    def compute(self, operation, value, other, email):
        operations = {'sum': lambda x, y: int(x) + int(y),
                      'mul': lambda x, y: int(x) * int(y),
                      'div': lambda x, y: int(x) / int(y),
                      'sub': lambda x, y: int(x) - int(y)}
        try:
            result = operations[operation](value, other)
        except Exception as e:
            self.mail.send.async(email, "An error occurred", str(e))
            raise
        else:
            self.mail.send.async(
                email, 
                "Your operation is complete!", 
                "The result is: %s" % result
            )
            return result

Now with the above services definition we need to run it as a Nameko RPC service.

NOTE: We are going to run it in a console and leave it running, but in production it is recommended to put the service to run using supervisord or an alternative.

Run the service and let it running in a shell

(service_env)$ nameko run service --broker amqp://guest:guest@localhost
starting services: mail, compute
Connected to amqp://guest:**@127.0.0.1:5672//
Connected to amqp://guest:**@127.0.0.1:5672//

Testing it

Go to another shell (with the same virtenv) and test it using nameko shell

(service_env)$ nameko shell --broker amqp://guest:guest@localhost
Nameko Python 2.7.9 (default, Apr  2 2015, 15:33:21) 
[GCC 4.9.2] shell on linux2
Broker: amqp://guest:guest@localhost
>>>

You are now in the RPC client testing shell exposing the n.rpc object, play with it

>>> n.rpc.mail.send("name@email.com", "testing", "Just testing")

The above should sent an email and we can also call compute service to test it, note that it also spawns an async mail sending with result.

>>> n.rpc.compute.compute('sum', 30, 10, "name@email.com")
40
>>> n.rpc.compute.compute('sub', 30, 10, "name@email.com")
20
>>> n.rpc.compute.compute('mul', 30, 10, "name@email.com")
300
>>> n.rpc.compute.compute('div', 30, 10, "name@email.com")
3

Calling the micro-service through the API

In a different shell (or even a different server) prepare the API environment

Create and activate a virtualenv (you can also use virtualenv-wrapper)

$ virtualenv api_env
$ source api_env/bin/activate

Install Nameko, Flask and Flasgger

(api_env)$ pip install nameko
(api_env)$ pip install flask
(api_env)$ pip install flasgger

NOTE: In api you dont need the yagmail because it is service responsability

Lets say you have the following code in a file api.py

from flask import Flask, request
from flasgger import Swagger
from nameko.standalone.rpc import ClusterRpcProxy

app = Flask(__name__)
Swagger(app)
CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}


@app.route('/compute', methods=['POST'])
def compute():
    """
    Micro Service Based Compute and Mail API
    This API is made with Flask, Flasgger and Nameko
    ---
    parameters:
      - name: body
        in: body
        required: true
        schema:
          id: data
          properties:
            operation:
              type: string
              enum:
                - sum
                - mul
                - sub
                - div
            email:
              type: string
            value:
              type: integer
            other:
              type: integer
    responses:
      200:
        description: Please wait the calculation, you'll receive an email with results
    """
    operation = request.json.get('operation')
    value = request.json.get('value')
    other = request.json.get('other')
    email = request.json.get('email')
    msg = "Please wait the calculation, you'll receive an email with results"
    subject = "API Notification"
    with ClusterRpcProxy(CONFIG) as rpc:
        # asynchronously spawning and email notification
        rpc.mail.send.async(email, subject, msg)
        # asynchronously spawning the compute task
        result = rpc.compute.compute.async(operation, value, other, email)
        return msg, 200

app.run(debug=True)

Put the above API to run in a different shell or server

(api_env) $ python api.py 
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

and then access the url http://localhost:5000/apidocs/index.html you will see the Flasgger UI and you can interact with the api and start producing tasks on queue to the service to consume.

[image]

NOTE: You can see the shell where service is running for logging, prints and error messages. You can also access the RabbitMQ dashboard to see if there is some message in process there.

There is a lot of more advanced things you can do with Nameko framework you can find more information on https://nameko.readthedocs.org/en/stable/

Let's Micro Serve!

ESEngine - Elasticsearch Object Doctype Mapper for Python

What is ESEngine

esengine - The Elasticsearch Object Doctype Mapper

PyPI versions downloads Travis CI Coverage Status Code Health



ESEngine is an ODM (Object Doctype Mapper) heavily inspired by MongoEngine, developed with the idea that you have to "Know well your Elastic queries and then write them as Python objects"

You extend the esengine.Document class defining a bunch of fields and meta-attributes and you can use that model to instantiate documents and perform queries on ElasticSearch.

ESEngine is MIT licensed and is open source available at http://github.com/catholabs/esengine

The documentation is currently only a README full of examples in http://catholabs.github.io/esengine/ and also the DocString that can be read using Epydoc in http://catholabs.github.io/esengine/docs/

How it works?

Firstly you need an Elasticsearch Python Client, we recommend using the official one pip install elasticsearch, and then you can define your models using ESEngine objects.

# myproject/models.py
from elasticsearch import Elasticsearch
from esengine import Document, StringField, BooleanField

class Person(Document):
    # meta attributes
    _index = 'myproject'
    _doctype = 'person'

    # default client instance
    _es = Elasticsearch()  # optional, can be passed lazily or can be a callable 

    # field definitions
    name = StringField()
    active = BooleanField()

Person.init()

NOTE: The init() calling will initialize the index/doctype mappings and settings, this part can be omitted and then Elastic Search will try to create this by introspection when the first document is indexed.

With the model definition in a file like myproject/models.py we can now use the model class Person to Index(insert), edit, delete and of course search documents.

In a Python console:

>>> from myproject.models import Person

Indexing:

>>> user = Person(name=”Bruno”, active=True)
>>> user.save()
# or simply
>>> user =  Person.create(name=”Bruno”, active=True)

Updating

>>> user.active = False
>>> user.save()
# or simply
>>> user.update(active=False)

Filtering multiple documents

>>> users = Person.filter(active=True)
[ ResultSet generator… a list of active users ]

Bulk update

>>> users.update(active=False)

Performing raw queries (recommended)

>>> query = {“query”: {“match_all”: {}}, “sort”: “name”} 
>>> Person.search(query=query, size=10)

Querying using Payload helpers (better to create dynamic queries)

>>> from esengine import Payload, Query
>>> query = Query.match_all() 
>>> Payload(model=Person, query=query, sort=”name”).search(size=10)

Deleting documents

>>> user = Person.get(id=123)
>>> user.delete()
# or simply
>>> Person.delete_by_id(123)
# or in bulk
>>> users = Person.filter(active=False)
>>> Person.delete_all(users)
# ou simply
>>> Person.delete_by_query({“query”: …. })

You can find more examples in https://github.com/catholabs/esengine

Currently ESEngine is being used in 3 of Catholabs production projects and is reaching a nice level of performance and abstraction.

If you use ElasticSearch with Python or want to learn more about it you can follow the readme on github feel free to open issues or collaborating by Pull Requests Pull Requests :)

Let's Search!

Flasgger - API playground with Flask and Swagger UI

What is Swagger?

Swagger is a simple yet powerful representation of your RESTful API. With the largest ecosystem of API tooling on the planet, thousands of developers are supporting Swagger in almost every modern programming language and deployment environment. With a Swagger-enabled API, you get interactive documentation, client SDK generation and discoverability.

What is Swagger UI?

Swagger UI is a dependency-free collection of HTML, Javascript, and CSS assets that dynamically generate beautiful documentation and sandbox from a Swagger-compliant API. Because Swagger UI has no dependencies, you can host it in any server environment, or on your local machine. Head over to the online demo to see what it looks like for any publically accessible Swagger definition.

What is Flask? (duhhhh!!)

Flask is a microframework for Python based on Werkzeug, Jinja 2 and good intentions. Why it is awesome? because it is simple yet powerful, talking is cheaper look at the code!

from flask import Flask, jsonify, request
app = Flask(__name__)

@app.route('my_awesome_api', methods=['POST'])
def my_awesome_endpoint():
    data = request.json
    return jsonify(data=data, meta={"status": "ok"})

app.run()

Run the above script and then you can start posting to the API

curl -XPOST http://localhost:5000/my_awesome_api -d '{"python": "is awesome"}'
{
    "data": {"python": "is awesome"},
    "meta": {"status": "ok"}
}

What is Flasgger?

Flasgger is a Flask extension to help the creation of Flask APIs with documentation and live playground powered by SwaggerUI. You can define your API structure using YAML files and Flasgger creates all the specifications for you and you can use the same schema to validate the data.

GITHUB REPO: https://github.com/rochacbruno/flasgger

Install it

pip install flasgger

Create your app

You can put API specs directly in docstrings

import random
from flask import Flask, jsonify, request
from flasgger import Swagger

app = Flask(__name__)
Swagger(app)

@app.route('/api/<string:language>/', methods=['GET'])
def index(language):
    """
    This is the language awesomeness API
    Call this api passing a language name and get back its features
    ---
    tags:
      - Awesomeness Language API
    parameters:
      - name: language
        in: path
        type: string
        required: true
        description: The language name
      - name: size
        in: query
        type: integer
        description: size of awesomeness
    responses:
      500:
        description: Error The language is not awesome!
      200:
        description: A language with its awesomeness
        schema:
          id: awesome
          properties:
            language:
              type: string
              description: The language name
              default: Lua
            features:
              type: array
              description: The awesomeness list
              items:
                type: string
              default: ["perfect", "simple", "lovely"]

    """

    language = language.lower().strip()
    features = [
        "awesome", "great", "dynamic", 
        "simple", "powerful", "amazing", 
        "perfect", "beauty", "lovely"
    ]
    size = int(request.args.get('size', 1))
    if language in ['php', 'vb', 'visualbasic', 'actionscript']:
        return "An error occurred, invalid language for awesomeness", 500
    return jsonify(
        language=language,
        features=random.sample(features, size)
    )


app.run(debug=True)

Try it!

Now run your app and access http://localhost:5000/apidocs/index.html and you will play with Swagger UI!

Screenshot Flasgger

NOTE: All the default urls can be changed in configuration.

It is also possible to use a separate file for specs

Create your api specification in a separated YML file

In a file index.yml put the specs definitions

    This is the language awesomeness API
    Call this api passing a language name and get back its features
    ---
    tags:
      - Awesomeness Language API
    parameters:
      - name: language
        in: path
        type: string
        required: true
        description: The language name
      - name: size
        in: query
        type: integer
        description: size of awesomeness
    responses:
      500:
        description: Error The language is not awesome!
      200:
        description: A language with its awesomeness
        schema:
          id: awesome
          properties:
            language:
              type: string
              description: The language name
              default: Lua
            features:
              type: array
              description: The awesomeness list
              items:
                type: string
              default: ["perfect", "simple", "lovely"]

and then change the code to read from it using swag_from decorator

import random
from flask import Flask, jsonify, request
from flasgger import Swagger
from flasgger.utils import swag_from

app = Flask(__name__)
Swagger(app)

@app.route('/api/<string:language>/', methods=['GET'])
@swag_from('index.yml')
def index(language):
    language = language.lower().strip()
    features = [
        "awesome", "great", "dynamic", 
        "simple", "powerful", "amazing", 
        "perfect", "beauty", "lovely"
    ]
    size = int(request.args.get('size', 1))
    if language in ['php', 'vb', 'visualbasic', 'actionscript']:
        return "An error occurred, invalid language for awesomeness", 500
    return jsonify(
        language=language,
        features=random.sample(features, size)
    )


app.run(debug=True)

validation

If you put the specs in a separate file it is also possible to use the same specs to validate the input

from flasgger.utils import swag_from, validate, ValidationError

@app.route('/api/<string:language>/', methods=['GET'])
@swag_from('index.yml')
def index(language):
    ...
    try:
        validate(data, 'awesome', 'index.yml', root=__file__)
    except ValidationError as e:
        return "Validation Error: %s" % e, 400
    ...

More information

You can find more information and some examples in the github repository

Contribute

Please share your thoughts about ir, open issues, give ideas and PullRequests are always welcome!

Let's Swag!

Dynaconf - Let your settings to be Dynamic

Dynaconf

dynaconf - The dynamic configurator for your Python Project

MIT License PyPI downloads Travis CI Coverage Status Code Health

dynaconf is an OSM (Object Settings Mapper) it can read settings variables from a set of different data stores such as python settings files, environment variables, redis, memcached, ini files, json files, yaml files and you can customize dynaconf loaders to read from wherever you want. (maybe you really want to read from xml files ughh?)

GITHUB REPO: https://github.com/rochacbruno/dynaconf



What is Dynaconf?

Dynaconf is a common point of access to settings variables, you import only one object in your project and from that object you can access settings variables from Python settings file, from environment variables, from parsed yaml, ini, json or xml files, from datastores as Redis and MongoDB or from wherever your need if you write a simple dynaconf loader.

How it works

Install it

pip install dynaconf

Use it

from dynaconf import settings
print settings.SOME_VARIABLE
or
print settings.get('SOME_VARIABLE')

By default Dynaconf will try to use a file called settings.py on the root of your project, if you place that file there all upper case variables will be read

You can also replace the file exporting an environment variable pointing to the module or location for the settings file.

# using module name
export DYNACONF_SETTINGS=myproject.production_settings
# or using location path
export DYNACONF_SETTINGS=/etc/myprogram/settings.py

Doing that when you use from dynaconf import settings the variables will be read from that file.

So how it is Dynamic?

Now think you have your program done and you want to deploy to a certain infrastructure for testing or maybe different deployment, you don't need to rewrite the settings file. Just export some variables to your environment.

export DYNACONF_MYSQL_HOST=myserver.com

Now in your project you can do:

from dynaconf import settings
print settings.MYSQL_HOST
myserver.com

The default prefix for exported envvars is by default DYNACONF_ but you also can change it if needed.

But what if I have some typed values to export?

You can also define type casting when exporting and those types will be used to parse the values.

export DYNACONF_NUMBER='@int 123'
export DYNACONF_FLOAT='@float 12.2'
export DYNACONF_FLAG='@bool yes'
export DYNACONF_FLAG2='@bool disabled'
export DYNACONF_LIST='@json [1, 2, 3, 4]'
export DYNACONF_DICT='@json {"name": "Bruno"}'

Now you can read all those values from your project and it will be loaded with correct type casting.

from dynaconf import settings

type(settings.NUMBER)
int

type(settings.FLOAT)
float

type(settings.FLAG)
bool

print settings.FLAG2 == False
True

print settings.LIST[1]
2

print settings.DICT['name']
Bruno

Nice! But I don't want to use envvars because I use autoscaling and I want my machines to share a settings environment how to do it?

Redis

Go to your settings file (default settings.py) and put

# connection
REDIS_FOR_DYNACONF = {
    'host': 'localhost',
    'port': 6379,
    'db': 0
}

# and loader
LOADERS_FOR_DYNACONF = [
    'dynaconf.loaders.env_loader',
    'dynaconf.loaders.redis_loader'
]

Now you can store settings variables directly in Redis using a hash named by default DYNACONF_DYNACONF

If you don't want want to write directly you can use the Redis writer helper in a python REPL. (ipython as example)

from dynaconf.utils import redis_writer
from dynaconf import settings

redis_writer.write(settings, name='Bruno', mysql_host='localhost', MYSQL_PORT=1234)

And the above will be store in Redis as a hash int the form.

DYNACONF_DYNACONF:
    NAME='Bruno'
    MYSQL_HOST='localhost'
    PORT='@int 1234'

And of course you can now read those variables in the project, all the casting wildcards also works on Redis but if you want to skip type casting, write as string intead of PORT=1234 use PORT='1234' as redis stores everything as string anyway.

There is more

Dynaconf has support for using different namespaces in the same project, you can also write your own loaders, you can find more information on the repository https://github.com/rochacbruno/dynaconf

Contribute

All contributions are very welcome!!

Acknowledgements

Dynaconf was inspired by Flask app config and also by Django settings module.

Dealing with linked containers dependency in docker-compose

In docker-compose a common problem is starting services and daemons in containers that depends on services running on linked containers, in example: your app depends on elasticsearch but it is not ready when the app container is started. Solution is to use a wait script.

I knew that docker-compose team is working on adding a WAIT parameter, but while it is not ready we can use a wait script to load our services.

The process is simple, your container will execute a shell script that will try to execute a HEALTH CHECK many times you want before starting the main command. The health check, the command, the sleep and how many loops to try will be defined in environment variables

This is a program that needs to access an elasticsearch server located in 'elastic' host (that will be mapped by compose

access_elastic_search.py

from datetime import datetime 
from elasticsearch import Elasticsearch
es = Elasticsearch('elastic')
es.index(index="my-index", doc_type="test-type", id=42, body={"any": "data", "timestamp": datetime.now()})
print es.get(index="my-index", doc_type="test-type", id=42)

docker-compose.yml

elastic:
    image: elasticsearch
    command: elasticsearch -Des.node.name="YourNodeName"
    ports:
       - "9200:9200"

app:
    image: ubuntu:14.04
    command: wait_to_start
    working_dir: /src
    links:
        - elastic:elastic
    volumes:
        - .:/src
    environment:
        - WAIT_COMMAND=[ $(curl --write-out %{http_code} --silent --output /dev/null http://elastic:9200/_cat/health?h=st) = 200 ]
        - WAIT_START_CMD=python access_elastic_search.py
        - WAIT_SLEEP=2
        - WAIT_LOOPS=10

wait_to_start script

 #!/bin/bash

echo $WAIT_COMMAND
echo $WAIT_START_CMD

is_ready() {
    eval "$WAIT_COMMAND"
}

# wait until is ready
i=0
while ! is_ready; do
    i=`expr $i + 1`
    if [ $i -ge $WAIT_LOOPS ]; then
        echo "$(date) - still not ready, giving up"
        exit 1
    fi
    echo "$(date) - waiting to be ready"
    sleep $WAIT_SLEEP
done

#start the script
exec $WAIT_START_CMD

Now when you start your environment with docker-compose up the app container will start and execute wait_to_start script, which will perform the test defined in WAIT_COMMAND environment variable and will retry until the test succeeds, so it will end executing the program defined in WAIT_START_CMD

Full code on gist: https://gist.github.com/rochacbruno/bdcad83367593fd52005

This code was made with StackOverflow help

Oportunidade para Programador Python / Data Scientist na Catho

Você gosta de trabalhar com inteligência aplicada?
Não tem medo de aprender novas tecnologias?
Venha trabalhar conosco!
 
A equipe de Inovação da Catho procura por alguém com paixão por conhecimento e espírito inovador.
Nosso foco é produzir novas tecnologias que irão ajudar pessoas a encontrarem as melhores vagas
e empresas a encontrarem os melhores profissionais.
 
Temos dois perfis de vaga:
 

Perfil cientifico, para efetuar analise e desenvolvimento de modelos

e ferramentas voltadas para mineracao de dados e inteligencia artifical;

 
Áreas de conhecimento importantes no nosso dia-a-dia:
  • Machine Learning
  • Sistemas de Recomendação
  • Modelos estatísticos
  • Recuperação de Informação
 

Perfil desenvolvedor, para desenvolver aplicaçoes baseadas em data mining e big data;

 
Desenvolvimento de software:
  • Ambiente: Linux, Git, Github
  • Programação: Python
  • Banco de dados: MongoDB, MySQL, Postgres
  • Web: Flask, Tornado, Javascript, HTML/CSS
Computaçao aplicada:
  • Máquinas de busca: Elasticsearch, Solr
  • Computação em nuvem & alto desempenho: AWS, NewRelic, Otimizaçao de performance
  • Cloud & Big Data
  • Infra: Fabric, NewRelic, MMS
  • Cloud: Amazon
  • Big data: Amazon SWF
 
Requisitos:
 
  • Ser fluente em uma linguagem de programação
  • Ter autonomia para mergulhar em problemas
  • Trabalhar bem em equipe
  • Ter espírito de pesquisador
 
Mais Informações:
 
  • Localização: Tamboré, Barueri/SP
  • Contrato: CLT Full
  • Jornada: Integral/Flexível
  • Benefícios: Plano de Saúde e Dental, VR/VA, Vale transporte,
  • Seguro de vida, Estacionamento, Convênio SESC
  • Remuneração: A combinar
 
Contato:
 
Envie seu resumo profissional para catholabs@catho.com