How to Turn Web Scrapers into Data APIs

How to Turn Web Scrapers into Data APIs

Delivering web scraped data in real-time can be a real challenge. Though to turn a Python web scraper into a data-on-demand API we need very few ingredients.

In this practical tutorial, we'll build a simple Yahoo Finance scraper and wrap it around in a web API powered by FastAPI to scrape and deliver thousands of stock data details on demand!

Why FastAPI?

FastAPI is an asynchronous API framework in Python which makes it ideal for a web scraping API for this very reason. By employing the async code structure we can have really fast programs with very little overhead in very little code. In this article our full yahoo stock data API with caching and webhook support will be done in less than 100 lines of code!

Web Scraping Speed: Processes, Threads and Async

For more on asynchronous web scraping, what it is and how does it work, see our introduction article on web scraper scaling

Web Scraping Speed: Processes, Threads and Async

Setup

In this tutorial, as our API we'll be using FastAPI as our API server. To server our API we'll be using uvicorn and to understand what's going on easier we'll be using a rich logging package loguru.

For scraping, we'll be using httpx as our HTTP client and parsel as our HTML parser.

Hands on Python Web Scraping Tutorial and Example Project

If you're completely new to web scraping with these two Python tools, have a look out our introduction tutorial.

Hands on Python Web Scraping Tutorial and Example Project

All of these tools are Python packages and can be installed using the pip console command:

$ pip install fastapi uvicorn httpx parsel loguru

With our tools ready let's take a look at FastAPI basics.

FastAPI Quick Start

FastAPI has some amazing documentation resources but for our scraper service, we only need the very basics.

To start we'll be working in a single python module main.py:

# main.py
from fastapi import FastAPI
from loguru import logger as log

# create API app object
app = FastAPI()

# attach route to our API app
@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str):
    return {
        "stock": symbol,
        "price": "placeholder", 
    }

Above, we definite the first details of our data API with a single route: /scrape/stock/<symbol>.
This route has a single parameter - symbol - that indicates what stock should be scraped. For example, Apple's stock's symbol is AAPL.

We can run our API and see our placeholder results:

$ uvicorn main:app --reload

Now, if we go to http://127.0.0.1:8000/scrape/stock/aapl we should see our placeholder results as JSON:

import httpx

print(httpx.get("http://127.0.0.1:8000/scrape/stock/aapl").json())
{
    "stock": "AAPL",
    "price": "placeholder",
}

So, every time someone connects to this API endpoint our scrape_stock function will be called and the results will be returned to the client. Now, let's make it scrape stuff.

Scraping Yahoo Finance

To scrape Yahoo Finance we'll generate the URL that leads to the stock data page, download the HTML data and parse it using a few clever XPath selectors.

For example, let's take a look at Apple's stock data page yahoo.com/quote/AAPL/ - we can see that the URL pattern is finance.yahoo.com/quote/<symbol>. So, as long as we know the company's stock exchange symbol we can scrape any stock's data.

screencapture of yahoo finance page for Apple stock
We'll scrape price and summary details

Let's add the scraper logic to our API:

import asyncio
import httpx
from time import time
from fastapi import FastAPI

app = FastAPI()
stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0))

async def scrape_yahoo_finance(symbol):
    """scrapes stock data from yahoo finance"""
    log.info(f"{symbol}: scraping data")
    response = await stock_client.get(
        f"https://finance.yahoo.com/quote/{symbol}?p={symbol}"
    )
    sel = Selector(response.text)
    parsed = {}

    # parse summary data tables:
    rows = sel.xpath('//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]')
    for row in rows:
        label = row.xpath("@data-test").get().split("-value")[0].lower()
        value = " ".join(row.xpath(".//text()").getall())
        parsed[label] = value
    # parse price:
    parsed["price"] = sel.css(
        f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)'
    ).get()
    # add meta field for when this was scraped
    parsed["_scraped_on"] = time()
    return parsed

@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str):
    symbol = symbol.upper()
    return await scrape_yahoo_finance(symbol)

# on API start - open up our scraper's http client connections
@app.on_event("startup")
async def app_startup():
    await stock_client.__aenter__()


# on API close - close our scraper's http client connections
@app.on_event("shutdown")
async def app_shutdown():
    await stock_client.__aexit__()

