Discussions criticizing Python often talk about how it is difficult to use Python for multithreaded work, pointing fingers at what is known as the global interpreter lock (affectionately referred to as the “GIL”) that prevents multiple threads of Python code from running simultaneously. Due to this, the threading module doesn’t quite behave the way you would expect it to if you’re not a Python developer and you are coming from other languages such as C++ or Java. It must be made clear that one can still write code in Python that runs concurrently or in parallel and make a stark difference resulting performance, as long as certain things are taken into consideration. If you haven’t read it yet, I suggest you take a look at Eqbal Quran’s article on concurrency and parallelism in Ruby here on the Toptal blog.

In this Python concurrency tutorial, we will write a small Python script to download the top popular images from Imgur. We will start with a version that downloads images sequentially, or one at a time. As a prerequisite, you will have to register an application on Imgur. If you do not have an Imgur account already, please create one first.

The scripts in this tutorial has been tested with Python 3.4.2. With some changes, they should also run with Python 2 - urllib is what has changed the most between these two versions of Python.

Getting Started with Multithreading in Python

Let us start by creating a Python module, named “download.py”. This file will contain all the functions necessary to fetch the list of images and download them. We will split these functionalities into three separate functions:

  • get_links
  • download_link
  • setup_download_dir

The third function, “setup_download_dir”, will be used to create a download destination directory if it doesn’t already exist.

Imgur’s API requires HTTP requests to bear the “Authorization” header with the client ID. You can find this client ID from the dashboard of the application that you have registered on Imgur, and the response will be JSON encoded. We can use Python’s standard JSON library to decode it. Downloading the image is an even simpler task, as all you have to do is fetch the image by its URL and write it to a file.

Python multithreading

This is what the script looks like:

import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request

logger = logging.getLogger(__name__)

def get_links(client_id):
   headers = {'Authorization': 'Client-ID {}'.format(client_id)}
   req = Request('https://api.imgur.com/3/gallery/', headers=headers, method='GET')
   with urlopen(req) as resp:
       data = json.loads(resp.readall().decode('utf-8'))
   return map(lambda item: item['link'], data['data'])

def download_link(directory, link):
   logger.info('Downloading %s', link)
   download_path = directory / os.path.basename(link)
   with urlopen(link) as image, download_path.open('wb') as f:
       f.write(image.readall())

def setup_download_dir():
   download_dir = Path('images')
   if not download_dir.exists():
       download_dir.mkdir()
   return download_dir

Next, we will need to write a module that will use these functions to download the images, one by one. We will name this “single.py”. This will contain the main function of our first, naive version of the Imgur image downloader. The module will retrieve the Imgur client ID in the environment variable “IMGUR_CLIENT_ID”. It will invoke the “setup_download_dir” to create the download destination directory. Finally, it will fetch a list of images using the get_links function, filter out all GIF and album URLs, and then use “download_link” to download and save each of those images to the disk. Here is what “single.py” looks like:

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)

def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   for link in links:
       download_link(download_dir, link)
   print('Took {}s'.format(time() - ts))

if __name__ == '__main__':
   main()

On my laptop, this script took 19.4 seconds to download 91 images. Please do note that these numbers may vary based on the network you are on. 19.4 seconds isn’t terribly long, but what if we wanted to download more pictures? Perhaps 900 images, instead of 90. With an average of 0.2 seconds per picture, 900 images would take approximately 3 minutes. For 9000 pictures it would take 30 minutes. The good news is that by introducing concurrency or parallelism, we can speed this up dramatically.

All subsequent code examples will only show import statements that are new and specific to those. For convenience, all of these Python scripts can be found in this GitHub repository.

Using Threads for Concurrency and Parallelism

Threading is one of the most well known approaches to attaining Python concurrency and parallelism. Threading is a feature usually provided by the operating system. Threads are lighter than processes, and share the same memory space.

Threading - Python concurrency and parallelism

In our Python thread tutorial, we will write a new module to replace “single.py”. This module will create a pool of 8 threads, making a total of 9 threads including the main thread. I chose 8 worker threads, because my computer has 8 CPU cores and one worker thread per core seemed a good number for how many threads to run at once. In practice, this number is chosen much more carefully based on other factors, such as other applications and services running on the same machine.

This is almost the same as the previous one, with the exception that we now have a new class, DownloadWorker, that is a descendent of the Thread class. The run method has been overridden, which runs an infinite loop. On every iteration, it calls “self.queue.get()” to try and fetch an URL to from a thread-safe queue. It blocks until there is an item in the queue for the worker to process. Once the worker receives an item from the queue, it then calls the same “download_link” method that was used in the previous script to download the image to the images directory. After the download is finished, the worker signals the queue that that task is done. This is very important, because the Queue keeps track of how many tasks were enqueued. The call to “queue.join()” would block the main thread forever if the workers did not signal that they completed a task.

