When it comes to e-commerce businesses, there are hundreds of different providers with various prices, ratings and competitive advantages. All of this public data can be compared and tracked using web scraping for a competitive overview.
In this web scraping guide, we'll explain how to create a tool for tracking competitor prices using Python. We'll scrape a specific product from several different providers, then compare prices and generate insights. Let's get started!
Why Track Competitor Prices?
E-commerce markets are constantly moving, and prices change accordingly. Tracking competitor prices can therefore aid in pricing evaluation and keep you up-to-date with market trends.
Monitoring competitor prices using web scraping also provides insights into the effect of products' pricing strategies. These insights can help businesses refine their own market positioning to remain competitive and attract new customers.
Moreover, tracking competitor prices allows buyers to identify potential investment opportunities in underpriced products. This can be achieved by comparing the same product's price across different retail providers.
For further details on using web scraping for tracking competitor prices, refer to our extensive introduction on web scraping use cases.
Project Setup
We'll build our competitor price tracking tool using Python and a few community libraries:
httpx: For sending HTTP requests to the product pages and retrieving the data as HTML.
parsel: For parsing the HTML and extracting data using XPath and CSS selectors.
loguru: For monitoring and logging our competitor price tracker.
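All of these packages can be installed using the pip console command:

```shell
pip install httpx parsel loguru
```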
To make the comparison natural, we'll scrape the same product from each website. In this guide example, we'll track PlayStation 5 prices. However, the technical concept can be applied to different products and websites.
We could extract prices by scraping each product's dedicated page. However, we'll use the search pages instead, as they already contain all the data we want.
We'll submit a search query with the "PS5 digital edition" keyword and extract the first product of the search, which represents the exact product we are looking for. Let's start by scraping the product data from Walmart:
Python
ScrapFly
import urllib.parse
import asyncio
import json
from httpx import AsyncClient, Response
from parsel import Selector
from typing import Dict, List
from loguru import logger as log
# create HTTP client with headers that look like a real web browser
client = AsyncClient(
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-US,en;q=0.9,lt;q=0.8,et;q=0.7,de;q=0.6",
},
follow_redirects=True,
http2=True
)
async def scrape_walmart(search_query: str) -> List[Dict]:
"""scrape Walmart search pages"""
def parse_walmart(response: Response) -> List[Dict]:
"""parse Walmart search pages"""
selector = Selector(response.text)
data = []
product_box = selector.xpath("//div[@data-testid='item-stack']/div[1]")
link = product_box.xpath(".//a[@link-identifier]/@link-identifier").get()
title = product_box.xpath(".//a[@link-identifier]/span/text()").get()
price = product_box.xpath(".//div[@data-automation-id='product-price']/span/text()").get()
price = float(price[price.find("$")+1: -1]) if price else None
rate = product_box.xpath(".//span[@data-testid='product-ratings']/@data-value").get()
review_count = product_box.xpath(".//span[@data-testid='product-reviews']/@data-value").get()
data.append({
"link": "https://www.walmart.com/ip/" + link,
"title": title,
"price": price,
"rate": float(rate) if rate else None,
"review_count": int(review_count) if review_count else None
})
return data
search_url = "https://www.walmart.com/search?q=" + urllib.parse.quote_plus(search_query) + "&sort=best_seller"
response = await client.get(search_url)
if response.status_code == 403:
raise Exception("Walmart requests are blocked")
data = parse_walmart(response)
log.success(f"scraped {len(data)} products from Walmart")
return data
import urllib.parse
import asyncio
import json
from typing import List, Dict
from scrapfly import ScrapeConfig, ScrapflyClient, ScrapeApiResponse
from loguru import logger as log
scrapfly = ScrapflyClient(key="Your ScrapFly API key")
async def scrape_walmart(search_query: str) -> List[Dict]:
"""scrape Walmart products"""
def parse_walmart(response: ScrapeApiResponse) -> List[Dict]:
"""parse Walmart product pages"""
selector = response.selector
data = []
product_box = selector.xpath("//div[@data-testid='item-stack']/div")
link = product_box.xpath(".//a[@link-identifier]/@link-identifier").get()
title = product_box.xpath(".//a[@link-identifier]/span/text()").get()
price = product_box.xpath(".//div[@data-automation-id='product-price']/span/text()").get()
price = float(price[price.find("$")+1: -1]) if price else None
rate = product_box.xpath(".//span[@data-testid='product-ratings']/@data-value").get()
review_count = product_box.xpath(".//span[@data-testid='product-reviews']/@data-value").get()
data.append({
"link": "https://www.walmart.com/ip/" + link,
"title": title,
"price": price,
"rate": float(rate) if rate else None,
"review_count": int(review_count) if review_count else None
})
return data
search_url = "https://www.walmart.com/search?q=" + urllib.parse.quote_plus(search_query) + "&sort=best_seller"
response = await scrapfly.async_scrape(ScrapeConfig(search_url, asp=True, country="US"))
data = parse_walmart(response)
log.success(f"scraped {len(data)} products from Walmart")
return data
Run the code
async def run():
data = await scrape_walmart(
search_query="PS5 digital edition"
)
# print the data in JSON format
print(json.dumps(data, indent=2))
if __name__=="__main__":
asyncio.run(run())
🤖 The websites we are scraping are known for their high block rates, so plain requests are likely to get blocked. Run the ScrapFly code tabs to avoid blocking.
Here, we define two functions, scrape_walmart and parse_walmart. Let's break them down:
scrape_walmart() for requesting the Walmart search page and retrieving the HTML.
parse_walmart() for parsing the HTML we got and extracting the link, title, price, rating and review count of the product.
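Note that the raw price strings returned by these selectors (e.g. "$449.00") can carry prefixes or thousands separators. A small regex-based helper, shown here as a sketch rather than part of the original scraper, parses them more defensively than string slicing:

```python
import re

def parse_price(price_text):
    """extract the first dollar amount from a raw price string,
    e.g. "Now $1,299.99" -> 1299.99; returns None if no amount is found"""
    if not price_text:
        return None
    match = re.search(r"\$([\d,]+(?:\.\d+)?)", price_text)
    return float(match.group(1).replace(",", "")) if match else None

print(parse_price("$449.00"))         # 449.0
print(parse_price("Now $1,299.99"))   # 1299.99
print(parse_price("out of stock"))    # None
```

This can replace the inline slicing in any of the parse functions if a target site changes its price formatting.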
Here is what the product result we scraped looks like:
[
{
"link": "https://www.walmart.com/ip/5113183757",
"title": "Sony PlayStation 5 (PS5) Digital Console Slim",
"price": 449.0,
"rate": 4.6,
"review_count": 369
}
]
We have successfully scraped products from one of our target websites. Let's apply the same approach to our other targets - Amazon and BestBuy:
Python
ScrapFly
import urllib.parse
import asyncio
import json
from httpx import AsyncClient, Response
from parsel import Selector
from typing import Dict, List
from loguru import logger as log
# create HTTP client with headers that look like a real web browser
client = AsyncClient(
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-US,en;q=0.9,lt;q=0.8,et;q=0.7,de;q=0.6",
},
follow_redirects=True,
http2=True
)
async def scrape_amazon(search_query: str) -> List[Dict]:
"""scrape Amazon search pages"""
def parse_amazon(response: Response) -> List[Dict]:
"""parse Amazon search pages"""
selector = Selector(response.text)
data = []
product_box = selector.xpath("//div[contains(@class, 'search-results')]/div[@data-component-type='s-search-result']")
product_id = product_box.xpath(".//div[@data-cy='title-recipe']/h2/a[contains(@class, 'a-link-normal')]/@href").get().split("/dp/")[-1].split("/")[0]
title = product_box.xpath(".//div[@data-cy='title-recipe']/h2/a/span/text()").get()
price = product_box.xpath(".//span[@class='a-price']/span/text()").get()
price = float(price.replace("$", "")) if price else None
rate = product_box.xpath(".//span[contains(@aria-label, 'stars')]/@aria-label").re_first(r"(\d+\.*\d*) out")
review_count = product_box.xpath(".//div[contains(@data-csa-c-content-id, 'ratings-count')]/span/@aria-label").get()
data.append({
"link": f"https://www.amazon.com/dp/{product_id}",
"title": title,
"price": price,
"rate": float(rate) if rate else None,
"review_count": int(review_count.replace(',','')) if review_count else None,
})
return data
search_url = "https://www.amazon.com/s?k=" + urllib.parse.quote_plus(search_query)
response = await client.get(search_url)
    if response.status_code in (403, 503):
raise Exception("Amazon requests are blocked")
data = parse_amazon(response)
log.success(f"scraped {len(data)} products from Amazon")
return data
async def scrape_bestbuy(search_query: str) -> List[Dict]:
"""scrape BestBuy search pages"""
def parse_bestbuy(response: Response) -> List[Dict]:
"""parse BestBuy search pages"""
selector = Selector(response.text)
data = []
product_box = selector.xpath("//ol[contains(@class, 'sku-item-list')]/li[@class='sku-item']")
product_id = product_box.xpath(".//h4[@class='sku-title']/a/@href").get().split("?skuId=")[-1]
title = product_box.xpath(".//h4[@class='sku-title']/a/text()").get()
price = product_box.xpath(".//div[contains(@class, 'priceView')]/span/text()").get()
price = float(price.replace("$", "")) if price else None
rate = product_box.xpath(".//div[contains(@class, 'ratings-reviews')]/p/text()").get()
review_count = product_box.xpath(".//span[@class='c-reviews ']/text()").get()
data.append({
"link": f"https://www.bestbuy.com/site/{product_id}.p",
"title": title,
"price": price,
"rate": float(rate.split()[1]) if rate else None,
"review_count": int(review_count[1:-1].replace(",", "")) if review_count else None
})
return data
search_url = "https://www.bestbuy.com/site/searchpage.jsp?st=" + urllib.parse.quote_plus(search_query)
response = await client.get(search_url)
if response.status_code == 403:
raise Exception("BestBuy requests are blocked")
data = parse_bestbuy(response)
log.success(f"scraped {len(data)} products from BestBuy")
return data
async def run():
amazon_data = await scrape_amazon(
search_query="PS5 digital edition"
)
bestbuy_data = await scrape_bestbuy(
search_query="PS5 digital edition"
)
# print the results in JSON format
print(json.dumps(amazon_data, indent=2, ensure_ascii=False))
print(json.dumps(bestbuy_data, indent=2, ensure_ascii=False))
if __name__=="__main__":
asyncio.run(run())
In the above code, we replicate our previous Walmart scraping logic for Amazon and BestBuy. We request the search pages and parse the first product of each for the same fields we extracted earlier. The output is similar to the results we got before.
We can successfully scrape the same product from different competitors and get pricing data along with other product details. Next, let's combine all our scraping logic into a single function that drives the next stage of the price tracking tool:
async def track_competitor_prices(
search_query: str
):
"""scrape products from different competitors"""
data = {}
data["walmart"] = await scrape_walmart(
search_query=search_query
)
data["amazon"] = await scrape_amazon(
search_query=search_query
)
data["bestbuy"] = await scrape_bestbuy(
search_query=search_query
)
product_count = sum(len(products) for products in data.values())
log.success(f"successfully scraped {product_count} products")
# save the results into a JSON file
with open("data.json", "w", encoding="utf-8") as file:
json.dump(data, file, indent=2, ensure_ascii=False)
async def run():
await track_competitor_prices(
search_query="PS5 digital edition"
)
if __name__=="__main__":
asyncio.run(run())
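The three scrapers above run one after another. Since they are independent, they could also run concurrently with asyncio.gather. Here's a minimal, self-contained sketch of that pattern; the fake_scraper stand-in below is illustrative and would be replaced by the real scrape_walmart, scrape_amazon and scrape_bestbuy coroutines:

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

async def gather_competitor_data(
    scrapers: Dict[str, Callable[[str], Awaitable[List[dict]]]],
    search_query: str,
) -> Dict[str, List[dict]]:
    """run all scraper coroutines concurrently and map results back to retailer names"""
    names = list(scrapers)
    results = await asyncio.gather(*(scrapers[name](search_query) for name in names))
    return dict(zip(names, results))

# stand-in scraper for demonstration purposes only:
async def fake_scraper(query: str) -> List[dict]:
    return [{"query": query}]

data = asyncio.run(gather_competitor_data({"walmart": fake_scraper}, "PS5 digital edition"))
print(data)
```

Running scrapers concurrently cuts total runtime roughly to that of the slowest target, though it also concentrates the request load into a shorter window.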
🙋 If you can't follow along with the code snippets, look for the full code section.
The output file contains all the results we got earlier, organized in one JSON file.
Our price scraper extracts only the first product from each search page. However, it can be extended to iterate over all products and paginate through additional search pages. For more details, refer to our previous guide on crawling the web.
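As a sketch of that extension, pagination usually boils down to generating page URLs for the same query. The page parameter name below is an assumption and varies between websites:

```python
import urllib.parse

def build_search_urls(base_url: str, search_query: str, max_pages: int) -> list:
    """generate paginated search URLs for a query (page parameter name varies per site)"""
    query = urllib.parse.quote_plus(search_query)
    return [f"{base_url}?q={query}&page={page}" for page in range(1, max_pages + 1)]

urls = build_search_urls("https://www.walmart.com/search", "PS5 digital edition", 3)
print(urls)
```

Each generated URL would then be requested and parsed with the same parse function used for the first page.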
Now that we have all the competitors' pricing data in one place, let's compare it to get insights!
Comparing Competitor Prices
The web-scraped product data can be manually analyzed for insights into each competitor's performance. However, this can be exhausting with a higher volume of data. Therefore, we'll create a simple monitoring function for analyzing the data we retrieved.
This simple function analyzes the data we got from each target website and generates insight metrics:
def generate_insights(data):
"""analyze the data for insight values"""
def calculate_average(lst):
# Calculate the averages
non_none_values = [value for value in lst if value is not None]
return round(sum(non_none_values) / len(non_none_values), 2) if non_none_values else None
# Extract all products across competitors
all_products = [product for products in data.values() for product in products]
# Calculate overall averages
overall_average_price = calculate_average([product["price"] for product in all_products])
overall_average_rate = calculate_average([product["rate"] for product in all_products])
overall_average_review_count = calculate_average([product["review_count"] for product in all_products])
# Find the lowest priced, highest reviewed, highest priced, and highest rated products across all competitors
lowest_priced_product = min(all_products, key=lambda x: x["price"])
highest_reviewed_product = max(all_products, key=lambda x: x.get("review_count", 0) if x.get("review_count") is not None else 0)
highest_priced_product = max(all_products, key=lambda x: x["price"])
highest_rated_product = max(all_products, key=lambda x: x["rate"])
# Extract website names for each product
website_names = {retailer: products[0]["link"].split(".")[1] for retailer, products in data.items()}
insights = {
"Overall Average Price": overall_average_price,
"Overall Average Rate": overall_average_rate,
"Overall Average Review Count": overall_average_review_count,
"Lowest Priced Product": {
"Product": lowest_priced_product,
"Competitor": website_names.get(lowest_priced_product["link"].split(".")[1])
},
"Highest Priced Product": {
"Product": highest_priced_product,
"Competitor": website_names.get(highest_priced_product["link"].split(".")[1])
},
"Highest Rated Product": {
"Product": highest_rated_product,
"Competitor": website_names.get(highest_rated_product["link"].split(".")[1])
},
"Highest Reviewed Product": {
"Product": highest_reviewed_product,
"Competitor": website_names.get(highest_reviewed_product["link"].split(".")[1])
}
}
# Save the insights to a JSON file
with open("insights.json", "w") as json_file:
json.dump(insights, json_file, indent=2, ensure_ascii=False)
Here, we define a generate_insights function, which uses the calculate_average helper to compute the following metrics:
Average price, rating and review count across all products.
The lowest and highest priced products, along with the competitor offering each.
The highest rated and most reviewed products, along with the competitor offering each.
The above insights are represented as numbers and statistics. However, these values can also be visualized for more intuitive insights using Python libraries such as Seaborn and Matplotlib, similar to what we did in our previous article on observing e-commerce trends.
We can successfully scrape and compare product prices. Next, let's schedule our competitor price monitoring tool to keep the data up-to-date!
Full Competitor Price Tracking Code
Here is what the final code of our project looks like:
import urllib.parse
import asyncio
import json
from typing import List, Dict
from scrapfly import ScrapeConfig, ScrapflyClient, ScrapeApiResponse
from loguru import logger as log
scrapfly = ScrapflyClient(key="Your ScrapFly API key")
async def scrape_walmart(search_query: str) -> List[Dict]:
"""scrape Walmart products"""
def parse_walmart(response: ScrapeApiResponse) -> List[Dict]:
"""parse Walmart product pages"""
selector = response.selector
data = []
product_box = selector.xpath("//div[@data-testid='item-stack']/div")
link = product_box.xpath(".//a[@link-identifier]/@link-identifier").get()
title = product_box.xpath(".//a[@link-identifier]/span/text()").get()
price = product_box.xpath(".//div[@data-automation-id='product-price']/span/text()").get()
price = float(price[price.find("$")+1: -1]) if price else None
rate = product_box.xpath(".//span[@data-testid='product-ratings']/@data-value").get()
review_count = product_box.xpath(".//span[@data-testid='product-reviews']/@data-value").get()
data.append({
"link": "https://www.walmart.com/ip/" + link,
"title": title,
"price": price,
"rate": float(rate) if rate else None,
"review_count": int(review_count) if review_count else None
})
return data
search_url = "https://www.walmart.com/search?q=" + urllib.parse.quote_plus(search_query) + "&sort=best_seller"
response = await scrapfly.async_scrape(ScrapeConfig(search_url, asp=True, country="US"))
data = parse_walmart(response)
log.success(f"scraped {len(data)} products from Walmart")
return data
async def scrape_amazon(search_query: str) -> List[Dict]:
"""scrape Amazon search pages"""
def parse_amazon(response: ScrapeApiResponse) -> List[Dict]:
"""parse Amazon search pages"""
selector = response.selector
data = []
product_box = selector.xpath("//div[contains(@class, 'search-results')]/div[@data-component-type='s-search-result']")
product_id = product_box.xpath(".//div[@data-cy='title-recipe']/h2/a[contains(@class, 'a-link-normal')]/@href").get().split("/dp/")[-1].split("/")[0]
title = product_box.xpath(".//div[@data-cy='title-recipe']/h2/a/span/text()").get()
price = product_box.xpath(".//span[@class='a-price']/span/text()").get()
price = float(price.replace("$", "")) if price else None
rate = product_box.xpath(".//span[contains(@aria-label, 'stars')]/@aria-label").re_first(r"(\d+\.*\d*) out")
review_count = product_box.xpath(".//div[contains(@data-csa-c-content-id, 'ratings-count')]/span/@aria-label").get()
data.append({
"link": f"https://www.amazon.com/dp/{product_id}",
"title": title,
"price": price,
"rate": float(rate) if rate else None,
"review_count": int(review_count.replace(',','')) if review_count else None,
})
return data
search_url = "https://www.amazon.com/s?k=" + urllib.parse.quote_plus(search_query)
response = await scrapfly.async_scrape(ScrapeConfig(
search_url, asp=True, country="US", proxy_pool="public_residential_pool",
render_js=True, retry=True))
data = parse_amazon(response)
log.success(f"scraped {len(data)} products from Amazon")
return data
async def scrape_bestbuy(search_query: str) -> List[Dict]:
"""scrape BestBuy search pages"""
def parse_bestbuy(response: ScrapeApiResponse) -> List[Dict]:
"""parse BestBuy search pages"""
selector = response.selector
data = []
product_box = selector.xpath("//ol[contains(@class, 'sku-item-list')]/li[@class='sku-item']")
product_id = product_box.xpath(".//h4[@class='sku-title']/a/@href").get().split("?skuId=")[-1]
title = product_box.xpath(".//h4[@class='sku-title']/a/text()").get()
price = product_box.xpath(".//div[contains(@class, 'priceView')]/span/text()").get()
price = float(price.replace("$", "")) if price else None
rate = product_box.xpath(".//div[contains(@class, 'ratings-reviews')]/p/text()").get()
review_count = product_box.xpath(".//span[@class='c-reviews ']/text()").get()
data.append({
"link": f"https://www.bestbuy.com/site/{product_id}.p",
"title": title,
"price": price,
"rate": float(rate.split()[1]) if rate else None,
"review_count": int(review_count[1:-1].replace(",", "")) if review_count else None
})
return data
search_url = "https://www.bestbuy.com/site/searchpage.jsp?st=" + urllib.parse.quote_plus(search_query)
response = await scrapfly.async_scrape(ScrapeConfig(search_url, asp=True, country="US"))
data = parse_bestbuy(response)
log.success(f"scraped {len(data)} products from BestBuy")
return data
def generate_insights(data: Dict):
"""analyze the data for insight values"""
def calculate_average(lst):
# Calculate the averages
non_none_values = [value for value in lst if value is not None]
return round(sum(non_none_values) / len(non_none_values), 2) if non_none_values else None
# calculate average prices, rates, and review counts for each competitor
average_prices = {
retailer: calculate_average([product["price"] for product in products])
for retailer, products in data.items()
}
average_rates = {
retailer: calculate_average([product["rate"] for product in products])
for retailer, products in data.items()
}
average_review_counts = {
retailer: calculate_average([product["review_count"] for product in products])
for retailer, products in data.items()
}
# calculate the lowest priced product and the product with the highest number of reviews for each retailer
lowest_priced_products = {
retailer: min(products, key=lambda x: x["price"])
for retailer, products in data.items()
}
highest_reviewed_products = {
retailer: max(products, key=lambda x: x.get("review_count", 0) if x.get("review_count") is not None else 0)
for retailer, products in data.items()
}
insights = {
"Average prices": average_prices,
"Average rates": average_rates,
"Average review counts": average_review_counts,
"Lowest priced products": lowest_priced_products,
"Highest reviewed products": highest_reviewed_products,
}
# save the insights to a JSON file
with open("insights.json", "w") as json_file:
json.dump(insights, json_file, indent=2, ensure_ascii=False)
async def track_competitor_prices(
search_query: str
):
"""scrape products from different competitors"""
data = {}
data["walmart"] = await scrape_walmart(
search_query=search_query
)
data["amazon"] = await scrape_amazon(
search_query=search_query
)
data["bestbuy"] = await scrape_bestbuy(
search_query=search_query
)
product_count = sum(len(products) for products in data.values())
log.success(f"successfully scraped {product_count} products")
# save the results into a JSON file
# create the insights file
generate_insights(data)
with open("data.json", "w", encoding="utf-8") as file:
json.dump(data, file, indent=2, ensure_ascii=False)
# main competitor price tracking function
async def run():
log.info("----- Scheduler has started -----")
await track_competitor_prices(
search_query="PS5 digital edition"
)
log.success("----- Scheduler has finished -----")
async def main():
while True:
# run the script every 3 hours
await run()
await asyncio.sleep(3 * 3600)
if __name__=="__main__":
asyncio.run(main())
Bypass Scraping Blocking With ScrapFly
Our competitor price tracking code relies on requesting popular websites with a high level of protection. Moreover, price scraping often requires a high volume of requests, so attempting to scale our scrapers quickly leads to blocking:
from httpx import Client
# create HTTP client with headers that look like a real web browser
client = Client(
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-US,en;q=0.9,lt;q=0.8,et;q=0.7,de;q=0.6",
},
follow_redirects=True,
http2=True
)
url = "https://www.amazon.com/s?rh=n%3A20972796011"
response = client.get(url)
print(response)
"<Response [503 Service Unavailable]>" # Amazon detected the request
Let's use ScrapFly to bypass the blocked request sent to Amazon. All we have to do is replace httpx with the ScrapFly client, enable the anti-scraping protection bypass using the asp parameter and select a proxy country:
# standard web scraping code
import httpx
response = httpx.get("https://www.amazon.com/s?rh=n%3A20972796011")
# in ScrapFly, it becomes this 👇
from scrapfly import ScrapeConfig, ScrapflyClient, ScrapeApiResponse
scrapfly = ScrapflyClient(key="Your ScrapFly API key")
api_response: ScrapeApiResponse = scrapfly.scrape(
ScrapeConfig(
# target website URL
url="https://www.amazon.com/s?rh=n%3A20972796011",
# Bypass anti-scraping protection
asp=True,
# select a proxy pool (residential or datacenter)
proxy_pool="public_residential_pool",
# Set the proxy location to a specific country
country="US",
# enable JavaScript rendering if needed, similar to headless browsers
render_js=True,
)
)
# Print the website's status code
print(api_response.upstream_status_code)
"200"
# get the HTML from the response
html = api_response.scrape_result['content']
# use the built-in Parsel selector
selector = api_response.selector
To wrap up this guide, let's have a look at some frequently asked questions about tracking competitor prices.
Is it possible to track historical product prices on e-commerce websites?
Yes. Using web scraping, you can scrape specific product pages for price data, schedule the scraper to run at regular intervals and then track prices by comparing the recorded values over time. We have covered tracking historical price data in a previous guide.
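As a minimal sketch of that idea, each scheduled run can append a timestamped snapshot to a history list, making price changes easy to compute later. The record structure here is illustrative, not part of the original tool:

```python
from datetime import datetime, timezone

def record_price(history: list, retailer: str, price: float) -> list:
    """append a timestamped price snapshot to the in-memory history"""
    history.append({
        "retailer": retailer,
        "price": price,
        "scraped_at": datetime.now(timezone.utc).isoformat(),
    })
    return history

def price_change(history: list) -> float:
    """percentage change between the first and latest recorded prices"""
    first, last = history[0]["price"], history[-1]["price"]
    return round((last - first) / first * 100, 2)

history = []
record_price(history, "walmart", 449.0)
record_price(history, "walmart", 399.0)
print(price_change(history))  # -11.14
```

In practice, the history would be persisted to a JSON file or database between scheduled runs rather than kept in memory.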
Can I monitor competitor prices in different languages and currencies?
Yes, you can change the web scraping language and currency by changing the requests' configuration, such as proxy location, headers and cookies. For more details, refer to our previous guide on web scraping localization.
Why does my scraper see a different price than the one I see in my browser?
Prices can be dynamically configured for specific regions and browser fingerprints. For the most accurate results ensure the scraper is scraping with the same configuration (IP address location, user agent, etc.) as the browser.
Summary
In this article, we went through a step-by-step guide on tracking competitor prices using Python. We started by scraping product prices from Walmart, Amazon and BestBuy using httpx and Parsel. Then, we used Python to generate insights from the pricing data and compare competitors' performance. Finally, we scheduled our competitor price monitoring tool using asyncio to keep the data up-to-date.