We all know Python has the ability to run functions concurrently and in parallel, but if you aren’t doing it frequently, you might be fuzzy on the details. This code demonstrates how to use the ThreadPoolExecutor and ProcessPoolExecutor classes from the concurrent.futures module to run multiple threads and processes concurrently or in parallel to save you time. You can find the code for ThreadPoolExecutor on this canvas.
First, we’ll import all of the packages and modules that we need for this exercise. Note that requests is the only package you need to install; concurrent.futures, multiprocessing, and time are built into Python 3, so running %pip install on those modules is unnecessary and will return an error.
# Install the requests package
%pip install requests
# Import relevant packages
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing
import time
import requests
ThreadPoolExecutor is well suited to I/O-bound work like HTTP requests. For our example, we’ll define a function called get_imdb_page_status(), which will retrieve the HTTP status of a given URL. For comparison’s sake, we’ll use the same example for ProcessPoolExecutor later as well.
# Define function that checks the page status of a given URL
def get_imdb_page_status(url, headers, timeout = 5):
    response = requests.get(url = url, headers = headers, timeout = timeout)
    if response.status_code == 200:
        page_status = "200: exists"
    elif response.status_code == 404:
        page_status = "404: does not exist"
    else:
        page_status = "UNKNOWN"
    return url + " - " + page_status
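Note that this function assumes every request succeeds: a timeout or connection error will raise an exception that interrupts the run (or, when the function runs inside an executor, resurfaces when the task’s result is collected). Below is a minimal sketch of a more defensive variant; the name get_imdb_page_status_safe is our own, not part of the original example.
# Hypothetical variant of get_imdb_page_status() that reports request failures
# in the return value instead of raising
def get_imdb_page_status_safe(url, headers, timeout = 5):
    try:
        response = requests.get(url = url, headers = headers, timeout = timeout)
    except requests.exceptions.RequestException as e:
        # Timeouts, connection errors, etc. land here
        return url + " - ERROR: " + type(e).__name__
    if response.status_code == 200:
        page_status = "200: exists"
    elif response.status_code == 404:
        page_status = "404: does not exist"
    else:
        page_status = "UNKNOWN"
    return url + " - " + page_status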
Then, we’ll create a list of URLs we’re interested in. IMDB stores all entries using specific ID codes. We’ll check on some title entries (movies and TV shows rather than actors or actresses), using the ID template tt followed by 7 digits. We have prefilled the first 5 digits as 00000, and will use a list comprehension to append 2 more digits at the end. We’ll also add some header information, which you can find from your browser. For Chrome, try View > Developer Tools > Console > Network conditions.
# Create list of IMDB URLs
imdb_movie_urls = ["https://www.imdb.com/title/tt00000" + str(i) for i in range(10,50,1)]
# Get User-Agent information--you can find this via View > Developer Tools > Console > Network conditions
headers = {"User-Agent": "XXXXXXX"}
Note: IMDB has an API that you can use to access more detailed information about movies and shows.
Then we’ll use a for loop, ThreadPoolExecutor, and ProcessPoolExecutor to check the status of each webpage. The time() function from the time module allows us to compare the time elapsed for each method. There are different ways to get the executors to perform tasks and return results, namely the executor.submit() and executor.map() functions. For simplicity, this post will focus on executor.submit(), which schedules each task and returns a Future object. You can then call result() on each Future object to retrieve the result.
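For reference, here is a minimal sketch of the executor.map() alternative; it is not used in the rest of this post, and it reuses the get_imdb_page_status(), imdb_movie_urls, and headers names defined in this example.
# Sketch only: executor.map() yields results in the order the URLs were submitted
with ThreadPoolExecutor(max_workers = multiprocessing.cpu_count()) as executor:
    # map() takes one iterable per argument, so headers is repeated once per URL
    for result in executor.map(get_imdb_page_status, imdb_movie_urls, [headers] * len(imdb_movie_urls)):
        print(result)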
Using a for loop
# Use for loop to check status of each webpage
print("Running without threads:")
start_s = time.time()
for url in imdb_movie_urls:
    print(get_imdb_page_status(url, headers))
end_s = time.time()
print(f'Completed in {round((end_s-start_s)*1000, 2)} milliseconds.')
Output:
Running without threads:
https://www.imdb.com/title/tt0000010 - 200: exists
https://www.imdb.com/title/tt0000011 - 200: exists
...
https://www.imdb.com/title/tt0000048 - 200: exists
https://www.imdb.com/title/tt0000049 - 200: exists
Completed in 27702.04 milliseconds.
We can see that the process took 27,702.04 milliseconds to complete. Now we’ll try the same process with ThreadPoolExecutor.
Using ThreadPoolExecutor
print(f'Running with {multiprocessing.cpu_count()} threads:')
start_s = time.time()

# Create ThreadPoolExecutor() instance with max_workers equal to CPUs in the system
with ThreadPoolExecutor(max_workers = multiprocessing.cpu_count()) as executor:
    # Submit tasks to executor using executor.submit()
    futures = [executor.submit(get_imdb_page_status, url, headers) for url in imdb_movie_urls]
    # Fetch results as tasks are completed
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

end_s = time.time()
print(f'Completed in {round((end_s-start_s)*1000, 2)} milliseconds.')
Output:
Running with 16 threads:
https://www.imdb.com/title/tt0000019 - 200: exists
https://www.imdb.com/title/tt0000015 - 200: exists
...
https://www.imdb.com/title/tt0000048 - 200: exists
https://www.imdb.com/title/tt0000047 - 200: exists
Completed in 2109.58 milliseconds.
As we can see, the process was more than 10 times faster, at 2,109.58 milliseconds. Additionally, the tasks did not return in order: the print statements started with tt0000019 and ended with tt0000047.
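If the original order matters, one simple option (a sketch, not part of the timed example above) is to iterate over the futures list in submission order instead of using as_completed(); result() simply blocks until that particular task finishes.
# Sketch: same submissions as above, but results printed in submission order
with ThreadPoolExecutor(max_workers = multiprocessing.cpu_count()) as executor:
    futures = [executor.submit(get_imdb_page_status, url, headers) for url in imdb_movie_urls]
    for future in futures:
        print(future.result())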
BONUS: ThreadPoolExecutor with Generative AI
If you’re interested in optimizing your workflows, check out Einblick’s AI agent, Prompt. Now all Einblick users have access to Prompt, which can create data workflows from as little as one sentence. Prompt is powered by OpenAI and LangChain, but tailored for the data domain, including generating and fixing Python code, creating charts, and building models. Then, you can run the code immediately in our AI-native data notebooks. Check out how we used Prompt for this case below:
Using Generative AI in Einblick
- Open your Einblick canvas
- Select the Python cell with the functions you want to optimize > Select Prompt
- Type in: "Time the use of ThreadPoolExecutor to run get_imdb_page_status function on imdb_movie_urls."
- Select the Python cell > Shift + Enter to run the code
Test out different prompts, and get results in seconds!
Using ProcessPoolExecutor
The syntax of the code for ProcessPoolExecutor is almost identical to that for ThreadPoolExecutor. The main difference is that ProcessPoolExecutor requires the __main__ module to be importable by its worker processes, so the code that creates the pool must be guarded by if __name__ == "__main__". For ease of use, we wrote a Python script called parallel_processing.py and ran it from the command line or terminal.
# parallel_processing.py
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time
import requests

# Include the get_imdb_page_status() definition from above here

def main():
    # Create list of URLs
    imdb_movie_urls = ["https://www.imdb.com/title/tt00000" + str(i) for i in range(10,50,1)]
    headers = {"User-Agent": "XXXXXXX"}

    print(f'Running with {multiprocessing.cpu_count()} processes:')
    start_s = time.time()

    # Create ProcessPoolExecutor() instance with max_workers equal to CPUs in the system
    with ProcessPoolExecutor(max_workers = multiprocessing.cpu_count()) as executor:
        # Submit tasks to executor using executor.submit()
        futures = [executor.submit(get_imdb_page_status, url, headers) for url in imdb_movie_urls]
        # Fetch results as tasks are completed
        for future in concurrent.futures.as_completed(futures):
            print(future.result())

    end_s = time.time()
    print(f'Completed in {round((end_s-start_s)*1000, 2)} milliseconds.')

if __name__ == "__main__":
    main()
(base) eimblick@MacBook-Pro concurrent % python parallel_processing.py
Running with 8 processes:
https://www.imdb.com/title/tt0000017 - 200: exists
https://www.imdb.com/title/tt0000012 - 200: exists
...
https://www.imdb.com/title/tt0000049 - 200: exists
https://www.imdb.com/title/tt0000048 - 200: exists
Completed in 5042.42 milliseconds.
This process took 5,042.42 milliseconds: faster than the plain for loop, but slower than ThreadPoolExecutor, which makes sense because checking webpages is an I/O-bound task.
Threading vs. multiprocessing
Threading and multiprocessing are different tools with different use cases.
Threads:
- Share memory and can access or modify the same variables
- Can communicate with each other
- In Python, because of the Global Interpreter Lock (GIL), two threads in the same process cannot execute Python bytecode at the same time
- Efficient for I/O-bound tasks, like reading or writing to an API
Processes:
- Have their own memory, and more overhead than threads
- More efficient for CPU-bound tasks, like heavy computing (see the sketch below)
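To make the distinction concrete, here is a small, hypothetical benchmark sketch; sum_of_squares() and the workload sizes are our own illustration, not part of the IMDB example. Because of the GIL, a thread pool gains little on this kind of pure-Python computation, while a process pool can spread the work across cores.
# cpu_bound_demo.py -- hypothetical comparison of thread vs. process pools on a CPU-bound task
import time
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sum_of_squares(n):
    # Pure-Python arithmetic loop: CPU-bound, so the GIL limits threads
    return sum(i * i for i in range(n))

def main():
    workloads = [5_000_000] * 8  # illustrative sizes
    for name, pool_class in [("threads", ThreadPoolExecutor), ("processes", ProcessPoolExecutor)]:
        start_s = time.time()
        with pool_class(max_workers = multiprocessing.cpu_count()) as executor:
            results = list(executor.map(sum_of_squares, workloads))
        end_s = time.time()
        print(f'{name}: completed in {round((end_s-start_s)*1000, 2)} milliseconds.')

if __name__ == "__main__":
    main()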
About
Einblick is an AI-native data science platform that provides data teams with an agile workflow to swiftly explore data, build predictive models, and deploy data apps. Founded in 2020, Einblick was developed based on six years of research at MIT and Brown University. Einblick is funded by Amplify Partners, Flybridge, Samsung Next, Dell Technologies Capital, and Intel Capital. For more information, please visit www.einblick.ai and follow us on LinkedIn and Twitter.