from queue import Queue
from threading import Thread

class DownloadWorker(Thread):
   def __init__(self, queue):
       Thread.__init__(self)
       self.queue = queue

   def run(self):
       while True:
           # Get the work from the queue and expand the tuple
           directory, link = self.queue.get()
           download_link(directory, link)
           self.queue.task_done()

def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   # Create a queue to communicate with the worker threads
   queue = Queue()
   # Create 8 worker threads
   for x in range(8):
       worker = DownloadWorker(queue)
       # Setting daemon to True will let the main thread exit even though the workers are blocking
       worker.daemon = True
       worker.start()
   # Put the tasks into the queue as a tuple
   for link in links:
       logger.info('Queueing {}'.format(link))
       queue.put((download_dir, link))
   # Causes the main thread to wait for the queue to finish processing all the tasks
   queue.join()
   print('Took {}'.format(time() - ts))

Running this script on the same machine used earlier results in a download time of 4.1 seconds! Thats 4.7 times faster than the previous example. While this is much faster, it is worth mentioning that only one thread was executing at a time throughout this process due to the GIL. Therefore, this code is concurrent but not parallel. The reason it is still faster is because this is an IO bound task. The processor is hardly breaking a sweat while downloading these images, and the majority of the time is spent waiting for the network. This is why threading can provide a large speed increase. The processor can switch between the threads whenever one of them is ready to do some work. Using the threading module in Python or any other interpreted language with a GIL can actually result in reduced performance. If your code is performing a CPU bound task, such as decompressing gzip files, using the threading module will result in a slower execution time. For CPU bound tasks and truly parallel execution, we can use the multiprocessing module.

While the de facto reference Python implementation - CPython - has a GIL, this is not true of all Python implementations. For example, IronPython, a Python implementation using the .NET framework does not have a GIL, and neither does Jython, the Java based implementation. You can find a list of working Python implementations here.

Spawning Multiple Processes

The multiprocessing module is easier to drop in than the threading module, as we don’t need to add a class like the threading example. The only changes we need to make are in the main function.

multiprocessing module

To use multiple processes we create a multiprocessing Pool. With the map method it provides, we will pass the list of URLs to the pool, which in turn will spawn 8 new processes and use each one to download the images in parallel. This is true parallelism, but it comes with a cost. The entire memory of the script is copied into each subprocess that is spawned. In this simple example it isn’t a big deal, but it can easily become serious overhead for non-trivial programs.

from functools import partial
from multiprocessing.pool import Pool

def main():
   ts = time()
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   download = partial(download_link, download_dir)
   with Pool(8) as p:
       p.map(download, links)
   print('Took {}s'.format(time() - ts))

Distributing to Multiple Workers

While the threading and multiprocessing modules are great for scripts that are running on your personal computer, what should you do if you want the work to be done on a different machine, or you need to scale up to more than the CPU on one machine can handle? A great use case for this is long-running back-end tasks for web applications. If you have some long running tasks, you don’t want to spin up a bunch of subprocesses or threads on the same machine that need to be running the rest of your application code. This will degrade the performance of your application for all of your users. What would be great is to be able to run these jobs on another machine, or many other machines.

A great Python library for this task is RQ, a very simple yet powerful library. You first enqueue a function and its arguments using the library. This pickles the function call representation, which is then appended to a Redis list. Enqueueing the job is the first step, but will not do anything yet. We also need at least one worker to listen on that job queue.

RQ python library

The first step is to install and run a Redis server on your computer, or have access to a running Redis server. After that, there are only a few small changes made to the existing code. We first create an instance of an RQ Queue and pass it an instance of a Redis server from the redis-py library. Then, instead of just calling our “download_link” method, we call “q.enqueue(download_link, download_dir, link)”. The enqueue method takes a function as its first argument, then any other arguments or keyword arguments are passed along to that function when the job is actually executed.

One last step we need to do is to start up some workers. RQ provides a handy script to run workers on the default queue. Just run “rqworker” in a terminal window and it will start a worker listening on the default queue. Please make sure your current working directory is the same as where the scripts reside in. If you want to listen to a different queue, you can run “rqworker queue_name” and it will listen to that named queue. The great thing about RQ is that as long as you can connect to Redis, you can run as many workers as you like on as many different machines as you like; therefore, it is very easy to scale up as your application grows. Here is the source for the RQ version:

from redis import Redis
from rq import Queue

