Building a Concurrent Web Scraper with Python and Selenium

Last updated September 4th, 2020

This post looks at how to speed up a Python web scraping and crawling script with multithreading via the concurrent.futures module. We'll also break down the script itself and show how to test the parsing functionality with pytest.

After completing this tutorial you will be able to:

  1. Scrape and crawl websites with Selenium and parse HTML with Beautiful Soup
  2. Set up pytest to test the scraping and parsing functionalities
  3. Execute a web scraper concurrently with the concurrent.futures module
  4. Configure headless mode for ChromeDriver with Selenium

Project Setup

Clone down the repo if you'd like to follow along. From the command line run the following commands:

$ git clone git@github.com:testdrivenio/concurrent-web-scraping.git
$ cd concurrent-web-scraping
$ python3.8 -m venv env
$ source env/bin/activate
(env)$ pip install -r requirements.txt

The above commands may differ depending on your environment.

Install ChromeDriver globally. (We're using version 85.0.4183.87).

Script Overview

The script traverses and scrapes the first 20 pages of Hacker News for information about the current articles, using Selenium to automate interaction with the site and Beautiful Soup to parse the HTML.

script.py:

import datetime
import sys
from time import sleep, time

from scrapers.scraper import connect_to_base, get_driver, parse_html, write_to_file


def run_process(page_number, filename, browser):
    if connect_to_base(browser, page_number):
        sleep(2)
        html = browser.page_source
        output_list = parse_html(html)
        write_to_file(output_list, filename)
    else:
        print("Error connecting to hacker news")


if __name__ == "__main__":

    # headless mode?
    headless = False
    if len(sys.argv) > 1:
        if sys.argv[1] == "headless":
            print("Running in headless mode")
            headless = True

    # set variables
    start_time = time()
    current_page = 1
    output_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    output_filename = f"output_{output_timestamp}.csv"

    # init browser
    browser = get_driver(headless=headless)

    # scrape and crawl
    while current_page <= 20:
        print(f"Scraping page #{current_page}...")
        run_process(current_page, output_filename, browser)
        current_page = current_page + 1

    # exit
    browser.quit()
    end_time = time()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

Let's start with the main block. After determining whether Chrome should run in headless mode and defining a few variables, the browser is initialized via get_driver() from scrapers/scraper.py:

if __name__ == "__main__":

    # headless mode?
    headless = False
    if len(sys.argv) > 1:
        if sys.argv[1] == "headless":
            print("Running in headless mode")
            headless = True

    # set variables
    start_time = time()
    current_page = 1
    output_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    output_filename = f"output_{output_timestamp}.csv"

    ########
    # here #
    ########
    # init browser
    browser = get_driver(headless=headless)

    # scrape and crawl
    while current_page <= 20:
        print(f"Scraping page #{current_page}...")
        run_process(current_page, output_filename, browser)
        current_page = current_page + 1

    # exit
    browser.quit()
    end_time = time()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

A while loop is then configured to control the flow of the overall scraper.

if __name__ == "__main__":

    # headless mode?
    headless = False
    if len(sys.argv) > 1:
        if sys.argv[1] == "headless":
            print("Running in headless mode")
            headless = True

    # set variables
    start_time = time()
    current_page = 1
    output_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    output_filename = f"output_{output_timestamp}.csv"

    # init browser
    browser = get_driver(headless=headless)

    ########
    # here #
    ########
    # scrape and crawl
    while current_page <= 20:
        print(f"Scraping page #{current_page}...")
        run_process(current_page, output_filename, browser)
        current_page = current_page + 1

    # exit
    browser.quit()
    end_time = time()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

Within the loop, run_process() is called, which manages the WebDriver connection and scraping functions.

def run_process(page_number, filename, browser):
    if connect_to_base(browser, page_number):
        sleep(2)
        html = browser.page_source
        output_list = parse_html(html)
        write_to_file(output_list, filename)
    else:
        print("Error connecting to hacker news")

In run_process(), the browser instance and a page number are passed to connect_to_base().

def run_process(page_number, filename, browser):

    ########
    # here #
    ########
    if connect_to_base(browser, page_number):
        sleep(2)
        html = browser.page_source
        output_list = parse_html(html)
        write_to_file(output_list, filename)
    else:
        print("Error connecting to hacker news")

This function attempts to connect to Hacker News and then uses Selenium's explicit wait functionality to ensure the element with id='hnmain' has loaded before continuing.

def connect_to_base(browser, page_number):
    base_url = f"https://news.ycombinator.com/news?p={page_number}"
    connection_attempts = 0
    while connection_attempts < 3:
        try:
            browser.get(base_url)
            # wait for table element with id = 'hnmain' to load
            # before returning True
            WebDriverWait(browser, 5).until(
                EC.presence_of_element_located((By.ID, "hnmain"))
            )
            return True
        except Exception as e:
            print(e)
            connection_attempts += 1
            print(f"Error connecting to {base_url}.")
            print(f"Attempt #{connection_attempts}.")
    return False

Review the Selenium docs for more information on explicit wait.

To emulate a human user, sleep(2) is called after the browser has connected to Hacker News.

def run_process(page_number, filename, browser):
    if connect_to_base(browser, page_number):

        ########
        # here #
        ########
        sleep(2)
        html = browser.page_source
        output_list = parse_html(html)
        write_to_file(output_list, filename)
    else:
        print("Error connecting to hacker news")

Once the page has loaded and sleep(2) has executed, the browser grabs the HTML source, which is then passed to parse_html().

def run_process(page_number, filename, browser):
    if connect_to_base(browser, page_number):
        sleep(2)

        ########
        # here #
        ########
        html = browser.page_source

        ########
        # here #
        ########
        output_list = parse_html(html)
        write_to_file(output_list, filename)
    else:
        print("Error connecting to hacker news")

parse_html() uses Beautiful Soup to parse the HTML, generating a list of dicts with the appropriate data.

def parse_html(html):
    # create soup object
    soup = BeautifulSoup(html, "html.parser")
    output_list = []
    # parse soup object to get article id, rank, score, and title
    tr_blocks = soup.find_all("tr", class_="athing")
    for tr in tr_blocks:
        article_id = tr.get("id")
        article_url = tr.find_all("a")[1]["href"]
        # check if article is a hacker news article
        if "item?id=" in article_url:
            article_url = f"https://news.ycombinator.com/{article_url}"
        load_time = get_load_time(article_url)
        try:
            score = soup.find(id=f"score_{article_id}").string
        except Exception as e:
            print(e)
            score = "0 points"
        article_info = {
            "id": article_id,
            "load_time": load_time,
            "rank": tr.span.string,
            "score": score,
            "title": tr.find(class_="storylink").string,
            "url": article_url,
        }
        output_list.append(article_info)
    return output_list

This function also passes the article URL to get_load_time(), which loads the URL and records the subsequent load time.

def get_load_time(article_url):
    try:
        # set headers
        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
        }
        # make get request to article_url
        response = requests.get(
            article_url, headers=headers, stream=True, timeout=3.000
        )
        # get page load time
        load_time = response.elapsed.total_seconds()
    except Exception as e:
        print(e)
        load_time = "Loading Error"
    return load_time

The output is added to a CSV file.

def run_process(page_number, filename, browser):
    if connect_to_base(browser, page_number):
        sleep(2)
        html = browser.page_source
        output_list = parse_html(html)

        ########
        # here #
        ########
        write_to_file(output_list, filename)
    else:
        print("Error connecting to hacker news")

write_to_file():

def write_to_file(output_list, filename):
    # open the file once and append each row
    with open(filename, "a") as csvfile:
        fieldnames = ["id", "load_time", "rank", "score", "title", "url"]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        for row in output_list:
            writer.writerow(row)
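One thing to note: as written, write_to_file() never emits a header row, so the CSV's columns are unlabeled. Here's a hedged variant that writes a header whenever the file is new or empty; the writeheader() call and the file check are the only additions to the original behavior:

```python
import csv
import os


def write_to_file(output_list, filename):
    fieldnames = ["id", "load_time", "rank", "score", "title", "url"]
    # write the header only when starting a fresh (or empty) file
    new_file = not os.path.isfile(filename) or os.path.getsize(filename) == 0
    with open(filename, "a", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if new_file:
            writer.writeheader()
        writer.writerows(output_list)
```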

Finally, back in the while loop, current_page is incremented and the process starts over again.

if __name__ == "__main__":

    # headless mode?
    headless = False
    if len(sys.argv) > 1:
        if sys.argv[1] == "headless":
            print("Running in headless mode")
            headless = True

    # set variables
    start_time = time()
    current_page = 1
    output_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    output_filename = f"output_{output_timestamp}.csv"

    # init browser
    browser = get_driver(headless=headless)

    # scrape and crawl
    while current_page <= 20:
        print(f"Scraping page #{current_page}...")
        run_process(current_page, output_filename, browser)

        ########
        # here #
        ########
        current_page = current_page + 1

    # exit
    browser.quit()
    end_time = time()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

Want to test this out? Grab the full script here.

It took about 385 seconds (just over six minutes) to run:

(env)$ python script.py

Scraping page #1...
Scraping page #2...
Scraping page #3...
Scraping page #4...
Scraping page #5...
Scraping page #6...
Scraping page #7...
Scraping page #8...
Scraping page #9...
Scraping page #10...
Scraping page #11...
Scraping page #12...
Scraping page #13...
Scraping page #14...
Scraping page #15...
Scraping page #16...
Scraping page #17...
Scraping page #18...
Scraping page #19...
Scraping page #20...
Elapsed run time: 385.49500608444214 seconds

Keep in mind that there may not be content on all 20 pages, so the elapsed time may differ on your end. This script ran when there was content on 18 pages (530 records).

Got it? Great! Let's add some basic testing.

Testing

To test the parsing functionality without launching a browser and, thus, making repeated GET requests to Hacker News, you can download the page's HTML (test/test.html) and parse it locally. This helps you avoid getting your IP blocked for making too many requests too quickly while writing and testing your parsing functions, and saves you time by not needing to fire up a browser every time you run the script.
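To capture the fixture in the first place, you can save the rendered page once and reuse it. save_fixture() below is a hypothetical helper (it's not part of the repo); it assumes any Selenium driver object whose page_source attribute holds the rendered HTML:

```python
def save_fixture(browser, path="test/test.html"):
    # `browser` is any Selenium driver; page_source is the rendered HTML
    with open(path, "w", encoding="utf-8") as f:
        f.write(browser.page_source)
```

Run it once against a driver from get_driver() after connect_to_base() succeeds, then keep test/test.html under version control.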

test/test_scraper.py:

from pathlib import Path

import pytest

from scrapers import scraper

BASE_DIR = Path(__file__).resolve(strict=True).parent


@pytest.fixture(scope="module")
def html_output():
    with open(Path(BASE_DIR).joinpath("test.html"), encoding="utf-8") as f:
        html = f.read()
        yield scraper.parse_html(html)


def test_output_is_not_none(html_output):
    assert html_output


def test_output_is_a_list(html_output):
    assert isinstance(html_output, list)


def test_output_is_a_list_of_dicts(html_output):
    assert all(isinstance(elem, dict) for elem in html_output)

Ensure all is well:

(env)$ python -m pytest test/test_scraper.py

================================ test session starts =================================
platform darwin -- Python 3.8.5, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /Users/michael/repos/testdriven/async-web-scraping
collected 3 items

test/test_scraper.py ...                                                       [100%]

================================= 3 passed in 20.10s =================================

20 seconds?! Want to mock get_load_time() to bypass the GET requests?

test/test_scraper_mock.py:

from pathlib import Path

import pytest

from scrapers import scraper

BASE_DIR = Path(__file__).resolve(strict=True).parent


@pytest.fixture(scope="function")
def html_output(monkeypatch):
    def mock_get_load_time(url):
        return "mocked!"

    monkeypatch.setattr(scraper, "get_load_time", mock_get_load_time)
    with open(Path(BASE_DIR).joinpath("test.html"), encoding="utf-8") as f:
        html = f.read()
        yield scraper.parse_html(html)


def test_output_is_not_none(html_output):
    assert html_output


def test_output_is_a_list(html_output):
    assert isinstance(html_output, list)


def test_output_is_a_list_of_dicts(html_output):
    assert all(isinstance(elem, dict) for elem in html_output)

Test:

(env)$ python -m pytest test/test_scraper_mock.py

================================ test session starts =================================
platform darwin -- Python 3.8.5, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /Users/michael/repos/testdriven/async-web-scraping
collected 3 items

test/test_scraper_mock.py ...                                                  [100%]

================================= 3 passed in 0.37s =================================

Configure Multithreading

Now comes the fun part! By making just a few changes to the script, we can speed things up:

import datetime
import sys
from concurrent.futures import ThreadPoolExecutor, wait
from time import sleep, time

from scrapers.scraper import connect_to_base, get_driver, parse_html, write_to_file


def run_process(page_number, filename, headless):

    # init browser
    browser = get_driver(headless)

    if connect_to_base(browser, page_number):
        sleep(2)
        html = browser.page_source
        output_list = parse_html(html)
        write_to_file(output_list, filename)
    else:
        print("Error connecting to hacker news")

    # exit
    browser.quit()


if __name__ == "__main__":

    # headless mode?
    headless = False
    if len(sys.argv) > 1:
        if sys.argv[1] == "headless":
            print("Running in headless mode")
            headless = True

    # set variables
    start_time = time()
    output_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    output_filename = f"output_{output_timestamp}.csv"
    futures = []

    # scrape and crawl
    with ThreadPoolExecutor() as executor:
        for number in range(1, 21):
            futures.append(
                executor.submit(run_process, number, output_filename, headless)
            )

    wait(futures)
    end_time = time()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

With the concurrent.futures module, ThreadPoolExecutor is used to spawn a pool of threads for executing the run_process calls asynchronously. The submit method takes the function along with its parameters and returns a Future object. wait then blocks until all the futures are done; note that exiting the "with" block already waits for all submitted tasks to finish, so the explicit wait(futures) is mostly a safeguard here.
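One caveat with submit(): an exception raised inside a worker is stored on the future rather than raised, so wait() alone never surfaces it. Here's a minimal sketch, using a hypothetical task() in place of run_process(), where calling future.result() re-raises any worker error so it can be handled:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def task(n):
    # stand-in for run_process(); raises for one input to show propagation
    if n == 3:
        raise ValueError(f"page {n} failed")
    return n * n


results, errors = [], []
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, n) for n in range(1, 6)]
    for future in as_completed(futures):
        try:
            # result() re-raises any exception raised in the worker
            results.append(future.result())
        except Exception as e:
            errors.append(str(e))
```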

It's worth noting that you can easily switch to multiprocessing via ProcessPoolExecutor since both ProcessPoolExecutor and ThreadPoolExecutor implement the same interface:

# scrape and crawl
with ProcessPoolExecutor() as executor:
    for number in range(1, 21):
        futures.append(
            executor.submit(run_process, number, output_filename, headless)
        )

Why multithreading instead of multiprocessing?

Web scraping is I/O bound since retrieving the HTML (I/O) is slower than parsing it (CPU). For more on this, along with the difference between parallelism (multiprocessing) and concurrency (multithreading), review the Speeding Up Python with Concurrency, Parallelism, and asyncio post.
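To see the I/O-bound effect in isolation, here's a small sketch that simulates blocking network calls with sleep(), which, like a network wait, releases the GIL and so lets other threads run:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep, time


def fake_request(_):
    # stands in for a blocking network call; sleep releases the GIL
    sleep(0.2)


# run the "requests" one after another
start = time()
for i in range(5):
    fake_request(i)
sequential = time() - start

# run the same "requests" across five threads
start = time()
with ThreadPoolExecutor(max_workers=5) as executor:
    list(executor.map(fake_request, range(5)))
threaded = time() - start
```

With five threads, the five simulated requests overlap, so the threaded run takes roughly one request's worth of time instead of five.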

Run:

(env)$ python script_concurrent.py

Elapsed run time: 38.73605298995972 seconds

Check out the completed script here.

To speed things up even further we can run Chrome in headless mode by passing in the headless command line argument:

(env)$ python script_concurrent.py headless

Running in headless mode

Elapsed run time: 35.12011382590508 seconds

Conclusion

With a small amount of variation from the original code, we were able to execute the web scraper concurrently, taking the script's run time from around 385 seconds down to just over 35 seconds. In this specific scenario that's about 90% faster, which is a huge improvement.

I hope this helps your scripts. You can find the code in the repo. Cheers!

Caleb Pollman

Caleb is a software developer with a background in fine art and design. He's excited to learn new things and is most comfortable in challenging environments. In his free time he creates art and hangs out with random cats.
