Bruno Rocha at 16:17 of 12/07/2013

requirement

Watch for changes in an FTP folder: whenever a new XML file is created or an existing one is modified, it must be parsed and its contents inserted into the database.

tools

  • Python 2.7
  • watchdog

Install from pip

pip install watchdog

Watchdog is a Python API library and shell utilities to monitor file system events.

How to

First, create the monitoring script. It will run daemonized and observe any changes to the given directory. The script relies on the following modules and classes:

  • time from Python will be used to sleep the main loop
  • watchdog.observers.Observer is the class that will watch for any change and then dispatch the event to the specified handler.
  • watchdog.events.PatternMatchingEventHandler is the class that will take the event dispatched by the observer and perform some action

watch_for_changes.py

import sys  # used later to read the directory from the command line
import time
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

PatternMatchingEventHandler inherits from FileSystemEventHandler and exposes some useful methods:

Events are: modified, created, deleted, moved

  • on_any_event: if defined, will be executed for any event
  • on_created: Executed when a file or a directory is created
  • on_modified: Executed when a file is modified or a directory renamed
  • on_moved: Executed when a file or directory is moved
  • on_deleted: Executed when a file or directory is deleted.

Each of those methods receives the event object as its first parameter, and the event object has three attributes:

  • event_type
    'modified' | 'created' | 'moved' | 'deleted'
  • is_directory
    True | False
  • src_path
    path/to/observed/file

So to create a handler, just inherit from one of the existing handlers. For this example, PatternMatchingEventHandler will be used to match only xml files.

To keep things simple I will put the file processing in a single method, and I will implement only on_modified and on_created, which means that my handler will ignore any other events.

The patterns attribute is also defined so that only files with xml or lxml extensions are watched.

class MyHandler(PatternMatchingEventHandler):
    patterns = ["*.xml", "*.lxml"]

    def process(self, event):
        """
        event.event_type 
            'modified' | 'created' | 'moved' | 'deleted'
        event.is_directory
            True | False
        event.src_path
            path/to/observed/file
        """
        # the file will be processed here
        print("%s %s" % (event.src_path, event.event_type))  # print for now, only for debugging

    def on_modified(self, event):
        self.process(event)

    def on_created(self, event):
        self.process(event)
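The entries in patterns are shell-style wildcards. As a rough illustration only, the standard library's fnmatch module follows similar wildcard rules, so it can be used to check which file names a pattern list would let through. This is a sketch under that assumption: matches_any is a hypothetical helper, and watchdog's own matching may differ in details such as case sensitivity.

```python
import fnmatch
import os

PATTERNS = ["*.xml", "*.lxml"]  # same patterns as MyHandler above


def matches_any(path, patterns=PATTERNS):
    """Return True if the file name matches at least one wildcard pattern."""
    name = os.path.basename(path)
    # fnmatchcase compares case-sensitively on every platform
    return any(fnmatch.fnmatchcase(name, pattern) for pattern in patterns)
```

With these patterns a path such as /tmp/test.xml is accepted, while /tmp/notes.txt is not.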

With the above handler, only creation and modification events are handled. Now the Observer needs to be scheduled.

if __name__ == '__main__':
    args = sys.argv[1:]
    observer = Observer()
    observer.schedule(MyHandler(), path=args[0] if args else '.')
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()

    observer.join()
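The args[0] if args else '.' expression above could also be pulled into a small helper that checks the path before it is handed to the observer. This is only a sketch: resolve_watch_path is a hypothetical name, and the directory check is my own addition, not something watchdog requires.

```python
import os


def resolve_watch_path(args):
    """Return the directory to watch: the first argument if given, else '.'."""
    path = args[0] if args else "."
    if not os.path.isdir(path):
        # Assumption: failing early is preferable to watching a missing path
        raise ValueError("not a directory: %r" % path)
    return path
```

It would be called as resolve_watch_path(sys.argv[1:]) and its result passed as the path argument of observer.schedule.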

You can pass the keyword argument recursive=True to observer.schedule if you want to watch files in subfolders too.

That is all that is needed to watch for modifications in a directory. The script watches the path given as the first argument, or the current directory by default.

python watch_for_changes.py /path/to/directory

Let it run in one shell, then open another shell or a file browser to create or change .xml files in /path/to/directory.

echo "testing" > /tmp/test.xml 

Since the handler prints each event, the output should be:

rochacbruno@~/$ python watch_for_changes.py /tmp
/tmp/test.xml created
/tmp/test.xml modified
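Notice that the single echo above produced two events, created and then modified. Once the handler writes to a database, that could mean the same content gets inserted twice. One possible guard, sketched here with the standard library only, is to remember a digest of the last content processed for each path. ChangeTracker is a hypothetical name and this is not a watchdog feature.

```python
import hashlib


class ChangeTracker(object):
    """Remember a digest per path so unchanged content is processed once."""

    def __init__(self):
        self._seen = {}  # path -> hex digest of the last processed content

    def is_new_content(self, path, data):
        """Return True when `data` (bytes) differs from the last call for `path`."""
        digest = hashlib.sha256(data).hexdigest()
        if self._seen.get(path) == digest:
            return False
        self._seen[path] = digest
        return True
```

A handler could keep one tracker instance, read the file as bytes inside process, and return early whenever is_new_content is False.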

To complete the script, all that remains is to implement, in the process method, the logic that parses the file and inserts its contents into the database.

For example, if the xml file contains some data about current track on a web radio:

<?xml version="1.0" encoding="ISO-8859-1" standalone="yes" ?>
<Pulsar>
  <OnAir>
    <media_type>default</media_type>
    <media>
      <title1>JOVEM PAN FM</title1>
      <title2>100,9MHz</title2>
      <title3>A maior rede de radio do Brasil</title3>
      <title4>00:00:00</title4>
      <media_id1>#ID_Title#</media_id1>
      <media_id2>#ID_SubTitle#</media_id2>
      <media_id3>#ID_Album#</media_id3>
      <hour>2013-12-07 11:44:32</hour>
      <length>#Duration#</length>
      <ISRC>#Code#</ISRC>
      <id_singer>#ID_Singer#</id_singer>
      <id_song>#ID_Song#</id_song>
      <id_album>#ID_Album#</id_album>
      <id_jpg>#Jpg#</id_jpg>
    </media>
  </OnAir>
</Pulsar>

The easiest way to parse this small XML file is with the xmltodict library.

pip install xmltodict

With the xmltodict.parse function, the above XML is returned as an OrderedDict:

OrderedDict([(u'Pulsar',
    OrderedDict([(u'OnAir',
        OrderedDict([(u'media_type', u'default'),
        (u'media', 
            OrderedDict([(u'title1', u'JOVEM PAN FM'),
                         (u'title2', u'100,9MHz'),
                         (u'title3', u'A maior rede de radio do Brasil'),
                         (u'title4', u'00:00:00'),
                         (u'media_id1', u'#ID_Title#'),
                         (u'media_id2', u'#ID_SubTitle#'),
                         (u'media_id3', u'#ID_Album#'),
                         (u'hour', u'2013-12-07 11:44:32'),
                         (u'length', u'#Duration#'),
                         (u'ISRC', u'#Code#'),
                         (u'id_singer', u'#ID_Singer#'),
                         (u'id_song', u'#ID_Song#'),
                         (u'id_album', u'#ID_Album#'),
                         (u'id_jpg', u'#Jpg#')]))]))]))])

Now we can just access that dict to create the record on the filesystem or elsewhere. Notice that I use the get method of the dict type a lot, to avoid KeyError exceptions.

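# this fragment lives inside MyHandler.process, hence the bare return
with open(event.src_path, 'r') as xml_source:
    xml_string = xml_source.read()
    parsed = xmltodict.parse(xml_string)
    # chained .get calls return None instead of raising when a level is missing
    element = parsed.get('Pulsar', {}).get('OnAir', {}).get('media')
    if not element:
        return
    print(dict(element))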

and the output will be:

{u'hour': u'2013-12-07 11:44:32',
 u'title2': u'100,9MHz',
 u'id_album': u'#ID_Album#',
 u'title1': u'JOVEM PAN FM',
 u'length': u'#Duration#',
 u'title3': u'A maior rede de radio do Brasil',
 u'title4': u'00:00:00',
 u'ISRC': u'#Code#',
 u'id_song': u'#ID_Song#',
 u'media_id2': u'#ID_SubTitle#',
 u'media_id1': u'#ID_Title#',
 u'id_jpg': u'#Jpg#',
 u'media_id3': u'#ID_Album#',
 u'id_singer': u'#ID_Singer#'}

Much better than XPath, and in this particular case, where the XML source is small, there will be no relevant performance issue.
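For comparison, here is roughly what the same lookup looks like with the standard library's xml.etree.ElementTree, which supports a limited subset of XPath in find. This is a sketch only: SAMPLE is a trimmed copy of the document above and media_as_dict is a hypothetical helper name.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the sample document, kept as bytes so the parser
# honours the encoding declared in the XML prolog.
SAMPLE = b"""<?xml version="1.0" encoding="ISO-8859-1" standalone="yes" ?>
<Pulsar>
  <OnAir>
    <media_type>default</media_type>
    <media>
      <title1>JOVEM PAN FM</title1>
      <title3>A maior rede de radio do Brasil</title3>
      <hour>2013-12-07 11:44:32</hour>
    </media>
  </OnAir>
</Pulsar>
"""


def media_as_dict(xml_bytes):
    """Return the children of Pulsar/OnAir/media as a plain dict, or None."""
    root = ET.fromstring(xml_bytes)   # root is the <Pulsar> element
    media = root.find("OnAir/media")  # path is relative to the root
    if media is None:
        return None
    return dict((child.tag, (child.text or "").strip()) for child in media)
```

It works without an extra dependency, but the caller has to know the path syntax and handle the None case, which is the kind of ceremony xmltodict avoids.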

Now we only need to get the values and populate the database; in my case I will use Redis DataModel as storage.

I will also use the magicdate module to automagically convert the date string to a datetime object.

import sys
import time
import xmltodict
from magicdate import magicdate  # import the function, the module itself is not callable
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

from models import Media  # project-specific model, assumed to live in models.py next to this script


class MyHandler(PatternMatchingEventHandler):
    patterns = ["*.xml"]

    def process(self, event):
        """
        event.event_type
            'modified' | 'created' | 'moved' | 'deleted'
        event.is_directory
            True | False
        event.src_path
            path/to/observed/file
        """

        with open(event.src_path, 'r') as xml_source:
            xml_string = xml_source.read()
            parsed = xmltodict.parse(xml_string)
            element = parsed.get('Pulsar', {}).get('OnAir', {}).get('media')
            if not element:
                return

            media = Media(
                title=element.get('title1'),
                description=element.get('title3'),
                media_id=element.get('media_id1'),
                hour=magicdate(element.get('hour')),
                length=element.get('title4')
            )
            media.save()

    def on_modified(self, event):
        self.process(event)

    def on_created(self, event):
        self.process(event)


if __name__ == '__main__':
    args = sys.argv[1:]
    observer = Observer()
    observer.schedule(MyHandler(), path=args[0] if args else '.')
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()

    observer.join()
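The mapping from XML fields to model fields can also be kept in a small function of its own, so it can be checked without Redis or a running observer. The sketch below makes two assumptions that are mine, not part of the script above: it swaps magicdate for the standard library's datetime.strptime, which is enough when the hour field always has the format shown in the sample, and it returns None for values that do not parse. parse_hour and media_fields are hypothetical names.

```python
from datetime import datetime

HOUR_FORMAT = "%Y-%m-%d %H:%M:%S"  # matches e.g. "2013-12-07 11:44:32"


def parse_hour(value):
    """Return a datetime for a well-formed `hour` string, else None."""
    if not value:
        return None
    try:
        return datetime.strptime(value, HOUR_FORMAT)
    except ValueError:
        return None


def media_fields(element):
    """Map the parsed <media> dict onto the keyword names used for Media."""
    return {
        "title": element.get("title1"),
        "description": element.get("title3"),
        "media_id": element.get("media_id1"),
        "hour": parse_hour(element.get("hour")),
        "length": element.get("title4"),
    }
```

Inside process this could then be used as Media(**media_fields(element)).save(), assuming Media accepts those keyword arguments as it does in the script above.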

That is my use case, but the example can be adapted to any similar requirement.

Another useful module is Workflow by Massimo Di Pierro that creates workflows based on rules defined in a config file.