We've updated our API with a scraper function and attached it to our API endpoint. Let's take a look at the results now:

$ curl http://127.0.0.1:8000/scrape/stock/aapl
{
  "prev_close": "156.90",
  "open": "157.34",
  "bid": "0.00 x 1000",
  "ask": "0.00 x 1000",
  "days_range": "153.67 - 158.74",
  "fifty_two_wk_range": "129.04 - 182.94",
  "td_volume": "101,696,790",
  "average_volume_3month": "75,324,652",
  "market_cap": "2.47T",
  "beta_5y": "1.23",
  "pe_ratio": "25.41",
  "eps_ratio": "6.05",
  "earnings_date": "Oct 26, 2022  -  Oct 31, 2022",
  "dividend_and_yield": "0.92 (0.59%)",
  "ex_dividend_date": "Aug 05, 2022",
  "one_year_target_price": "182.01",
  "price": "153.72",
  "_scraped_on": 1663838493.6148243
}

We've turned our scraper into a data API in just a few lines of code though we do have one glaring problem: API load.
What if multiple clients requests scraping of the same stock within a short amount of time? We'll be wasting our resources on redundant scrapes.
To fix this, let's take a look at how we can add simple caching.

Caching Scrapers

There are many ways to cache API contents, from enabling caching in our web server to using dedicated caching tools like Redis or Memchache.

However, for small API projects we can easily handle this within Python ourselves with very few modifications:

import asyncio
import httpx
from time import time
from fastapi import FastAPI

app = FastAPI()
stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0))
STOCK_CACHE = {}  # NEW: establish global cache storage
CACHE_TIME = 60  # NEW: define how long do we want to keep cache in seconds

async def scrape_yahoo_finance(symbol):
    """scrapes stock data from yahoo finance"""
    # NEW: check cache before we commit to scraping
    cache = STOCK_CACHE.get(symbol)
    if cache and time() - CACHE_TIME < cache["_scraped_on"]:
        log.debug(f"{symbol}: returning cached item")
        return cache

    log.info(f"{symbol}: scraping data")
    response = await stock_client.get(
        f"https://finance.yahoo.com/quote/{symbol}?p={symbol}"
    )
    sel = Selector(response.text)
    parsed = {}

    rows = sel.xpath('//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]')
    for row in rows:
        label = row.xpath("@data-test").get().split("-value")[0].lower()
        value = " ".join(row.xpath(".//text()").getall())
        parsed[label] = value
    parsed["price"] = sel.css(
        f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)'
    ).get()
    parsed["_scraped_on"] = time()
    # NEW: store successful results to cache
    STOCK_CACHE[symbol] = parsed
    return parsed

@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str):
    symbol = symbol.upper()
    return await scrape_yahoo_finance(symbol)

@app.on_event("startup")
async def app_startup():
    await stock_client.__aenter__()
    # NEW: optionally we can clear expired cache every minute to prevent
    # memory build up. 
    async def clear_expired_cache(period=60.0):
        while True:
            global STOCK_CACHE
            log.debug(f"clearing expired cache")
            STOCK_CACHE = {
                k: v for k, v in STOCK_CACHE.items() if time() - CACHE_TIME < v["_scraped_on"]
            }
            await asyncio.sleep(period)
    clear_cache_task = asyncio.create_task(clear_expired_cache())


@app.on_event("shutdown")
async def app_shutdown():
    await stock_client.__aexit__()

Above, we've updated our scrape function to first check the global cache before scraping the live target. For this, we use a simple Python dictionary to store scraped data by the symbol.

illustration of cache usage
Instead of scraping on demand the API can pick results that were generated recently

Python dictionaries are extremely fast and efficient so we can cache thousands or even millions of results in the memory of our API.
We also added a repeating asyncio task clear_expired_cache() which deletes expired cache values every minute. We did this by using asyncio.create_task() function which turns any asynchronous function (coroutine) into a background task object.

Now, if our API gets requests for the same stock data multiple times it'll only scrape the data once and serve cache to everyone else:

import asyncio
import httpx
from time import time


