Published in 16:02 of 11/26/2016 by Bruno Rocha
Bruno Rocha There is more to life than increasing its speed!

Published in 16:02 of 11/26/2016

←Home

AsyncIO - O futuro do Python mudou completamente!

Tradução do artigo original escrito por Yeray Diaz para o hackernoon: AsyncIO for the working Python developer

Eu me lembro do exato momento em que eu pensei, "Uau, isso está lento, aposto que se eu pudesse paralelizar essas chamadas isso voaria!" e então, 3 dias após, eu olhei para o meu código e não pude reconhece-lo, havia se transformado em um misturado de chamadas para threading e funções da biblioteca processing.

Então encontrei o asyncio, e tudo mudou!


Se você não conhece, asyncio é o novo módulo de concorrência introduzido no Python 3.4. É projetado para usar coroutines e futures para simplificar a programação assíncrona e tornar o código tão legível quanto o código síncrono simplesmente por não haver callbacks.

Eu também me lembro que enquanto eu estava naquela busca pelo paralelismo inúmeras opções estavam disponíveis, mas uma se destacou. Era rápida, fácil de aprender e muito bem escrita: A excelente biblioteca gevent. Eu cheguei até ela lendo o encantador tutorial mão na massa: Gevent for the Working Python Developer, escrito por uma sensacional comunidade de usuários, uma ótima introdução não apenas ao gevent mas ao conceito de concorrência em geral, e você também deveria dar uma lida.

Eu gostei tanto do tutorial que decidi usa-lo como template para escrever sobre o AsyncIO.

Aviso Rápido: Isto não é um artigo sobre gevent X asyncio, O Nathan Road escreveu a respeito das diferenças e similaridades entre esses 2 se você estiver interessado.

Uma nota a respeito do código neste tutorial, você deve ter lido que no Python 3.5 uma nova sintaxe foi introduzida, especialmente para coroutines, eu estou intencionalmente não utilizando esta nova sintaxe neste texto pois desta forma acredito que fica mais fácil para assimilar as coroutinas com generators. Mas você pode encontrar versões dos exemplos usando esta nova sintaxe no github.

Eu sei que você já deve estar ansioso mas antes de mergulharmos eu gostaria de primeiramente falar rapidamente sobre alguns conceitos que talvez não lhe sejam familiares.

Threads, loops, coroutines and futures

Threads são uma ferramenta comum e a maioria dos desenvolvedores já ouviu falar ou já usou. Entretanto o asyncio usa de estruturas um pouco diferentes: event loops, coroutines e futures.

  • Um event loop gerencia e distribui a execução de diferentes tarefas. Ele mantém um registro de tarefas (coroutines) e distribui o fluxo de execução entre elas.
  • As coroutines são geradores Python (generators), que quando há a ocorrência do yield libera o controle do fluxo de volta ao event loop. Uma coroutine precisa estar programada para ser executada usando o event loop, para fazer isso criamos uma tarefa do tipo future.
  • E um future é um objeto que representa o resultado de uma tarefa que pode, ou não, ter sido executada. Este resultado pode ser uma Exception.

Entendeu? simples né? vamos mergulhar neste conceito!

Execução síncrona e Execução assíncrona

Em Concorrência não é paralelismo, é melhor! o Rob Pike falou uma coisa que fez um click na minha cabeça: Dividir tarefas em sub-tarefas concorrentes já é o suficiente para permitir o paralelismo. Mas é o fato de programar/agendar a execução dessas sub-tarefas que realmente cria o paralelismo.

O ASyncIO faz exatamente isso, você pode estruturar o seu código em sub-tarefas definidas como coroutines e isso te permite programar a execução da maneira que desejar, incluindo a forma simultânea. As Corountines contém pontos de vazão demarcados com a palavra yield onde definimos onde uma troca de contexto poderia ocorrer caso existam outras tarefas pendentes, mas que não irá ocorrer caso não existam outras tarefas.

