Python trick: asynchronously reading subprocess pipes

9 March, 2012 - 00:19

As its name suggests, the Python subprocess module lets you spawn a child process (a subprocess) and, for example, keep an eye on its standard output through a pipe. Very handy to glue together external systems/processes. A tricky issue with having parallel "flows" this way (your Python process and the child process in this case) is the risk of deadlock if you are not careful about the order in which things are done. For example, your process blocks reading from the subprocess's standard output pipe, while the subprocess is itself blocked writing to standard error because that pipe's buffer is full and nobody is reading it. Kaboom, deadlock. That's why the documentation recommends the communicate() method, which waits for the process to finish and returns the complete standard output and standard error content in one fell swoop.
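
To make the communicate() route concrete, here is a minimal sketch. It is written for Python 3, and the helper name run_and_collect and the one-line child command are assumptions made up for illustration, not part of any library.

```python
import subprocess
import sys


def run_and_collect(command):
    """Run `command` and return (returncode, stdout_bytes, stderr_bytes)."""
    process = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    # communicate() drains both pipes at the same time and then waits for
    # the child to exit, so neither pipe buffer can fill up and stall it.
    stdout_data, stderr_data = process.communicate()
    return process.returncode, stdout_data, stderr_data


if __name__ == "__main__":
    # Hypothetical child: writes one line to each stream.
    child = "import sys; sys.stdout.write('out\\n'); sys.stderr.write('err\\n')"
    print(run_and_collect([sys.executable, "-c", child]))
```

The catch is visible in the return value: you only get the output once the child has finished, as two complete blobs.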

Fine, but what if you want to read standard output and error line by line, for example because you want to monitor a long-running process? On the web you can find many solutions, with varying degrees of complexity, abstraction and dependencies. One solution (with limited code and no dependencies outside the standard library) is to read the pipes in separate threads, so one pipe can't block another.

The code below shows an example implementation. The script is set up in such a way that it serves as both the parent and the child process.

  • For the child process: when called with the 'produce' argument, it runs the produce() function, which just writes some lines, each one randomly to standard output or standard error. Between the lines there is a touch of delay to simulate a longer running process.
  • The parent process (script called without arguments), implemented in the consume() function, invokes the same script in "child mode" as a subprocess and monitors its output line by line, without knowing in advance from which pipe each line will come.

The AsynchronousFileReader class is for the threads that will read the standard output and error pipes asynchronously and put each line on a queue. The main thread can then monitor the subprocess by watching the lines as they come in on the queues.

import sys
import subprocess
import random
import time
import threading
import Queue
 
class AsynchronousFileReader(threading.Thread):
    '''
    Helper class to implement asynchronous reading of a file
    in a separate thread. Pushes read lines on a queue to
    be consumed in another thread.
    '''
 
    def __init__(self, fd, queue):
        assert isinstance(queue, Queue.Queue)
        assert callable(fd.readline)
        threading.Thread.__init__(self)
        self._fd = fd
        self._queue = queue
 
    def run(self):
        '''The body of the thread: read lines and put them on the queue.'''
        for line in iter(self._fd.readline, ''):
            self._queue.put(line)
 
    def eof(self):
        '''Check whether there is no more content to expect.'''
        return not self.is_alive() and self._queue.empty()
 
def consume(command):
    '''
    Example of how to consume standard output and standard error of
    a subprocess asynchronously without risk of deadlocking.
    '''
 
    # Launch the command as subprocess.
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
    # Launch the asynchronous readers of the process' stdout and stderr.
    stdout_queue = Queue.Queue()
    stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue)
    stdout_reader.start()
    stderr_queue = Queue.Queue()
    stderr_reader = AsynchronousFileReader(process.stderr, stderr_queue)
    stderr_reader.start()
 
    # Check the queues if we received some output (until there is nothing more to get).
    while not stdout_reader.eof() or not stderr_reader.eof():
        # Show what we received from standard output.
        while not stdout_queue.empty():
            line = stdout_queue.get()
            print 'Received line on standard output: ' + repr(line)
 
        # Show what we received from standard error.
        while not stderr_queue.empty():
            line = stderr_queue.get()
            print 'Received line on standard error: ' + repr(line)
 
        # Sleep a bit before asking the readers again.
        time.sleep(.1)
 
    # Let's be tidy and join the threads we've started.
    stdout_reader.join()
    stderr_reader.join()
 
    # Close subprocess' file descriptors and reap the child process.
    process.stdout.close()
    process.stderr.close()
    process.wait()
 
def produce(items=10):
    '''
    Dummy function to randomly render a couple of lines
    on standard output and standard error.
    '''
    for i in range(items):
        output = random.choice([sys.stdout, sys.stderr])
        output.write('Line %d on %s\n' % (i, output))
        output.flush()
        time.sleep(random.uniform(.1, 1))
 
if __name__ == '__main__':
    # The main flow:
    # if there is a command line argument 'produce', act as a producer
    # otherwise be a consumer (which launches a producer as subprocess).
    if len(sys.argv) == 2 and sys.argv[1] == 'produce':
        produce(10)
    else:
        # Use the interpreter that runs this script, not whatever 'python' is on the PATH.
        consume([sys.executable, sys.argv[0], 'produce'])

Running this will generate a line-by-line monitoring report like this:

Received line on standard output: "Line 0 on <open file '<stdout>', mode 'w' at 0x1002921e0>\n"
Received line on standard error: "Line 1 on <open file '<stderr>', mode 'w' at 0x100292270>\n"
Received line on standard output: "Line 2 on <open file '<stdout>', mode 'w' at 0x1002921e0>\n"
Received line on standard error: "Line 3 on <open file '<stderr>', mode 'w' at 0x100292270>\n"
Received line on standard output: "Line 4 on <open file '<stdout>', mode 'w' at 0x1002921e0>\n"
Received line on standard output: "Line 5 on <open file '<stdout>', mode 'w' at 0x1002921e0>\n"
Received line on standard error: "Line 6 on <open file '<stderr>', mode 'w' at 0x100292270>\n"
Received line on standard error: "Line 7 on <open file '<stderr>', mode 'w' at 0x100292270>\n"
Received line on standard output: "Line 8 on <open file '<stdout>', mode 'w' at 0x1002921e0>\n"
Received line on standard output: "Line 9 on <open file '<stdout>', mode 'w' at 0x1002921e0>\n"
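
The script above is Python 2 (print statement, Queue module). For Python 3, here is a minimal sketch of the same thread-per-pipe idea, not a drop-in port. The names _pump and monitor are made up for illustration. It also swaps two things on purpose: both readers feed a single queue, and the main loop uses a blocking get() with a None sentinel per pipe instead of polling with sleep(). Note that pipes yield bytes in Python 3 by default, so the end-of-file sentinel for readline is b'' rather than ''.

```python
import queue
import subprocess
import sys
import threading


def _pump(stream, name, lines):
    """Read `stream` line by line and push (name, line) tuples on `lines`."""
    with stream:
        for line in iter(stream.readline, b""):
            lines.put((name, line))
    # Sentinel: tells the consumer that this pipe reached end-of-file.
    lines.put((name, None))


def monitor(command):
    """Yield (stream_name, line) pairs as the child process produces them."""
    process = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    lines = queue.Queue()
    readers = [
        threading.Thread(target=_pump, args=(process.stdout, "stdout", lines)),
        threading.Thread(target=_pump, args=(process.stderr, "stderr", lines)),
    ]
    for reader in readers:
        reader.start()

    open_pipes = len(readers)
    while open_pipes:
        name, line = lines.get()  # blocks until a reader delivers something
        if line is None:
            open_pipes -= 1
        else:
            yield name, line

    # Both pipes are at end-of-file: tidy up the threads and reap the child.
    for reader in readers:
        reader.join()
    process.wait()


if __name__ == "__main__":
    # Hypothetical child: one line on each stream.
    child = "import sys; print('to stdout'); print('to stderr', file=sys.stderr)"
    for name, line in monitor([sys.executable, "-c", child]):
        print("Received line on %s: %r" % (name, line))
```

Lines from one pipe arrive in the order they were written; the interleaving between the two pipes depends on timing and on how the child buffers its output.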