async def many_concurrent_api_calls(n):
    _start = time()
    async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client:
        _start_one = time()
        await client.get("http://127.0.0.1:8000/scrape/stock/aapl")
        print(f"completed first API scrape in {time() - _start_one:.2f} seconds")
        results = await asyncio.gather(*[
            client.get("http://127.0.0.1:8000/scrape/stock/aapl")
            for i in range(n)
        ])
    print(f"completed {n-1} API scrapes in {time() - _start:.2f} seconds")

# run 1000 API calls
asyncio.run(many_concurrent_api_calls(1_000))
# will print:
# completed first API scrape in 1.23 seconds
# completed 999 API scrapes in 2.59 seconds

Caching techniques will vary by application but for long and expensive tasks like web scraping, we can see great benefits even from simple designs like this one.

For more advanced caching in FastAPI see fastapi-cache extension.

Webhooks for Long Scrapes

Some scrape tasks can take many seconds or even minutes to complete which would timeout or block connecting client. One way to handle this is to provide a webhook support.

In other words, webhooks essentially promise that the API will complete the task and call back with the results later.

illustration of webhook usage
Instead of returning data directly the API sends it to the webhook

We can extend our API to include this feature by replacing our scrape_stock route:

...

async def with_webhook(cor, webhook, retries=3):
    """execute corotine and send it to a webhook"""
    result = await cor
    async with httpx.AsyncClient(
        headers={"User-Agent": "scraper webhook"},
        timeout=httpx.Timeout(timeout=15.0),
    ) as client:
        for i in range(retries):
            try:
                response = await client.post(webhook, json=result)
                return
            except Exception as e:
                log.exception(f"Failed to send a webhook {i}/{retries}")
            await asyncio.sleep(5)  # wait between retries
        log.error(f"Failed to reach webhook in {retries} retries")



@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str, webhook: Optional[str] = None):
    symbol = symbol.upper()
    scrape_cor = scrape_yahoo_finance(symbol)
    if webhook:
        # run scrape coroutine in the background
        task = asyncio.create_task(with_webhook(scrape_cor, webhook))
        return {"success": True, "webhook": webhook}
    else:
        return await scrape_cor

In the update above, if ?webhook parameter is supplied to our API it'll instantly return {"success": True, "webhook": webook} and schedule a background task that'll scrape results and post them to the clients webhook address.

To test this out we can webhook testing service such as webhook.site:

from urllib.parse import urlencode
import httpx

url = "http://127.0.0.1:8000/scrape/stock/aapl" + urlencode({
    "webhook": "https://webhook.site/da0a5cbc-0a62-4bd5-90f4-c8af73e5df5c"  # change this to yours
})
response = httpx.get(url)
print(response.json())
# {'success': True, 'webhook': 'https://webhook.site/da0a5cbc-0a62-4bd5-90f4-c8af73e5df5c'}

When running this, we'll see API returning instantly and if we check our webhooks.site session we'll see the data delivered there:

screencapture of webhook.site after our webhook is sent
We can see incoming webhook with the scraped data

Webhooks are a great way to not only deal with long scraping tasks but also ensure that our API will scale in the future as we can simply start more processes to handle webhook tasks if needed.

Final Scraper Code

To wrap this article up let's take a look at entire code of our scraper API:

import asyncio
from time import time
from typing import Optional

import httpx
from fastapi import FastAPI
from loguru import logger as log
from parsel import Selector

app = FastAPI()

stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0))
STOCK_CACHE = {}
CACHE_TIME = 10


async def scrape_yahoo_finance(symbol):
    """scrapes stock data from yahoo finance"""
    cache = STOCK_CACHE.get(symbol)
    if cache and time() - CACHE_TIME < cache["_scraped_on"]:
        log.debug(f"{symbol}: returning cached item")
        return cache

    log.info(f"{symbol}: scraping data")
    response = await stock_client.get(
        f"https://finance.yahoo.com/quote/{symbol}?p={symbol}"
    )
    sel = Selector(response.text)
    parsed = {}
    for row in sel.xpath(
        '//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]'
    ):
        label = row.xpath("@data-test").get().split("-value")[0].lower()
        value = " ".join(row.xpath(".//text()").getall())
        parsed[label] = value
    parsed["price"] = sel.css(
        f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)'
    ).get()
    parsed["_scraped_on"] = time()
    STOCK_CACHE[symbol] = parsed
    return parsed