Nota do tradutor: Em uma loja de doces há um funcionário empacotando balas, ao finalizar cada pacote ele o lacra e coloca na vitrine (YIELD), então ele dá uma olhada no balcão para ver se tem algum cliente para ser atendido, se tiver um cliente, então ele para de empacotar balas atende o pedido do cliente (troca de contexto). E só depois de terminar de > atender o cliente ele então volta a empacotar as balas, caso não tenha cliente a ser atendido ele simplesmente continua o trabalho de empacotamento. Podemos dizer que é um funcionário fazendo duas tarefas __. (responda nos comentários se é paralelamente ou concorrentemente)

Uma troca de contexto no asyncio representa o event loop passando o fluxo
de controle da coroutine em execução para a próxima na fila de execução, ou seja, (Yielding).

Veja um exemplo básico:

import asyncio

@asyncio.coroutine
def empacotar_bala():
    print("Empacotando balas...")

    # parada para verificar se tem cliente no balcão
    yield from asyncio.sleep(0)

    # troca de contexto
    print("Explicitamente voltando a empacotar balas")


@asyncio.coroutine
def atender_balcao():
    print("Explicitamente verificando se tem cliente no balcão...")

    yield from asyncio.sleep(0)

    print("Voltando a empacotar as balas")


ioloop = asyncio.get_event_loop()  # Event Loop

tasks = [ioloop.create_task(empacotar_bala()),
         ioloop.create_task(atender_balcao())]

wait_tasks = asyncio.wait(tasks)

ioloop.run_until_complete(wait_tasks)

ioloop.close()

Execute:

$ python3 async1.py
Empacotando balas...
Explicitamente verificando se tem cliente no balcão...
Explicitamente voltando a empacotar balas
Voltando a empacotar as balas
  • Primeiramente nós declaramos duas tarefas simples com a intenção de serem executadas de maneira não bloqueante pois usamos a função sleep do asyncio.
  • Coroutines só podem ser chamadas por outras coroutines ou podem ser agrupadas em uma task para então serem enfileiradas, nós usamos a função create_task para fazer isso.
  • Então criamos lista contendo as 2 tasks e nós a combinamos em uma wait que é uma task que irá aguardar até que todas as tarefas
    enfileiradas terminem.
  • E finalmente nós programamos a wait para executar usando o event loop usando a função run_until_complete.

Ao usar yield from na coroutine empacotar_bala nós declaramos que a coroutine pode naquele momento passar o controle do fluxo de execução de volta para o event loop, neste caso o sleep ao terminar (note o sleep(0)) irá devolver o controle ao event loop que irá mudar de contexto, passando o controle de fluxo para a próxima coroutine agendada para execução: atender_balcao

Nota: O tradutor alterou os nomes das funções dos exemplos do artigo original para dar um significado mais fácil de ser interpretado em português mas mantendo a semântica e fluxo de execução dos códigos, todavia os originais estão no github.

Vamos agora simular duas tarefas bloqueantes gr1 e gr2, considere que há dois requests para serviços externos. Enquanto elas executam, uma terceira tarefa pode ser executada assíncronamente, como no seguinte exemplo:

import time
import asyncio

start = time.time()

def tic():
  return 'at %1.1f segundos' % (time.time() - start)


@asyncio.coroutine
def gr1():
  # Demora a ser executada, mas não queremos esperar
  print('gr1 iniciou a execução: {}'.format(tic()))
  yield from asyncio.sleep(2)
  print('gr1 terminou a execução: {}'.format(tic()))


@asyncio.coroutine
def gr2():
  # Demora a ser executada, mas não queremos esperar
  print('gr2 iniciou a execução: {}'.format(tic()))
  yield from asyncio.sleep(2)
  print('gr2 terminou a execução: {}'.format(tic()))


@asyncio.coroutine
def gr3():
  print('Executando enquanto as outras estão bloqueadas: {}'.format(tic()))
  yield from asyncio.sleep(5)
  print('Pronto!')