def main():
   client_id = os.getenv('IMGUR_CLIENT_ID')
   if not client_id:
       raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
   download_dir = setup_download_dir()
   links = [l for l in get_links(client_id) if l.endswith('.jpg')]
   q = Queue(connection=Redis(host='localhost', port=6379))
   for link in links:
       q.enqueue(download_link, download_dir, link)

However, RQ is not the only Python job queue solution. RQ is easy to use and covers simple use cases extremely well, but if more advanced options are required, other job queue solutions (such as Celery) can be used.

Conclusion

If your code is IO bound, both multiprocessing and multithreading in Python will work for you. Multiprocessing is a easier to just drop in than threading, but has a higher memory overhead. If your code is CPU bound, multiprocessing is most likely going to be the better choice - especially if the target machine has multiple cores or CPUs. For web applications, and when you need to scale the work across multiple machines, RQ is going to be better for you.

About the author

Marcus McCurdy, United States
member since November 10, 2014
Marcus has a Bachelor's in Computer Engineering and a Master's in Computer Science. He is a talented programmer, and excels most at back-end development. However, he is comfortable creating polished products as a full stack developer. [click to continue...]
Hiring? Meet the Top 10 Freelance Python Developers for Hire in December 2016

Comments

jorjun
Would like to see an example using the new asyncio module. Also, Gevent works wonders for this kind of I/O bound concurrency problem if you have to stick with python 2. There really is no need to use either threads or multiprocessing - green threads could be the way to go...
Marcus McCurdy
The asyncio module would be a pretty hefty example as the underlying urllib code isn't setup to use async connections. You can see an example here https://docs.python.org/3/library/asyncio-stream.html#get-http-headers of fetching headers. I was going to touch on Gevent, but it doesn't work with Python 3 at this time.
Mariano Simone
If your code is I/O bound, you should increase the number of data streams a process manage by means of async I/O. Creating a thread/forking a process just for handling new connections is a horrid waste of resources. Now, if you are doing CPU intensive operations, it clearly makes sense throwing more cores at the problem.
Mariano Simone
Btw, the amount of work and the quality of the post is very impressive!. Good job dude.
jorjun
I see. The aiohttp module looks promising. http://geekgirl.io/concurrent-http-requests-with-python3-and-asyncio/
Marcus McCurdy
You are correct that ones approach to concurrency/parallelism depends on if the underlying code is IO vs. CPU bound and I touch on this a bit in the article. I wanted to keep the article as simple as possible and still demonstrate the different options available in Python 3. I felt that including both IO bound and CPU bound examples would bloat the article, but I do mention an example of a CPU bound task in the article. I really wanted to write the article using Python 3 as I feel there aren't as many resources for it. I also wanted to include async IO using gevent, but gevent doesn't support Python 3. I decided to go with a pure Python 3 article rather than include some examples that work in Python 3 and some that work in Python 2.
Marcus McCurdy
Thanks for the compliment and your other comment.
Eki Eqbal
Nice article, keep it up. Cheers
rockyqi
good article with interesting images, thanks
Zero_NzYme
Very Nice Article. Going to give the redis and celery options a try. Very cool!
Jon
Hi, could you provide some insight into how someone could use multiprocessing to perform different functions, e.g. first function is reading zipped csv files into memory and the second function merges those csv files in the same order. This way as function 2 is still merging files 1 and 2, file 3 is being read into memory. Similar to how a sandwich shop could have multiple workers working on the same sandwiches as they pass through the different stages of production.
Ashwin Nanjappa
Good article. However it is not mentioned what to do if your code is CPU bound, data is big (multiprocessing is out of the question) and is not a web application. This is typical of scientific applications. The article should at least mention in the conclusion that this currently cannot be solved in CPython efficiently.
Anon Omus
Displaying a technique without providing necessary information for correct usage of it can hardly be considered bloat. It's just being lazy. :/
Andrew Franklin
As someone that doesn't know much about concurrency and parallelism in python this seems like a good place to start! Thanks for this tutorial! Unfortunately, I also don't know much about the urllib and downloading images from imgur. Occasionally, these codes won't run for me and I get the following error: "urllib.error.HTTPError: HTTP Error 403: Permission Denied" It's very inconsistent about when I receive the error and sometimes the codes run without a hitch. Does anyone know how to fix this so it runs every time?
Eric O LEBIGOT (EOL)
Good job: it is rare to see bloggers who write good Python code! :) Now, you share the speed up obtained with threads: what do you get with multiprocessing?
mattias
Hi, The part with threading for number of CPUs doesn't make sense. Since it only will run on one CPU core as the python threads are not real os threads there's no relation to the number of available cores.
Yuval Baror
Great article - thanks for sharing! Could you provide some details regarding the run time of the multiprocessing and RQ solutions compared to the original and threaded solutions?
Marcus McCurdy
You can find the full source code of all the examples in the article here https://github.com/volker48/python-concurrency. I never saw any 403 errors when I was testing, but that error would be coming from Imgur's API. I'm not sure what kind of network you are on, but perhaps your IP, if it is shared, is causing you to reach some kind of API limit.
deleteman
I'd also like to see this result.Can you please add it either to the post or here as a reply?
Ravi
Nice article. Surprised not to see Gevent in the list.
bjlange
+1 for RQ! By far the most user-friendly way I've found of distributing work across machines.
Nabeel Valapra
This is what I was looking for.. Great One!!!
Jay Dreyer
Thanks for this! 10x improvement in a script I use regularly. Thanks!
JEdVcM
In your threaded code, you're making a busy-wait with "while True" + non-blocking queue read. This is okay if you've got content in your queue, and you wrap it up at the end, but it gets to be a CPU hog otherwise. We had that a few months ago in our project. The other thing, I surely don't understand python threading totally, but even in an I/O bound environment, how can you get any faster, if you only use sync network reads? The GIL still makes only one thread running at a time. What do I miss? Is there some under-the-hood optimization for that in CPython?
Dannnno
From what I recall, CPython releases the GIL when threads are waiting for IO events - I'm not clear on the specifics of how that is done however.
JEdVcM
That would explain things well. However, I've found nothing on that yet. Another thing: sorry, I was wrong. The code uses a blocking wait.
Dannnno
I can't find where I originally read it either, however you can have this tidbit "Note that potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL." https://wiki.python.org/moin/GlobalInterpreterLock
Arulbalan Subramani
Hello Marcus, Wonderful post! Thanks, one question: though we have no of threads to be controlled using semaphore class, why do we need queue class to handle threads ? Thanks, Arul
Andreu Vallbona Plazas
In the example of multiprocessing Pool, how you control the same link is not downloaded for every process? On the other hand, GREAT POST!!
Marcus McCurdy
Its the way the map function works with the multiprocessing pool.<pre><code class="python">p.map(download, links)</code></pre> Links is a list of download links. p.map calls the download function once for each link in the list. Because this is the map function on a pool object each function may run in its own process. Pool's map function works similarly to Python's built in map function. Not sure if you are familiar with it, but it might make more sense if you play around with it https://docs.python.org/2/library/functions.html#map.
Mike Zang
This is a great job! I want to use it to my projects. I parse image link one by one dynamically, how can I use your thread or pool way?
sandipan
Celery is a great one to use. It can scale really well. Also it has flexibility of using redis or rabbitmq as broker
ale3andro
Thank you so much for this. My first attempt to write a threaded python script has led to a success!
Sundar Moses
Nicely written Marcus. I am also a python programmer. Though i have used these functions in concurrency, I am unable to rewrite the program using python next time and I forget the flow or parameters. again I refer my old program to recollect the flow. Any tips from your side to remember these function calls and arguments? thanks.
aeroaks
Hi, I have a function that accepts the file path and performs analysis on it. It returns an id for the pandas data frame row to which it was added. The file path is passed as a single string one at a time, from another program. Over time the analysis has included different types of files and takes some time. I need to implement multiprocessing for this part. What would be the best way of doing it? I need the return value of row id to work further on the results.
Will Vaughn
Yeah, your code on github works, but what you have written in the blog does not work
Marcus McCurdy
The code on github has been updated after the blog post was written to account for some changes in the imgur API. I don't have direct access to the blog post like I do with the github repo. Which part of the blog post isn't working for you? I'll check it out and ask for it to get updated once I fix it. Thanks Will.
David Nguyen
great article
Rohit Malgaonkar
Haven't tried redis server but Installed RabbitMQ python server and had 1 producer (sending string data encoded as JSON) and 3 consumers for cpu bound script (looping csv files, searching decoded JSON data sent by producer) and it halfs the time approximately but runs 3 python processes @ 97% and cpu utilization @ 90% versus 1 process @97% and double time.
comments powered by Disqus
Subscribe
The #1 Blog for Engineers
Get the latest content first.
No spam. Just great engineering and design posts.
The #1 Blog for Engineers
Get the latest content first.
Thank you for subscribing!
You can edit your subscription preferences here.
Trending articles
Relevant technologies
About the author
Marcus McCurdy
SQL Developer
Marcus has a Bachelor's in Computer Engineering and a Master's in Computer Science. He is a talented programmer, and excels most at back-end development. However, he is comfortable creating polished products as a full stack developer.