async def with_webhook(cor, webhook, retries=3):
    result = await cor
    async with httpx.AsyncClient(
        headers={"User-Agent": "scraper webhook"},
        timeout=httpx.Timeout(timeout=15.0),
    ) as client:
        for i in range(retries):
            try:
                response = await client.post(webhook, json=result)
                return
            except Exception as e:
                log.exception(f"Failed to send a webhook {i}/{retries}")
            await asyncio.sleep(5)  # wait between retries


@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str, webhook: Optional[str] = None):
    symbol = symbol.upper()
    scrape_cor = scrape_yahoo_finance(symbol)
    if webhook:
        # run scrape coroutine in the background
        task = asyncio.create_task(with_webhook(scrape_cor, webhook))
        return {"success": True, "webhook": webhook}
    else:
        return await scrape_cor


# we can clear cache every once in a while to prevent memory build up
# @app.on_event("startup")
# @repeat_every(seconds=5.0)  # 1 hour
async def clear_expired_cache(period=5.0):
    while True:
        global STOCK_CACHE
        _initial_len = len(STOCK_CACHE)
        log.debug(f"clearing expired cache, current len {_initial_len}")
        STOCK_CACHE = {
            k: v
            for k, v in STOCK_CACHE.items()
            if v["_scraped_on"] == "scraping" or time() - CACHE_TIME < v["_scraped_on"]
        }
        log.debug(f"cleared {_initial_len - len(STOCK_CACHE)}")
        await asyncio.sleep(period)


@app.on_event("startup")
async def app_startup():
    await stock_client.__aenter__()
    clear_cache_task = asyncio.create_task(clear_expired_cache())


@app.on_event("shutdown")
async def app_shutdown():
    await stock_client.__aexit__()

ScrapFly

Web scraper logic can quickly outgrow our API. If we're scraping difficult targets that ban scrapers and require complex connections and proxy usage soon our data API will do more scraping than API'ing. We can abstract all of these complexities away to ScrapFly!

scrapfly middleware

ScrapFly provides web scraping, screenshot, and extraction APIs for data collection at scale.

For example, we could replace our scraper code to use ScrapFly through python sdk:

...

from scrapfly import ScrapflyClient, ScrapeConfig
stock_client = ScrapflyClient(key="YOUR SCRAPFLY KEY")

async def scrape_yahoo_finance(symbol):
    """scrapes stock data from yahoo finance"""
    cache = STOCK_CACHE.get(symbol)
    if cache and time() - CACHE_TIME < cache["_scraped_on"]:
        log.debug(f"{symbol}: returning cached item")
        return cache

    log.info(f"{symbol}: scraping data")
    result = await stock_client.async_scrape(ScrapeConfig(
        url=f"https://finance.yahoo.com/quote/{symbol}?p={symbol}",
        # we can select proxy country
        country="US",
        # or proxy type
        proxy_pool="public_residential_pool",
        # enable anti scraping protection bypass
        asp=True,
        # or real headless browser javascript rendering
        render_js=True,
        # we can enable caching
        cache=True,
    ))
    parsed = {}
    for row in result.selector.xpath(
        '//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]'
    ):
        label = row.xpath("@data-test").get().split("-value")[0].lower()
        value = " ".join(row.xpath(".//text()").getall())
        parsed[label] = value
    parsed["price"] = result.selector.css(
        f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)'
    ).get()
    parsed["_scraped_on"] = time()
    STOCK_CACHE[symbol] = parsed
    return parsed

...

Now our data API can scrape content in real-time without being blocked or throttled.

ScrapFly also offers built-in cache and webhook functionalities - just like we've covered in this tutorial which makes it an easy integration to real time scraping APIs.

Summary

In this practical tutorial, we've taken a quick dive into data APIs that scrape data in real time. We've covered how to set up FastAPI, wrote up a simple Yahoo Finance stock data scraper then wrapped everything together into a single, cohesive data API with caching and webhook support.

We've done that in just a few lines of actual code by taking advantage of existing, powerful web API ecosystem in Python!

Related Posts

How to Capture and Convert a Screenshot to PDF

Quick guide on how to effectively capture web screenshots as PDF documents

Playwright Examples for Web Scraping and Automation

Learn Playwright with Python and JavaScript examples for automating browsers like Chromium, WebKit, and Firefox.

How to use wget in Python

Learn how to use wget in Python through subprocess calls and what are other options.