ioloop = asyncio.get_event_loop()
tasks = [
    ioloop.create_task(gr1()),
    ioloop.create_task(gr2()),
    ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()

Execute:

$ python3 async2.py 
gr1 iniciou a execução: at 0.0 segundos
gr2 iniciou a execução: at 0.0 segundos
Executando enquanto as outras estão bloqueadas: at 0.0 segundos
gr1 terminou a execução: at 2.0 segundos
gr2 terminou a execução: at 2.0 segundos
Pronto!

Perceba que na forma que o I/O loop faz o gerenciamento e programa a execução permite que o seu código, rodando em single thread possa operar de forma concorrente. Enquanto duas tarefas estavam bloqueadas uma terceira pode tomar o controle do fluxo de execução e ser executada de maneira assíncrona.

Ordem de execução

No mundo síncrono estamos acostumados a pensar de maneira linear. Se nós tivermos uma lista de tarefas que consomem diferente quantidade de tempo elas serão executadas na ordem em que foram chamadas.

Porém, quando usamos concorrência nós precisamos estar cientes de que as tarefas terminam em tempos que diferem da ordem em que foram enfileiradas.

import random
from time import sleep
import asyncio


def task(pid):
    """Uma tarefa não deterministica"""
    sleep(random.randint(0, 2) * 0.001)
    print('Task %s terminada' % pid)


@asyncio.coroutine
def task_coro(pid):
    """Uma tarefa deterministica"""
    yield from asyncio.sleep(random.randint(0, 2) * 0.001)
    print('Task %s terminada' % pid)


def synchronous():
    for i in range(1, 10):
        task(i)


@asyncio.coroutine
def asynchronous():
    tasks = [asyncio.async(task_coro(i)) for i in range(1, 10)]
    yield from asyncio.wait(tasks)


print('Síncronamente:')
synchronous()

ioloop = asyncio.get_event_loop()
print('Assíncronamente:')
ioloop.run_until_complete(asynchronous())

ioloop.close()

Execute:

$ python3 async3.py 
Síncronamente:
Task 1 terminada
Task 2 terminada
Task 3 terminada
Task 4 terminada
Task 5 terminada
Task 6 terminada
Task 7 terminada
Task 8 terminada
Task 9 terminada

Assíncronamente:
Task 2 terminada
Task 4 terminada
Task 8 terminada
Task 5 terminada
Task 6 terminada
Task 7 terminada
Task 9 terminada
Task 1 terminada
Task 3 terminada

A saida será com certeza variada, pois cada task espera por uma quantidade randômica de tempo, mas repare que a ordem dos resultados é completamente diferente, mesmo tendo enfileirado em uma lista de tarefas na mesma ordem usando o mesmo range.

Outro detalhe é que tivemos que uma versão em coroutine da nossa simples função. É importante entender que o asyncio não faz com que as coisas se transformarem magicamente em não bloqueantes.

O AsyncIO está por enquanto sozinho na biblioteca padrão do Python 3 enquanto todos outros módulos oferecem apenas funcionalidades bloqueantes.

Você pode usar o módulo concurrent.futures para agrupar tarefas bloqueantes em uma thread ou um processo e então retornar um Future que o asyncio pode utilizar. Os mesmos exemplos utilizando threads podem ser encontrados no github

Esta é provavelmente a maior desvantagem ao usar asyncio neste momento, porém existe uma série de bibliotecas para diferentes tarefas e serviços que já estão disponíveis de maneira não bloqueante.


Uma tarefa bloqueante bastante comum é coletar dados de um serviço HTTP. Para isso vou usar a excelente biblioteca aiohttp que efetua chamadas não bloqueantes a serviços HTTP. Neste exemplo vamos coletar dados da API pública do Github e ler apenas o valor de Date do responde header.

import time
import urllib.request
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


def fetch_sync(pid):
    print('Captura síncrona {} iniciou'.format(pid))
    start = time.time()
    response = urllib.request.urlopen(URL)
    datetime = response.getheader('Date')

    print('Processo {}: {}, demorou: {:.2f} segundos'.format(
        pid, datetime, time.time() - start))

    return datetime


@asyncio.coroutine
def fetch_async(pid):
    print('Captura assíncrona {} iniciou'.format(pid))
    start = time.time()
    response = yield from aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')

    print('Processo {}: {}, demorou: {:.2f} segundos'.format(
        pid, datetime, time.time() - start))

    response.close()
    return datetime


def synchronous():
    start = time.time()
    for i in range(1, MAX_CLIENTS + 1):
        fetch_sync(i)
    print("Processo demorou: {:.2f} segundos".format(time.time() - start))


@asyncio.coroutine
def asynchronous():
    start = time.time()
    tasks = [asyncio.ensure_future(
        fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)]
    yield from asyncio.wait(tasks)
    print("Processo demorou: {:.2f} segundos".format(time.time() - start))


print('Sincrono:')
synchronous()

print('Assíncrono:')
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Execute:

$ python3 -V
Python 3.4.4+

$ pip3 install aiohttp

$ python3 async4.py
Sincrono:
Processo sincrono 1 iniciou
Processo 1: Wed, 17 Feb 2016 13:10:11 GMT, demorou: 0.54 segundos
Processo sincrono 2 iniciou
Processo 2: Wed, 17 Feb 2016 13:10:11 GMT, demorou: 0.50 segundos
Processo sincrono 3 iniciou
Processo 3: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.48 segundos
Process demorou: 1.54 segundos

Assíncrono:
Processo assincrono 1 iniciou
Processo assincrono 2 iniciou
Processo assincrono 3 iniciou
Processo 3: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.50 segundos
Processo 2: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.52 segundos
Processo 1: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.54 segundos
Processo demorou: 0.54 segundos

Nota: requer Python 3.4.4+ caso contrário cairá em exception

Primeiramente, repare na diferença de tempo, usando chamadas assíncronas nós efetuamos as requisições ao serviço HTTP exatamente ao mesmo tempo (13:10:12). Como falado anteriormente, cada requisição passou (yield) o fluxo de controle para a próxima e retornou quando foi completada.

Resultando no fato de que requisitar e capturar os resultados de todos as tarefas demorou o mesmo tempo que a requisição mais lenta! Veja o tempo no log 0.54 segundos para a requisição mais lenta (processo 1) e é exatamente o mesmo tempo que se passou para processar todos os 3 requests, Legal né? (enquanto a parte de I/O daquela tarefa lenta estava bloqueada, as outras puderam ser executadas simultaneamente).

Agora veja como o código é similar ao da versão síncrona! é praticamente o mesmo código! As diferenças principais são por conta das diferenças de implementações das bibliotecas usadas e a parte da criação das tasks e a espera para elas terminarem.

Criando concorrência

Até então estamos usando uma única abordagem de criar e requisitar resultados de coroutines, criar uma lista de tasks e esperar que elas terminem.

Mas as coroutines podem ser programadas para serem executadas e requisitar resultados em maneiras diferentes. Imagine um cenário onde precisamos processar os resultados de uma chamada HTTP GET assim que ela é requisitada, o processo é na verdade similar ao que fizemos no exemplo anterior.

import time
import random
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


@asyncio.coroutine
def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('Processo assincrono {} iniciou, esperando por {} segundos'.format(
        pid, sleepy_time))

    yield from asyncio.sleep(sleepy_time)

    response = yield from aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')

    response.close()
    return 'Processo {}: {}, demorou: {:.2f} segundos'.format(
        pid, datetime, time.time() - start)


@asyncio.coroutine
def asynchronous():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    for i, future in enumerate(asyncio.as_completed(futures)):
        result = yield from future
        print('{} {}'.format(">>" * (i + 1), result))

    print("Processo demorou: {:.2f} segundos".format(time.time() - start))


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Execute

$ python3 async5.py

Processo assincrono 1 iniciou, esperando por 4 segundos
Processo assincrono 3 iniciou, esperando por 5 segundos
Processo assincrono 2 iniciou, esperando por 3 segundos
>> Processo 2: Wed, 17 Feb 2016 13:55:19 GMT, demorou: 3.53 segundos
>>>> Processo 1: Wed, 17 Feb 2016 13:55:20 GMT, demorou: 4.49 segundos
>>>>>> Processo 3: Wed, 17 Feb 2016 13:55:21 GMT, demorou: 5.48 segundos
Processo demorou: 5.48 segundos

Repare no deslocamento >> e no tempo de cada chamada, elas foram programadas ao mesmo tempo, os resultados chegam fora de ordem e são processados assim que chegam.

Este código é um pouco diferente, estamos agrupando as coroutines em uma lista, cada uma para ser agendada e executada. a função as_completed retorna um iterador que irá gerar (YIELD) um future completo assim que a tarefa estiver terminada. Muito legal né? aliás, as funções as_completed e wait são ambas originalmente parte do modulo concurrent.futures


Vamos pegar um outro exemplo, imagine que você está tentando consultar o seu endereço de IP público atual em seu programa. Existem alguns serviços que fornecem essa informação mas você não tem certeza se estarão acessíveis no momento da execução. Você não quer checar cada um deles sequencialmente, então, é preferível efetuar requisições concorrentes para cada um dos serviços e utilizar aquele que responder mais rapidamente, ok? Ok!

Bom, acontece que nosso velho amigo wait recebe o parâmetro return_when para fazer exatamente isso. Estávamos ignorando o dado retornado pelo wait já que estávamos preocupados apenas em paralelizar as tarefas. Mas agora nós queremos pegar os resultados da coroutine, então usaremos dois conjuntos de futures, done e pending.

from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    response = yield from aiohttp.request('GET', service.url)
    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} terminou com resultado: {}, demorou: {:.2f} segundos'.format(
        service.name, ip, time.time() - start)


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = yield from asyncio.wait(
        futures, return_when=FIRST_COMPLETED)
    print(done.pop().result())


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Execute:

