Delivering scraped web data in real time can be a real challenge. However, turning a Python web scraper into a data-on-demand API takes only a few ingredients.
In this practical tutorial, we'll build a simple Yahoo Finance scraper and wrap it in a web API powered by FastAPI to scrape and deliver thousands of stock data details on demand!
Why FastAPI?
FastAPI is an asynchronous API framework for Python, which makes it a good fit for a web scraping API: scraping is mostly waiting on network responses, and async code lets a single process handle many such requests concurrently. By employing the async code structure, we can have really fast programs with very little overhead in very little code. In this article, our full Yahoo stock data API with caching and webhook support will be done in less than 100 lines of code!
Setup
In this tutorial, we'll be using FastAPI as our API framework. To serve our API we'll be using uvicorn, and to make it easier to follow what's going on we'll be using loguru, a rich logging package.
For scraping, we'll be using httpx as our HTTP client and parsel as our HTML parser.
All of these tools are Python packages and can be installed using the pip console command:
$ pip install fastapi uvicorn httpx parsel loguru
With our tools ready let's take a look at FastAPI basics.
FastAPI Quick Start
FastAPI has some amazing documentation resources but for our scraper service, we only need the very basics.
To start, we'll be working in a single Python module, main.py:
# main.py
from fastapi import FastAPI
from loguru import logger as log

# create API app object
app = FastAPI()

# attach route to our API app
@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str):
    return {
        "stock": symbol,
        "price": "placeholder",
    }
Above, we define the first details of our data API with a single route: /scrape/stock/<symbol>.
This route has a single parameter - symbol - that indicates what stock should be scraped. For example, Apple's stock's symbol is AAPL.
We can run our API and see our placeholder results:
$ uvicorn main:app --reload
Now, if we go to http://127.0.0.1:8000/scrape/stock/aapl we should see our placeholder results as JSON:
So, every time someone connects to this API endpoint our scrape_stock function will be called and the results will be returned to the client. Now, let's make it scrape stuff.
Scraping Yahoo Finance
To scrape Yahoo Finance we'll generate the URL that leads to the stock data page, download the HTML data and parse it using a few clever XPath selectors.
For example, let's take a look at Apple's stock data page finance.yahoo.com/quote/AAPL/ - we can see that the URL pattern is finance.yahoo.com/quote/<symbol>. So, as long as we know the company's stock exchange symbol, we can scrape any stock's data.
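The URL pattern can be captured in a small helper. This is only a sketch: build_quote_url is a hypothetical name that is not part of the scraper below, and the escaping step is an assumption for symbols that contain characters such as ^.

```python
from urllib.parse import quote

BASE_URL = "https://finance.yahoo.com/quote/"  # URL pattern described above


def build_quote_url(symbol: str) -> str:
    """Return the quote page URL for a ticker symbol (hypothetical helper)."""
    # normalise whitespace and case, then escape characters that are not URL-safe
    cleaned = symbol.strip().upper()
    return BASE_URL + quote(cleaned, safe="")


print(build_quote_url("aapl"))
```

Upper-casing here mirrors what the API route does later with symbol.upper().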
Let's add the scraper logic to our API:
import asyncio
import httpx
from time import time
from fastapi import FastAPI
from loguru import logger as log
from parsel import Selector

app = FastAPI()
stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0))

async def scrape_yahoo_finance(symbol):
    """scrapes stock data from yahoo finance"""
    log.info(f"{symbol}: scraping data")
    response = await stock_client.get(
        f"https://finance.yahoo.com/quote/{symbol}?p={symbol}"
    )
    sel = Selector(response.text)
    parsed = {}
    # parse summary data tables:
    rows = sel.xpath('//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]')
    for row in rows:
        label = row.xpath("@data-test").get().split("-value")[0].lower()
        value = " ".join(row.xpath(".//text()").getall())
        parsed[label] = value
    # parse price:
    parsed["price"] = sel.css(
        f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)'
    ).get()
    # add meta field for when this was scraped
    parsed["_scraped_on"] = time()
    return parsed

@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str):
    symbol = symbol.upper()
    return await scrape_yahoo_finance(symbol)

# on API start - open up our scraper's http client connections
@app.on_event("startup")
async def app_startup():
    await stock_client.__aenter__()

# on API close - close our scraper's http client connections
@app.on_event("shutdown")
async def app_shutdown():
    await stock_client.__aexit__()
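The label clean-up inside the row loop can be tried on its own, without any network access. The attribute values and text nodes below are hypothetical stand-ins chosen for illustration, not values taken from a real Yahoo Finance page.

```python
def normalise_label(data_test: str) -> str:
    """Turn a data-test attribute into a dictionary key, as the row loop does."""
    return data_test.split("-value")[0].lower()


def join_text(nodes: list) -> str:
    """Join the text nodes of a table cell the same way the scraper does."""
    return " ".join(nodes)


# hypothetical (attribute, text nodes) pairs, for illustration only
rows = [
    ("PREV_CLOSE-value", ["150.00"]),
    ("DAYS_RANGE-value", ["149.10", "-", "151.20"]),
]
parsed = {normalise_label(attr): join_text(texts) for attr, texts in rows}
print(parsed)
```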
We've updated our API with a scraper function and attached it to our API endpoint. Let's take a look at the results now:
We've turned our scraper into a data API in just a few lines of code, though we do have one glaring problem: API load.
What if multiple clients request scraping of the same stock within a short amount of time? We'd be wasting our resources on redundant scrapes.
To fix this, let's take a look at how we can add simple caching.
Caching Scrapers
There are many ways to cache API contents, from enabling caching in our web server to using dedicated caching tools like Redis or Memcached.
However, for small API projects we can easily handle this within Python ourselves with very few modifications:
import asyncio
import httpx
from time import time
from fastapi import FastAPI
from loguru import logger as log
from parsel import Selector

app = FastAPI()
stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0))
STOCK_CACHE = {}  # NEW: establish global cache storage
CACHE_TIME = 60  # NEW: define how long we want to keep cache, in seconds

async def scrape_yahoo_finance(symbol):
    """scrapes stock data from yahoo finance"""
    # NEW: check cache before we commit to scraping
    cache = STOCK_CACHE.get(symbol)
    if cache and time() - CACHE_TIME < cache["_scraped_on"]:
        log.debug(f"{symbol}: returning cached item")
        return cache
    log.info(f"{symbol}: scraping data")
    response = await stock_client.get(
        f"https://finance.yahoo.com/quote/{symbol}?p={symbol}"
    )
    sel = Selector(response.text)
    parsed = {}
    rows = sel.xpath('//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]')
    for row in rows:
        label = row.xpath("@data-test").get().split("-value")[0].lower()
        value = " ".join(row.xpath(".//text()").getall())
        parsed[label] = value
    parsed["price"] = sel.css(
        f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)'
    ).get()
    parsed["_scraped_on"] = time()
    # NEW: store successful results to cache
    STOCK_CACHE[symbol] = parsed
    return parsed

@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str):
    symbol = symbol.upper()
    return await scrape_yahoo_finance(symbol)

@app.on_event("startup")
async def app_startup():
    await stock_client.__aenter__()

    # NEW: optionally we can clear expired cache every minute to prevent
    # memory build up.
    async def clear_expired_cache(period=60.0):
        global STOCK_CACHE
        while True:
            log.debug("clearing expired cache")
            STOCK_CACHE = {
                k: v for k, v in STOCK_CACHE.items() if time() - CACHE_TIME < v["_scraped_on"]
            }
            await asyncio.sleep(period)

    # the task must be created here, while the event loop is running
    clear_cache_task = asyncio.create_task(clear_expired_cache())

@app.on_event("shutdown")
async def app_shutdown():
    await stock_client.__aexit__()
Above, we've updated our scrape function to first check the global cache before scraping the live target. For this, we use a simple Python dictionary to store scraped data by the symbol.
Python dictionaries are extremely fast and efficient so we can cache thousands or even millions of results in the memory of our API.
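The freshness check can also be isolated in a small class. This is a minimal sketch rather than the code used in this API: TTLCache and its injectable clock parameter are hypothetical names, introduced so that expiry can be observed without actually waiting.

```python
from time import time
from typing import Callable, Optional


class TTLCache:
    """Dictionary-backed cache whose entries expire after `ttl` seconds."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time):
        self.ttl = ttl
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._items = {}  # key -> (stored_at, value)

    def get(self, key: str) -> Optional[dict]:
        entry = self._items.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        # same freshness test as in the scraper: now - ttl < stored_at
        if self.clock() - self.ttl < stored_at:
            return value
        del self._items[key]  # expired: drop it so the memory is reclaimed
        return None

    def set(self, key: str, value: dict) -> None:
        self._items[key] = (self.clock(), value)


# usage with a fake clock that we advance by hand
now = [1000.0]
cache = TTLCache(ttl=60, clock=lambda: now[0])
cache.set("AAPL", {"price": "placeholder"})
now[0] += 59
print(cache.get("AAPL"))  # still fresh
now[0] += 2
print(cache.get("AAPL"))  # expired, so None
```

Dropping expired entries on read keeps memory in check only for keys that are requested again, which is why a periodic clean-up is still useful.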
We also added a repeating asyncio task, clear_expired_cache(), which deletes expired cache values every minute. We did this by using the asyncio.create_task() function, which schedules any coroutine to run as a background task on the event loop.
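The background-task pattern can be seen in isolation in the sketch below. It is an illustration under assumptions: the tiny time values only make the demo finish quickly, and the explicit cancellation at the end is an addition that the API code above does not perform.

```python
import asyncio
from time import monotonic

CACHE = {}  # key -> timestamp of when the entry was stored
CACHE_TIME = 0.05  # seconds; deliberately tiny so the demo finishes quickly


async def clear_expired(period: float) -> None:
    """Drop expired keys forever, pausing `period` seconds between passes."""
    while True:
        cutoff = monotonic() - CACHE_TIME
        for key in [k for k, stored in CACHE.items() if stored <= cutoff]:
            del CACHE[key]
        await asyncio.sleep(period)


async def demo() -> dict:
    CACHE["AAPL"] = monotonic()
    task = asyncio.create_task(clear_expired(period=0.01))  # runs in the background
    await asyncio.sleep(0.2)  # the event loop stays free for other work meanwhile
    task.cancel()  # stop the loop explicitly, for example on shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return dict(CACHE)


remaining = asyncio.run(demo())
print(remaining)
```

Cancelling the task on shutdown is optional for a short-lived script, but it avoids "task was destroyed but it is pending" warnings when the loop closes.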
Now, if our API gets requests for the same stock data multiple times, it'll only scrape the data once and serve the cached result to everyone else:
import asyncio
import httpx
from time import time

async def many_concurrent_api_calls(n):
    _start = time()
    async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client:
        # the first call populates the cache
        _start_one = time()
        await client.get("http://127.0.0.1:8000/scrape/stock/aapl")
        print(f"completed first API scrape in {time() - _start_one:.2f} seconds")
        # the remaining n-1 calls should be served from the cache
        results = await asyncio.gather(*[
            client.get("http://127.0.0.1:8000/scrape/stock/aapl")
            for i in range(n - 1)
        ])
    # note: this timer started before the first call, so it covers the whole run
    print(f"completed {n-1} API scrapes in {time() - _start:.2f} seconds")

# run 1000 API calls
asyncio.run(many_concurrent_api_calls(1_000))
# will print something like:
# completed first API scrape in 1.23 seconds
# completed 999 API scrapes in 2.59 seconds
Caching techniques will vary by application but for long and expensive tasks like web scraping, we can see great benefits even from simple designs like this one.
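One gap in a cache like this is that requests arriving while the first scrape is still in flight all miss the cache and each start their own scrape. A per-symbol lock is one way to close that gap. The sketch below is an assumption-laden illustration, not part of the API above: slow_scrape, get_stock and LOCKS are hypothetical names, and cache expiry is left out for brevity.

```python
import asyncio

CACHE = {}  # symbol -> scraped result
LOCKS = {}  # symbol -> asyncio.Lock guarding that symbol's scrape
scrape_calls = 0  # counts how often the slow path actually runs


async def slow_scrape(symbol: str) -> dict:
    """Stand-in for a real scrape; sleeps instead of doing network I/O."""
    global scrape_calls
    scrape_calls += 1
    await asyncio.sleep(0.05)
    return {"symbol": symbol, "price": "placeholder"}


async def get_stock(symbol: str) -> dict:
    lock = LOCKS.setdefault(symbol, asyncio.Lock())
    async with lock:  # only one coroutine per symbol runs this section at a time
        if symbol not in CACHE:  # re-check: an earlier lock holder may have filled it
            CACHE[symbol] = await slow_scrape(symbol)
        return CACHE[symbol]


async def demo() -> int:
    await asyncio.gather(*[get_stock("AAPL") for _ in range(100)])
    return scrape_calls


print(asyncio.run(demo()))
```

Locks are kept per symbol so that a slow scrape of one stock does not hold up requests for another.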
For more advanced caching in FastAPI, see the fastapi-cache extension.
Webhooks for Long Scrapes
Some scrape tasks can take many seconds or even minutes to complete, which would time out or block the connecting client. One way to handle this is to provide webhook support.
In other words, the API accepts the task right away and promises to call back with the results once it's done.
We can extend our API to include this feature by replacing our scrape_stock route:
...
from typing import Optional  # needed for the optional webhook parameter

async def with_webhook(cor, webhook, retries=3):
    """execute coroutine and send its result to a webhook"""
    result = await cor
    async with httpx.AsyncClient(
        headers={"User-Agent": "scraper webhook"},
        timeout=httpx.Timeout(timeout=15.0),
    ) as client:
        for i in range(retries):
            try:
                response = await client.post(webhook, json=result)
                return
            except Exception as e:
                log.exception(f"Failed to send a webhook {i + 1}/{retries}")
            await asyncio.sleep(5)  # wait between retries
        log.error(f"Failed to reach webhook in {retries} retries")

@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str, webhook: Optional[str] = None):
    symbol = symbol.upper()
    scrape_cor = scrape_yahoo_finance(symbol)
    if webhook:
        # run scrape coroutine in the background
        task = asyncio.create_task(with_webhook(scrape_cor, webhook))
        return {"success": True, "webhook": webhook}
    else:
        return await scrape_cor
In the update above, if the ?webhook parameter is supplied to our API, it'll instantly return {"success": True, "webhook": webhook} and schedule a background task that'll scrape the results and post them to the client's webhook address.
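Two details of this pattern are worth seeing in isolation: retrying delivery a fixed number of times, and keeping a reference to the background task, since the event loop itself only holds weak references to tasks. The sketch below uses hypothetical names (spawn, deliver, flaky_send) and a fake sender instead of a real HTTP client, so it runs without a network.

```python
import asyncio

BACKGROUND_TASKS = set()  # strong references keep running tasks from being garbage collected


def spawn(coro) -> asyncio.Task:
    """Schedule a coroutine and hold a reference to its task until it finishes."""
    task = asyncio.create_task(coro)
    BACKGROUND_TASKS.add(task)
    task.add_done_callback(BACKGROUND_TASKS.discard)
    return task


async def deliver(send, payload: dict, retries: int = 3, delay: float = 0.01) -> bool:
    """Call `send(payload)` until it succeeds or `retries` attempts are used up."""
    for attempt in range(1, retries + 1):
        try:
            await send(payload)
            return True
        except Exception:
            if attempt < retries:
                await asyncio.sleep(delay)  # wait between attempts
    return False


attempts = []


async def flaky_send(payload: dict) -> None:
    """Hypothetical sender that fails twice, then succeeds."""
    attempts.append(payload)
    if len(attempts) < 3:
        raise ConnectionError("webhook unreachable")


async def demo() -> bool:
    task = spawn(deliver(flaky_send, {"price": "placeholder"}))
    return await task


delivered = asyncio.run(demo())
print(delivered, len(attempts))
```

With a real HTTP client, the sender would probably also want to treat error status codes as failures, which the fake sender here does not model.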
To test this out, we can use a webhook testing service such as webhook.site:
from urllib.parse import urlencode
import httpx

# note the "?" that separates the path from the query string
url = "http://127.0.0.1:8000/scrape/stock/aapl?" + urlencode({
    "webhook": "https://webhook.site/da0a5cbc-0a62-4bd5-90f4-c8af73e5df5c"  # change this to yours
})
response = httpx.get(url)
print(response.json())
# {'success': True, 'webhook': 'https://webhook.site/da0a5cbc-0a62-4bd5-90f4-c8af73e5df5c'}
When running this, we'll see the API return instantly, and if we check our webhook.site session we'll see the data delivered there:
Webhooks are a great way to not only deal with long scraping tasks but also ensure that our API will scale in the future as we can simply start more processes to handle webhook tasks if needed.
Final Scraper Code
To wrap this article up, let's take a look at the entire code of our scraper API:
import asyncio
from time import time
from typing import Optional

import httpx
from fastapi import FastAPI
from loguru import logger as log
from parsel import Selector

app = FastAPI()
stock_client = httpx.AsyncClient(timeout=httpx.Timeout(10.0))
STOCK_CACHE = {}
CACHE_TIME = 10  # seconds to keep a cached result

async def scrape_yahoo_finance(symbol):
    """scrapes stock data from yahoo finance"""
    cache = STOCK_CACHE.get(symbol)
    if cache and time() - CACHE_TIME < cache["_scraped_on"]:
        log.debug(f"{symbol}: returning cached item")
        return cache
    log.info(f"{symbol}: scraping data")
    response = await stock_client.get(
        f"https://finance.yahoo.com/quote/{symbol}?p={symbol}"
    )
    sel = Selector(response.text)
    parsed = {}
    for row in sel.xpath(
        '//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]'
    ):
        label = row.xpath("@data-test").get().split("-value")[0].lower()
        value = " ".join(row.xpath(".//text()").getall())
        parsed[label] = value
    parsed["price"] = sel.css(
        f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)'
    ).get()
    parsed["_scraped_on"] = time()
    STOCK_CACHE[symbol] = parsed
    return parsed

async def with_webhook(cor, webhook, retries=3):
    """execute coroutine and send its result to a webhook"""
    result = await cor
    async with httpx.AsyncClient(
        headers={"User-Agent": "scraper webhook"},
        timeout=httpx.Timeout(timeout=15.0),
    ) as client:
        for i in range(retries):
            try:
                response = await client.post(webhook, json=result)
                return
            except Exception as e:
                log.exception(f"Failed to send a webhook {i + 1}/{retries}")
            await asyncio.sleep(5)  # wait between retries
        log.error(f"Failed to reach webhook in {retries} retries")

@app.get("/scrape/stock/{symbol}")
async def scrape_stock(symbol: str, webhook: Optional[str] = None):
    symbol = symbol.upper()
    scrape_cor = scrape_yahoo_finance(symbol)
    if webhook:
        # run scrape coroutine in the background
        task = asyncio.create_task(with_webhook(scrape_cor, webhook))
        return {"success": True, "webhook": webhook}
    else:
        return await scrape_cor

# we can clear cache every once in a while to prevent memory build up
async def clear_expired_cache(period=5.0):
    global STOCK_CACHE
    while True:
        _initial_len = len(STOCK_CACHE)
        log.debug(f"clearing expired cache, current len {_initial_len}")
        STOCK_CACHE = {
            k: v
            for k, v in STOCK_CACHE.items()
            if time() - CACHE_TIME < v["_scraped_on"]
        }
        log.debug(f"cleared {_initial_len - len(STOCK_CACHE)}")
        await asyncio.sleep(period)

@app.on_event("startup")
async def app_startup():
    await stock_client.__aenter__()
    clear_cache_task = asyncio.create_task(clear_expired_cache())

@app.on_event("shutdown")
async def app_shutdown():
    await stock_client.__aexit__()
ScrapFly
Web scraper logic can quickly outgrow our API. If we're scraping difficult targets that ban scrapers and require complex connections and proxy usage, soon our data API will do more scraping than API'ing. We can abstract all of these complexities away to ScrapFly!
For example, we could replace our scraper code to use ScrapFly through its Python SDK:
...
from scrapfly import ScrapflyClient, ScrapeConfig

stock_client = ScrapflyClient(key="YOUR SCRAPFLY KEY")

async def scrape_yahoo_finance(symbol):
    """scrapes stock data from yahoo finance"""
    cache = STOCK_CACHE.get(symbol)
    if cache and time() - CACHE_TIME < cache["_scraped_on"]:
        log.debug(f"{symbol}: returning cached item")
        return cache
    log.info(f"{symbol}: scraping data")
    result = await stock_client.async_scrape(ScrapeConfig(
        url=f"https://finance.yahoo.com/quote/{symbol}?p={symbol}",
        # we can select proxy country
        country="US",
        # or proxy type
        proxy_pool="public_residential_pool",
        # enable anti scraping protection bypass
        asp=True,
        # or real headless browser javascript rendering
        render_js=True,
        # we can enable caching
        cache=True,
    ))
    parsed = {}
    for row in result.selector.xpath(
        '//div[re:test(@data-test,"(left|right)-summary-table")]//td[@data-test]'
    ):
        label = row.xpath("@data-test").get().split("-value")[0].lower()
        value = " ".join(row.xpath(".//text()").getall())
        parsed[label] = value
    parsed["price"] = result.selector.css(
        f'fin-streamer[data-field="regularMarketPrice"][data-symbol="{symbol}"]::attr(value)'
    ).get()
    parsed["_scraped_on"] = time()
    STOCK_CACHE[symbol] = parsed
    return parsed
...
Now our data API can scrape content in real-time without being blocked or throttled.
ScrapFly also offers built-in cache and webhook functionality, just like we've covered in this tutorial, which makes it easy to integrate into real-time scraping APIs.
Summary
In this practical tutorial, we've taken a quick dive into data APIs that scrape data in real time. We've covered how to set up FastAPI, written a simple Yahoo Finance stock data scraper, and then wrapped everything together into a single, cohesive data API with caching and webhook support.
We've done that in just a few lines of actual code by taking advantage of the existing, powerful web API ecosystem in Python!