This entry is an iteration on Python Multiprocessing. Last week I found an opportunity in my code: processing the output of a long transcode in a background worker. The catch is that the transcode generates chunks of work, so I can listen to its progress and process every chunk individually while the main transcode is still running.

The code can be found here: Queue processing. You’ll find something that resembles a test suite here: Driving the queue processing.

So these are my requirements:

  • I want the whole structure to be encapsulated behind a class; a facade is a good way of describing the approach. On object creation a new process is spawned, and it peacefully waits for tasks to do.
  • Sending a task to the background worker is non-blocking for the caller, even if the worker has crashed.
  • When the main process has finished, I collect the results from the background worker. The results must match the task objects.

So, in a way, I’m mapping a list of tasks that is being populated little by little. I’m sure there is a better way out there to implement this pattern.
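One candidate for that "better way", offered as an assumption and not as what the linked code does, is the standard library's concurrent.futures module (Python 3). The sketch below uses a single-process pool so tasks run in submission order; solve is a hypothetical stand-in for the real per-chunk work.

```python
import concurrent.futures
import time


def solve(task_id, estimated_processing_time):
    """Hypothetical stand-in for the real per-chunk work."""
    time.sleep(estimated_processing_time)  # simulates the task
    return str(task_id) + " solved after " + str(estimated_processing_time)


if __name__ == "__main__":
    # A single worker process runs the tasks one after another.
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool:
        futures = []
        for task_id, seconds in [("one", 0.1), ("two", 0.25)]:
            # submit() returns immediately, so chunks can be queued
            # while the main transcode is still running.
            futures.append((task_id, pool.submit(solve, task_id, seconds)))
        # Each future stays paired with the task that produced it.
        for task_id, future in futures:
            print(task_id + " - " + future.result())
```

An exception raised inside solve is re-raised by future.result(), so a failing task surfaces in the caller instead of being lost in the background.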

Code walking

The basic usage looks like this:

import queue_consumer

w = queue_consumer.WorkingProcessFacade()
w.process_task(queue_consumer.SimpleTask("one", 1.0))
w.process_task(queue_consumer.SimpleTask("two", 2.5))
tasks_result = w.collect_tasks()

for t in tasks_result:
    print(str(t.ID) + " - " + str(t.result))

The WorkingProcessFacade spawns the background process on creation. What the task actually does is an implementation detail; here I’m using the float as an estimate of the background processing time. Calling collect_tasks signals the end of the job: it terminates the worker, closes the queues, and changes the state of the facade so that no more tasks are accepted.

import time

def process_queue(console_mutex, input_queue, output_queue):
    # Worker loop: runs in the background process until it receives None.
    print_mutexed(console_mutex, "Started, waiting for tasks")

    task = input_queue.get()
    while task is not None:
        # Training hook: a task named "crash" kills the worker on purpose.
        if task.ID.lower() == "crash":
            raise Exception("freak out")
        print_mutexed(console_mutex, "Received: " + str(task.ID))
        time.sleep(task.estimated_processing_time) # simulates the task
        task.result = str(task.ID) + " solved after " + str(task.estimated_processing_time)
        output_queue.put(task)
        task = input_queue.get()

    # None was received: no more results will be written by this process.
    output_queue.close()
    print_mutexed(console_mutex, "Termination received, ending worker process")

Since this code is mainly for training, I’ve taken some liberties. For instance, naming a SimpleTask “crash” will kill the background worker. Also, I’m sending an explicit terminator (None) through the queue so the process is notified that the work has ended. An alternative would be to use close on the communication queue; I might look into that over the coming days.
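To tie the pieces together, here is a minimal sketch of how such a facade could be wired up. It is not the linked implementation: it is written for Python 3, leaves out the console mutex, and the private attribute names, the poll_interval parameter, and raising RuntimeError for late tasks are all assumptions made for illustration.

```python
import multiprocessing
import queue
import time


class SimpleTask(object):
    """Task with an ID and a simulated processing time in seconds."""

    def __init__(self, ID, estimated_processing_time):
        self.ID = ID
        self.estimated_processing_time = estimated_processing_time
        self.result = None


def _worker(input_queue, output_queue):
    """Consume tasks until the None terminator arrives."""
    task = input_queue.get()
    while task is not None:
        if task.ID.lower() == "crash":
            raise Exception("freak out")
        time.sleep(task.estimated_processing_time)  # simulates the task
        task.result = str(task.ID) + " solved after " + str(task.estimated_processing_time)
        output_queue.put(task)
        task = input_queue.get()


class WorkingProcessFacade(object):
    """Sketch: spawn the worker on creation, collect the results once."""

    def __init__(self):
        self._input = multiprocessing.Queue()
        self._output = multiprocessing.Queue()
        self._pending = 0
        self._results = None  # becomes a list once collect_tasks has run
        self._process = multiprocessing.Process(
            target=_worker, args=(self._input, self._output))
        self._process.daemon = True  # do not outlive the main process
        self._process.start()

    def process_task(self, task):
        if self._results is not None:
            raise RuntimeError("collect_tasks was already called")
        self._input.put(task)  # returns at once; a feeder thread does the I/O
        self._pending += 1

    def collect_tasks(self, poll_interval=0.1):
        if self._results is not None:
            return self._results  # repeated calls are harmless
        self._input.put(None)  # explicit terminator
        results = []
        while len(results) < self._pending:
            # Check liveness before the bounded wait: if the worker was
            # already dead and nothing arrives, nothing ever will.
            alive = self._process.is_alive()
            try:
                results.append(self._output.get(timeout=poll_interval))
            except queue.Empty:
                if not alive:
                    break
        self._process.join()
        self._input.close()
        self._output.close()
        self._results = results
        return results
```

Polling with a timeout trades a little latency for the guarantee that collect_tasks returns even when the worker has died; results produced before the crash are still returned.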

When things go wrong

From time to time background workers just die. Exceptions are thrown, resources are unavailable, or a million other things go terribly wrong. I’ve tested this solution against:

  • multiple calls to collect_tasks
  • untimely death of the background worker; the facade shouldn’t deadlock
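When checking the second case it helps to tell a crash from a clean exit. A small sketch, assuming Python 3: multiprocessing exposes Process.exitcode, which is None while the process runs, 0 after a clean exit, and negative when a signal killed it. The describe_exit helper and the two worker functions are hypothetical names.

```python
import multiprocessing


def crashing_worker():
    raise Exception("freak out")  # same failure the "crash" task triggers


def healthy_worker():
    pass


def describe_exit(process):
    """Translate Process.exitcode into a short status string."""
    code = process.exitcode
    if code is None:
        return "still running"
    if code == 0:
        return "finished cleanly"
    if code < 0:
        return "killed by signal " + str(-code)
    return "crashed with exit code " + str(code)


if __name__ == "__main__":
    for target in (healthy_worker, crashing_worker):
        p = multiprocessing.Process(target=target)
        p.start()
        p.join()
        print(target.__name__ + ": " + describe_exit(p))
```

The crashing worker also prints its traceback to stderr, so the status line complements the log instead of replacing it.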

What is missing

Relying on user code to call collect_tasks is a little risky. Using a with statement (a context manager) or writing a custom destructor might help. So, if you use this implementation, keep an eye on the usage pattern.
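The with idea could look roughly like the sketch below. CollectingSession is a hypothetical wrapper, not part of the linked code, and StubFacade is an in-process stand-in so the example runs without spawning anything; any object with process_task and collect_tasks would fit.

```python
class CollectingSession(object):
    """Hypothetical wrapper: calls collect_tasks() when the with block ends."""

    def __init__(self, facade):
        self.facade = facade
        self.results = None  # filled in by __exit__

    def __enter__(self):
        return self.facade

    def __exit__(self, exc_type, exc_value, traceback):
        # Runs on normal exit and also when the body raises.
        self.results = self.facade.collect_tasks()
        return False  # do not swallow exceptions from the with body


class StubFacade(object):
    """In-process stand-in for the real facade (illustration only)."""

    def __init__(self):
        self.tasks = []
        self.collected = False

    def process_task(self, task):
        self.tasks.append(task)

    def collect_tasks(self):
        self.collected = True
        return list(self.tasks)


session = CollectingSession(StubFacade())
with session as w:
    w.process_task("one")
    w.process_task("two")
print(session.results)
```

Compared with a destructor, the with block runs the cleanup at a known point in the program, which makes it the more predictable of the two options.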

And also: Bonus points for mutexing the console.
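For reference, a console mutex can be as small as the sketch below. The name print_mutexed and its call shape come from the worker code; the body, the chatty_worker function, and the demo are assumptions, written for Python 3.

```python
import multiprocessing
import sys


def print_mutexed(console_mutex, message):
    """Write one whole line to stdout while holding the shared lock."""
    with console_mutex:
        sys.stdout.write(message + "\n")
        sys.stdout.flush()  # push the line out before releasing the lock


def chatty_worker(console_mutex, name):
    for i in range(3):
        print_mutexed(console_mutex, name + " says line " + str(i))


if __name__ == "__main__":
    lock = multiprocessing.Lock()  # shared with the children via args
    workers = [multiprocessing.Process(target=chatty_worker, args=(lock, n))
               for n in ("A", "B")]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
```

Lines from the two workers may interleave in any order, but each line is written in one piece.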