$ python3 async6.py
Fetching IP from ip-api
Fetching IP from ipify
ip-api terminou com resultado: 82.34.76.170, demorou: 0.09 segundos
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10f95c6d8>
Task was destroyed but it is pending!
task: <Task pending coro=<fetch_ip() running at 2c-fetch-first-ip-address-response.py:20> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(10)(), Task._wakeup()]>>

Espere, o que aconteceu aqui? O primeiro serviço respondeu com sucesso então o que são esses warnings?

Bem, nós agendamos duas tarefas mas não permitimos que as duas fossem completadas. o AsyncIO considera que é um bug e imprime um warning. Então queremos informar ao event loop que este era um comportamento esperado e não se preocupar com a segunda tarefa que ficou pendente. Como? que bom que perguntou.

Estados do Futuro

(sobre o estado que um future está atualmente, não o estado que ele estará no futuro... você entendeu né!)

Eles são:

  • Pending
  • Running
  • Done
  • Cancelled

Simples assim, quando um future finaliza sua tarefa, esta tarefa irá retornar o resultado de volta para o future, se estiver em pending ou cancelled ele gera o erro InvalidStateError ou CancelledError, e finalmente se a coroutine gera o erro ele será re-gerado, o que significa o mesmo comportamento ao chamar exception. confira aqui

Você também pode usar .done, .cancelled e .running em uma Future para obter um booleano indicando o estado. Note que done significa simplesmente que o result irá retornar ou gerar um erro. Você pode explicitamente cancelar um Future chamando o método cancel, e isso é o que precisamos para resolver o warning anterior.



def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = yield from asyncio.wait(
        futures, return_when=FIRST_COMPLETED)
    print(done.pop().result())

    for future in pending:
        future.cancel()

e então

$ python3 async6.py
Fetching IP from ip-api
Fetching IP from ipify
ip-api terminou com resultado: 82.34.76.170, demorou: 0.09 segundos

Um bom resultado né!

Futures permitem injetar callbacks a serem executados quando entrarem no estado done caso você queira adicionar uma lógica adicional, ou se você não quiser usar o yield from e prefira o callback hell, (sério quer mesmo?)

E você pode, para objetivos de unit-testing manualmente injetar o resultado ou uma exception a um Future.

Gerenciamento de erros

O AsyncIO é sobre fazer código concorrente gerenciável e legível, e isto se torna óbio no que diz respeito ao tratamento de erros. Vamos voltar ao exemplo anterior para ilustrar isso.

Imagine que queremos garantir que os 2 serviços retornaram o mesmo resultado, mas um dos serviços fica offline e não responde, podemos usar apenas o usual try... except

from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
    Service('broken', 'http://este-servico-nao-funciona', 'ip')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        return "{} não está respondendo".format(service.name)

    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} terminou com resultado: {}, demorou: {:.2f} segundos'.format(
        service.name, ip, time.time() - start)


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, _ = yield from asyncio.wait(futures)

    for future in done:
        print(future.result())


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

execute:

$ python3 async7.py
Fetching IP from ip-api
Fetching IP from borken
Fetching IP from ipify
ip-api terminou com o resultado: 85.133.69.250, demorou: 0.75 segundos
ipify terminou com o resultado: 85.133.69.250, demorou: 1.37 segundos
borken não está respondendo

Também podemos tratar os erros enquanto processamos os resultados dos Futures em caso de algo não esperado acontecer (lembra que eu disse que os erros são re-gerados na coroutine).

from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp
import traceback

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
    Service('broken', 'http://este-servico-nao-funciona', 'ip')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        return "{} não está respondendo".format(service.name)

    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} terminou com resultado: {}, demorou: {:.2f} segundos'.format(
        service.name, ip, time.time() - start)


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, _ = yield from asyncio.wait(futures)

    for future in done:
        try:
            print(future.result())
        except:
            print("Erro não esperado: {}".format(traceback.format_exc()))


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Veja:

$ python3 async7.py
Fetching IP from ipify
Fetching IP from borken
Fetching IP from ip-api
ipify terminou com o resultado: 85.133.69.250, demorou: 0.91 segundos
borken não está respondendo
Erro não esperado: Traceback (most recent call last):
 File “3b-fetch-ip-addresses-future-exceptions.py”, line 41, in asynchronous
 print(future.result())
 File “/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py”, line 274, in result
 raise self._exception
 File “/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py”, line 239, in _step
 result = coro.send(value)
 File “3b-fetch-ip-addresses-future-exceptions.py”, line 27, in fetch_ip
 ip = json_response[service.ip_attr]
KeyError: ‘this-is-not-an-attr’

Da mesma forma que agendar uma task e não esperar que ela termine é considerado um bug, agendar uma task e não recuperar possíveis erros também irá gerar um warning.

from collections import namedtuple
import time
import asyncio
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
    Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        print('{} is unresponsive'.format(service.name))
    else:
        json_response = yield from response.json()
        ip = json_response[service.ip_attr]

        response.close()
        print('{} finished with result: {}, took: {:.2f} seconds'.format(
            service.name, ip, time.time() - start))


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    yield from asyncio.wait(futures)  # intentionally ignore results


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

execute:

$ python3 async8.py
Fetching IP from ipify
Fetching IP from borken
Fetching IP from ip-api
borken is unresponsive
ipify finished with result: 85.133.69.250, took: 0.78 seconds
Task exception was never retrieved
future: <Task finished coro=<fetch_ip() done, defined at 3c-fetch-ip-addresses-ignore-exceptions.py:15> exception=KeyError(‘this-is-not-an-attr’,)>
Traceback (most recent call last):
 File “/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py”, line 239, in _step
 result = coro.send(value)
 File “3c-fetch-ip-addresses-ignore-exceptions.py”, line 26, in fetch_ip
 ip = json_response[service.ip_attr]
KeyError: ‘this-is-not-an-attr’

Se parece muito com a saída do exemplo anterior, mas não contém os mínimos detalhes e mensagens do asyncio.

Timeouts

E se nós não nos importarmos muito com o nosso IP? Imagine que é apenas um adicional ao nosso serviço, mas não importante, não queremos que o usuário fique esperando por este dado. Idealmente nós definimos um time-out para nossas tarefas não bloqueantes, e então continuamos nosso programa sem o atributo do IP já que neste exemplo não é tão importante.

De novo descobrimos que wait tem o atributo que precisamos:

import time
import random
import asyncio
import aiohttp
import argparse
from collections import namedtuple
from concurrent.futures import FIRST_COMPLETED

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
)

DEFAULT_TIMEOUT = 0.01


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    yield from asyncio.sleep(random.randint(1, 3) * 0.1)
    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        return '{} is unresponsive'.format(service.name)

    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    print('{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start))
    return ip


@asyncio.coroutine
def asynchronous(timeout):
    response = {
        "message": "Result from asynchronous.",
        "ip": "not available"
    }

    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = yield from asyncio.wait(
        futures, timeout=timeout, return_when=FIRST_COMPLETED)

    for future in pending:
        future.cancel()

    for future in done:
        response["ip"] = future.result()

    print(response)


parser = argparse.ArgumentParser()
parser.add_argument(
    '-t', '--timeout',
    help='Timeout to use, defaults to {}'.format(DEFAULT_TIMEOUT),
    default=DEFAULT_TIMEOUT, type=float)
args = parser.parse_args()

print("Using a {} timeout".format(args.timeout))
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous(args.timeout))
ioloop.close()

Repare no argumento timeout, também adicionamos um parâmetro de linha de comando para testar mais facilmente o que acontecerá se deixarmos a requisição ocorrer algumas vezes. Também adicionei um tempo randomico de slepp só para garantir que as coisas não aconteçam tão rápido que a gente nem perceba.

$ python async8.py

Using a 0.01 timeout
Fetching IP from ipify
Fetching IP from ip-api
{‘message’: ‘Result from asynchronous.’, ‘ip’: ‘not available’}

$ python async8.py -t 5
Using a 5.0 timeout
Fetching IP from ip-api
Fetching IP from ipify
ipify finished with result: 82.34.76.170, took: 1.24 seconds
{'ip': '82.34.76.170', 'message': 'Result from asynchronous.'}

Conclusão

Asyncio aumentou meu já enorme amor por Python. Para ser absolutamente honesto eu me apaixonei pelas coroutines em Python quando conheci o Tornado mas o asyncio conseguiu unir o melhor desta abordagem junto com excelentes bibliotecas de concorrência. E muito foi feito para que outras bibliotecas possam usar o IO loop, então se você está usando o Tornado, você também pode usa-lo com bibliotecas feitas para o asyncio!

E como eu disse anteriormente o maior problema por enquanto é a falta de bibliotecas e módulos que implementam o comportamento não bloqueante. Você pode achar que uma vasta quantidade de tecnologias já estabelecidas ainda não tenham uma versão não bloqueante para interagir com asyncio, ou que as existentes ainda estão jovens ou experimentais. Porém, O numero de bibliotexas está crescendo diariamente

confira este repo: https://github.com/aio-libs

Espero neste tutorial ter passado a idéia de como é prazeroso trabalhar com AsyncIO. Eu honestamente penso que isto é a peça que finalmente nos levará a adoção em massa e adaptação ao Python 3, essas são as coisas que você está perdendo se ficar parado no Python 2.7.

Uma coisa é certa, o Futuro do Python mudou completamente! (trocadilho intencional)

  • The quality of the python ecosystem in Slides · 21:23 of 06/27/2017
  • Consumindo e Publicando web APIs - PyData São Paulo - 2017 in Slides · 00:58 of 03/29/2017
  • Migrando e-commerce do Iluria para o Shopify (usando Python) in python · 20:56 of 01/15/2017

  • comments powered by Disqus Go Top