When it comes to e-commerce businesses, there are hundreds of different providers with various prices, ratings and competitive advantages. All of this public data can be compared and tracked using web scraping for a competitive overview.
In this web scraping guide, we'll explain how to create a tool for tracking competitor prices using Python. We'll be scraping specific products from several different providers then compare their prices and generate insights. Let's get started!
Key Takeaways
Master scraping prices from websites with advanced Python techniques, price monitoring automation, and market analysis for comprehensive e-commerce intelligence.
- Implement automated price tracking systems using Python with httpx and parsel for multi-vendor price comparison
- Configure data collection pipelines for real-time price monitoring across different e-commerce platforms
- Implement price change detection algorithms and alert systems for competitive advantage
- Configure proxy rotation and fingerprint management to avoid detection and rate limiting
- Use specialized tools like ScrapFly for automated price tracking with anti-blocking features
- Implement data visualization and analytics for comprehensive market trend analysis
Why Track Competitor Prices?
The e-commerce business dynamics are constantly moving, leading the prices to change accordingly. Therefore, tracking competitor prices can aid in pricing evaluation and staying up-to-date with the market trends.
Monitoring competitor prices using web scraping also provides insights into the effect of products' pricing strategies. These insights can help businesses refine their own market positioning to remain competitive and attract new customers.
Moreover, tracking competitor prices allows buyers to identify potential investment opportunities with underpriced products. This can be achieved by comparing the same product prices across different retail providers.
For further details on using web scraping for tracking competitor prices, refer to our extensive introduction on web scraping use cases.
Project Setup
We'll build our competitor price tracking tool using Python and a few community libraries:
- httpx: For sending HTTP requests to the product pages and retrieving the data as HTML.
- parsel: For parsing the HTML and extracting data using XPath and CSS selectors.
- loguru: For monitoring and logging our competitor price tracker.
- asyncio: For running our scrapers asynchronously, increasing our web scraping speed.
Since asyncio comes pre-installed in Python, you will only have to install the other libraries using the following pip command:
pip install httpx parsel loguru
How to Build a Competitor Price Tracking Tool?
In a previous guide, we created a similar price-tracking tool for observing e-commerce trends by visualizing product insights.
How to Observe E-Commerce Trends using Web Scraping
In this example web scraping project we'll be taking a look at monitoring E-Commerce trends using Python, web scraping and data visualization tools.
Our tool in this guide will be focused on monitoring competitor prices, which will be divided into three parts:
- Product scraper, for scraping product prices from different competitor websites.
- Price comparator, for comparing the products' prices and generating insights.
- Web scraping scheduling, for automating the previous steps using a cron job.
Let's start with the product scraping!
Web Scraping Prices
In this guide, we'll scrape product price data from three different competitors:
To make the comparison natural, we'll scrape the same product from each website. In this guide example, we'll track PlayStation 5 prices. However, the technical concept can be applied to different products and websites.
To scrape the product data, we can extract the prices by scraping them from their respective pages. However, we'll use the search pages as they contain all the data we want.
We'll submit a search query with the "PS5 digital edition" keyword and extract the first product of the search, which represents the exact product we are looking for. Let's start by scraping the product data from Walmart:
import urllib.parse
import asyncio
import json
from httpx import AsyncClient, Response
from parsel import Selector
from typing import Dict, List
from loguru import logger as log
# create HTTP client with headers that look like a real web browser
client = AsyncClient(
    headers={
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "en-US,en;q=0.9,lt;q=0.8,et;q=0.7,de;q=0.6",
    },
    follow_redirects=True,
    http2=True
)
async def scrape_walmart(search_query: str) -> List[Dict]:
    """scrape Walmart search pages"""
    def parse_walmart(response: Response) -> List[Dict]:
        """parse Walmart search pages"""
        selector = Selector(response.text)
        data = []
        product_box = selector.xpath("//div[@data-testid='item-stack']/div[1]")
        link = product_box.xpath(".//a[@link-identifier]/@link-identifier").get()
        title = product_box.xpath(".//a[@link-identifier]/span/text()").get()
        price = product_box.xpath(".//div[@data-automation-id='product-price']/span/text()").get()
        price = float(price[price.find("$")+1: -1]) if price else None
        rate = product_box.xpath(".//span[@data-testid='product-ratings']/@data-value").get()
        review_count = product_box.xpath(".//span[@data-testid='product-reviews']/@data-value").get()
        data.append({
                "link": "https://www.walmart.com/ip/" + link,
                "title": title,
                "price": price,
                "rate": float(rate) if rate else None,
                "review_count": int(review_count) if review_count else None
            })
        return data
    
    search_url = "https://www.walmart.com/search?q=" + urllib.parse.quote_plus(search_query) + "&sort=best_seller"
    response = await client.get(search_url)
    if response.status_code == 403:
        raise Exception("Walmart requests are blocked")       
    data = parse_walmart(response)
    log.success(f"scraped {len(data)} products from Walmart")
    return data
import urllib.parse
import asyncio
import json
from typing import List, Dict
from scrapfly import ScrapeConfig, ScrapflyClient, ScrapeApiResponse
from loguru import logger as log
scrapfly = ScrapflyClient(key="Your ScrapFly API key")
async def scrape_walmart(search_query: str) -> List[Dict]:
    """scrape Walmart products"""
    def parse_walmart(response: ScrapeApiResponse) -> List[Dict]:
        """parse Walmart product pages"""
        selector = response.selector
        data = []
        product_box = selector.xpath("//div[@data-testid='item-stack']/div")
        link = product_box.xpath(".//a[@link-identifier]/@link-identifier").get()
        title = product_box.xpath(".//a[@link-identifier]/span/text()").get()
        price = product_box.xpath(".//div[@data-automation-id='product-price']/span/text()").get()
        price = float(price[price.find("$")+1: -1]) if price else None
        rate = product_box.xpath(".//span[@data-testid='product-ratings']/@data-value").get()
        review_count = product_box.xpath(".//span[@data-testid='product-reviews']/@data-value").get()
        data.append({
                "link": "https://www.walmart.com/ip/" + link,
                "title": title,
                "price": price,
                "rate": float(rate) if rate else None,
                "review_count": int(review_count) if review_count else None
            })
        return data
    
    search_url = "https://www.walmart.com/search?q=" + urllib.parse.quote_plus(search_query) + "&sort=best_seller"
    response = await scrapfly.async_scrape(ScrapeConfig(search_url, asp=True, country="US"))
    data = parse_walmart(response)
    log.success(f"scraped {len(data)} products from Walmart")
    return data
Run the code
async def run():
    data = await scrape_walmart(
        search_query="PS5 digital edition"
    )
    # print the data in JSON format
    print(json.dumps(data, indent=2))
if __name__=="__main__":
    asyncio.run(run())
🤖 The websites we are scraping are known for their high block rate and it's likely to get blocked while requesting them. Run the ScrapFly code tabs to avoid the blocking.
Here, we define two functions, scrape_walmart and parse_walmart. Let's break them down:
- scrape_walmart()for requesting the Walmart category page and retrieving the HTML.
- parse_walmart()for parsing the HTML we got and extracting the link, title, price, rate and review count from each product.
Here is what the product result we scraped looks like:
[
  {
    "link": "https://www.walmart.com/ip/5113183757",
    "title": "Sony PlayStation 5 (PS5) Digital Console Slim",
    "price": 449.0,
    "rate": 4.6,
    "review_count": 369
  }
]
We have successfully scraped products from one of our target websites. Let's apply the same approach to our other targets - Amazon and BestBuy:
import urllib.parse
import asyncio
import json
from httpx import AsyncClient, Response
from parsel import Selector
from typing import Dict, List
from loguru import logger as log
# create HTTP client with headers that look like a real web browser
client = AsyncClient(
    headers={
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "en-US,en;q=0.9,lt;q=0.8,et;q=0.7,de;q=0.6",
    },
    follow_redirects=True,
    http2=True
)
async def scrape_amazon(search_query: str) -> List[Dict]:
    """scrape Amazon search pages"""
    def parse_amazon(response: Response) -> List[Dict]:
        """parse Amazon search pages"""
        selector = Selector(response.text)
        data = []
        product_box = selector.xpath("//div[contains(@class, 'search-results')]/div[@data-component-type='s-search-result']")
        product_id = product_box.xpath(".//div[@data-cy='title-recipe']/h2/a[contains(@class, 'a-link-normal')]/@href").get().split("/dp/")[-1].split("/")[0]
        title = product_box.xpath(".//div[@data-cy='title-recipe']/h2/a/span/text()").get()
        price = product_box.xpath(".//span[@class='a-price']/span/text()").get()
        price = float(price.replace("$", "")) if price else None
        rate = product_box.xpath(".//span[contains(@aria-label, 'stars')]/@aria-label").re_first(r"(\d+\.*\d*) out")
        review_count = product_box.xpath(".//div[contains(@data-csa-c-content-id, 'ratings-count')]/span/@aria-label").get()
        data.append({
                "link": f"https://www.amazon.com/dp/{product_id}",
                "title": title,
                "price": price,
                "rate": float(rate) if rate else None,
                "review_count": int(review_count.replace(',','')) if review_count else None,
            })
        return data
    
    search_url = "https://www.amazon.com/s?k=" + urllib.parse.quote_plus(search_query)
    response = await client.get(search_url)
    if response.status_code == 403 or 503:
        raise Exception("Amazon requests are blocked")   
    data = parse_amazon(response)
    log.success(f"scraped {len(data)} products from Amazon")
    return data
async def scrape_bestbuy(search_query: str) -> List[Dict]:
    """scrape BestBuy search pages"""
    def parse_bestbuy(response: Response) -> List[Dict]:
        """parse BestBuy search pages"""
        selector = Selector(response.text)
        data = []
        product_box = selector.xpath("//ol[contains(@class, 'sku-item-list')]/li[@class='sku-item']")
        product_id = product_box.xpath(".//h4[@class='sku-title']/a/@href").get().split("?skuId=")[-1]
        title = product_box.xpath(".//h4[@class='sku-title']/a/text()").get()
        price = product_box.xpath(".//div[contains(@class, 'priceView')]/span/text()").get()
        price = float(price.replace("$", "")) if price else None
        rate = product_box.xpath(".//div[contains(@class, 'ratings-reviews')]/p/text()").get()
        review_count = product_box.xpath(".//span[@class='c-reviews ']/text()").get()
        data.append({
                "link": f"https://www.bestbuy.com/site/{product_id}.p",
                "title": title,
                "price": price,
                "rate": float(rate.split()[1]) if rate else None,
                "review_count": int(review_count[1:-1].replace(",", "")) if review_count else None
            })
        return data
    
    search_url = "https://www.bestbuy.com/site/searchpage.jsp?st=" + urllib.parse.quote_plus(search_query)
    response = await client.get(search_url)
    if response.status_code == 403:
        raise Exception("BestBuy requests are blocked")   
    data = parse_bestbuy(response)
    log.success(f"scraped {len(data)} products from BestBuy")
    return data
import urllib.parse
import asyncio
import json
from typing import List, Dict
from scrapfly import ScrapeConfig, ScrapflyClient, ScrapeApiResponse
from loguru import logger as log
scrapfly = ScrapflyClient(key="Your ScrapFly API key")
async def scrape_amazon(search_query: str) -> List[Dict]:
    """scrape Amazon search pages"""
    def parse_amazon(response: ScrapeApiResponse) -> List[Dict]:
        """parse Amazon search pages"""
        selector = response.selector
        data = []
        product_box = selector.xpath("//div[contains(@class, 'search-results')]/div[@data-component-type='s-search-result']")
        product_id = product_box.xpath(".//div[@data-cy='title-recipe']/h2/a[contains(@class, 'a-link-normal')]/@href").get().split("/dp/")[-1].split("/")[0]
        title = product_box.xpath(".//div[@data-cy='title-recipe']/h2/a/span/text()").get()
        price = product_box.xpath(".//span[@class='a-price']/span/text()").get()
        price = float(price.replace("$", "")) if price else None
        rate = product_box.xpath(".//span[contains(@aria-label, 'stars')]/@aria-label").re_first(r"(\d+\.*\d*) out")
        review_count = product_box.xpath(".//div[contains(@data-csa-c-content-id, 'ratings-count')]/span/@aria-label").get()
        data.append({
                "link": f"https://www.amazon.com/dp/{product_id}",
                "title": title,
                "price": price,
                "rate": float(rate) if rate else None,
                "review_count": int(review_count.replace(',','')) if review_count else None,
            })
        return data
    
    search_url = "https://www.amazon.com/s?k=" + urllib.parse.quote_plus(search_query)
    response = await scrapfly.async_scrape(ScrapeConfig(
        search_url, asp=True, country="US", proxy_pool="public_residential_pool",
        render_js=True, retry=True))
    data = parse_amazon(response)
    log.success(f"scraped {len(data)} products from Amazon")
    return data
async def scrape_bestbuy(search_query: str) -> List[Dict]:
    """scrape BestBuy search pages"""
    def parse_bestbuy(response: ScrapeApiResponse) -> List[Dict]:
        """parse BestBuy search pages"""
        selector = response.selector
        data = []
        product_box = selector.xpath("//ol[contains(@class, 'sku-item-list')]/li[@class='sku-item']")
        product_id = product_box.xpath(".//h4[@class='sku-title']/a/@href").get().split("?skuId=")[-1]
        title = product_box.xpath(".//h4[@class='sku-title']/a/text()").get()
        price = product_box.xpath(".//div[contains(@class, 'priceView')]/span/text()").get()
        price = float(price.replace("$", "")) if price else None
        rate = product_box.xpath(".//div[contains(@class, 'ratings-reviews')]/p/text()").get()
        review_count = product_box.xpath(".//span[@class='c-reviews ']/text()").get()
        data.append({
                "link": f"https://www.bestbuy.com/site/{product_id}.p",
                "title": title,
                "price": price,
                "rate": float(rate.split()[1]) if rate else None,
                "review_count": int(review_count[1:-1].replace(",", "")) if review_count else None
            })
        return data
    
    search_url = "https://www.bestbuy.com/site/searchpage.jsp?st=" + urllib.parse.quote_plus(search_query)
    response = await scrapfly.async_scrape(ScrapeConfig(search_url, asp=True, country="US"))
    data = parse_bestbuy(response)
    log.success(f"scraped {len(data)} products from BestBuy")
    return data
Run the code
async def run():
    amazon_data = await scrape_amazon(
        search_query="PS5 digital edition"
    )
    bestbuy_data = await scrape_bestbuy(
        search_query="PS5 digital edition"
    )
    # print the results in JSON format
    print(json.dumps(amazon_data, indent=2, ensure_ascii=False))
    print(json.dumps(bestbuy_data, indent=2, ensure_ascii=False))
if __name__=="__main__":
    asyncio.run(run())
In the above code, we replicate our previous Walmart scraping logic with Amazon and BestBuy. We request the search pages and parse the first product of each search page for the same fields we extracted earlier. The output is similar to the results we got earlier:
"Amazon"
[
  {
    "link": "https://www.bestbuy.com/site/6566040.p",
    "title": "Sony - PlayStation 5 Slim Console Digital Edition - White",
    "price": 449.99,
    "rate": 4.8,
    "review_count": 769
  }
]
We can successfully scrape the same product from different competitors and get pricing data along with other product details. Next, let's combine all our scraping logic to use it with the next stage of the tracking competitor pricing tool:
async def track_competitor_prices(
        search_query: str
    ):
    """scrape products from different competitors"""
    data = {}
    data["walmart"] = await scrape_walmart(
        search_query=search_query
    )
    data["amazon"] = await scrape_amazon(
        search_query=search_query
    )
    data["bestbuy"] = await scrape_bestbuy(
        search_query=search_query
    )
    product_count = sum(len(products) for products in data.values())
    log.success(f"successfully scraped {product_count} products")
    # save the results into a JSON file
    
    with open("data.json", "w", encoding="utf-8") as file:
        json.dump(data, file, indent=2, ensure_ascii=False)
async def run():
    await track_competitor_prices(
        search_query="PS5 digital edition"
    )
if __name__=="__main__":
    asyncio.run(run())
🙋 If you can't follow along with the code snippets, look for the full code section.
The output file contains all the results we got earlier, organized in one JSON file:
{
  "walmart": [
    {
      "link": "https://www.walmart.com/ip/5113183757",
      "title": "Sony PlayStation 5 (PS5) Digital Console Slim",
      "price": 449.0,
      "rate": 4.6,
      "review_count": 369
    }
  ],
  "amazon": [
    {
      "link": "https://www.amazon.com/dp/B0CL5KNB9M",
      "title": "PlayStation®5 Digital Edition (slim)",
      "price": 449.0,
      "rate": 4.7,
      "review_count": 2521
    }
  ],
  "bestbuy": [
    {
      "link": "https://www.bestbuy.com/site/6566040.p",
      "title": "Sony - PlayStation 5 Slim Console Digital Edition - White",
      "price": 449.99,
      "rate": 4.8,
      "review_count": 769
    }
  ]
}
Our price scraper extracts only one product from the search page. However, it can be extended to iterate over all the products and paginate other search pages. For more details, refer to our previous guide on crawling the web.
How to Crawl the Web with Python
Introduction to web crawling with Python. What is web crawling? How it differs from web scraping? And a deep dive into code, building our own crawler and an example project crawling Shopify-powered websites.
Now that we have all the competitors' pricing data in one place. Let's compare it to get insights!
Comparing Competitor Prices
The web-scraped product data can be manually analyzed for insights into each competitor's performance. However, this can be exhausting with a higher volume of data. Therefore, we'll create a simple monitoring function for analyzing the data we retrieved.
This simple function analyzes the data we got from each target website and generates insight metrics:
def generate_insights(data):
    """analyze the data for insight values"""
    def calculate_average(lst):
        # Calculate the averages
        non_none_values = [value for value in lst if value is not None]
        return round(sum(non_none_values) / len(non_none_values), 2) if non_none_values else None
    # Extract all products across competitors
    all_products = [product for products in data.values() for product in products]
    # Calculate overall averages
    overall_average_price = calculate_average([product["price"] for product in all_products])
    overall_average_rate = calculate_average([product["rate"] for product in all_products])
    overall_average_review_count = calculate_average([product["review_count"] for product in all_products])
    # Find the lowest priced, highest reviewed, highest priced, and highest rated products across all competitors
    lowest_priced_product = min(all_products, key=lambda x: x["price"])
    highest_reviewed_product = max(all_products, key=lambda x: x.get("review_count", 0) if x.get("review_count") is not None else 0)
    highest_priced_product = max(all_products, key=lambda x: x["price"])
    highest_rated_product = max(all_products, key=lambda x: x["rate"])
    # Extract website names for each product
    website_names = {retailer: products[0]["link"].split(".")[1] for retailer, products in data.items()}
    insights = {
        "Overall Average Price": overall_average_price,
        "Overall Average Rate": overall_average_rate,
        "Overall Average Review Count": overall_average_review_count,
        "Lowest Priced Product": {
            "Product": lowest_priced_product,
            "Competitor": website_names.get(lowest_priced_product["link"].split(".")[1])
        },
        "Highest Priced Product": {
            "Product": highest_priced_product,
            "Competitor": website_names.get(highest_priced_product["link"].split(".")[1])
        },
        "Highest Rated Product": {
            "Product": highest_rated_product,
            "Competitor": website_names.get(highest_rated_product["link"].split(".")[1])
        },                
        "Highest Reviewed Product": {
            "Product": highest_reviewed_product,
            "Competitor": website_names.get(highest_reviewed_product["link"].split(".")[1])
        }
    }
    # Save the insights to a JSON file
    with open("insights.json", "w") as json_file:
        json.dump(insights, json_file, indent=2, ensure_ascii=False)
Here, we define a generate_insights function, which uses the calculate_average function to calculate the following metrics:
- Average price, rate and review count for all products.
- Lowest and highest priced products.
- Highest product in rate and review count.
Here are the insights we got:
{
  "Overall Average Price": 449.33,
  "Overall Average Rate": 4.7,
  "Overall Average Review Count": 1219.67,
  "Lowest Priced Product": {
    "Product": {
      "link": "https://www.walmart.com/ip/5113183757",
      "title": "Sony PlayStation 5 (PS5) Digital Console Slim",
      "price": 449.0,
      "rate": 4.6,
      "review_count": 369
    },
    "Competitor": "walmart"
  },
  "Highest Priced Product": {
    "Product": {
      "link": "https://www.bestbuy.com/site/6566040.p",
      "title": "Sony - PlayStation 5 Slim Console Digital Edition - White",
      "price": 449.99,
      "rate": 4.8,
      "review_count": 769
    },
    "Competitor": "bestbuy"
  },
  "Highest Rated Product": {
    "Product": {
      "link": "https://www.bestbuy.com/site/6566040.p",
      "title": "Sony - PlayStation 5 Slim Console Digital Edition - White",
      "price": 449.99,
      "rate": 4.8,
      "review_count": 769
    },
    "Competitor": "bestbuy"
  },
  "Highest Reviewed Product": {
    "Product": {
      "link": "https://www.amazon.com/dp/B0CL5KNB9M",
      "title": "PlayStation�5 Digital Edition (slim)",
      "price": 449.0,
      "rate": 4.7,
      "review_count": 2521
    },
    "Competitor": "amazon"
  }
}
The above insight data is represented by numbers and statistics. However, these values can also be visualized for more precise insights. This can be achieved using Python libraries, such as Seaborn and Matplotlib, similar to what we did in our previous article on observing e-commerce trends.
We can successfully scrape and compare product prices. Next, let's schedule our competitor price monitoring tool to keep the data up-to-date!
Full Competitor Price Tracking Code
Here is what the final code of our project looks like:
import urllib.parse
import asyncio
import json
from typing import List, Dict
from scrapfly import ScrapeConfig, ScrapflyClient, ScrapeApiResponse
from loguru import logger as log
scrapfly = ScrapflyClient(key="Your ScrapFly API key")
async def scrape_walmart(search_query: str) -> List[Dict]:
    """scrape Walmart products"""
    def parse_walmart(response: ScrapeApiResponse) -> List[Dict]:
        """parse Walmart product pages"""
        selector = response.selector
        data = []
        product_box = selector.xpath("//div[@data-testid='item-stack']/div")
        link = product_box.xpath(".//a[@link-identifier]/@link-identifier").get()
        title = product_box.xpath(".//a[@link-identifier]/span/text()").get()
        price = product_box.xpath(".//div[@data-automation-id='product-price']/span/text()").get()
        price = float(price[price.find("$")+1: -1]) if price else None
        rate = product_box.xpath(".//span[@data-testid='product-ratings']/@data-value").get()
        review_count = product_box.xpath(".//span[@data-testid='product-reviews']/@data-value").get()
        data.append({
                "link": "https://www.walmart.com/ip/" + link,
                "title": title,
                "price": price,
                "rate": float(rate) if rate else None,
                "review_count": int(review_count) if review_count else None
            })
        return data
    
    search_url = "https://www.walmart.com/search?q=" + urllib.parse.quote_plus(search_query) + "&sort=best_seller"
    response = await scrapfly.async_scrape(ScrapeConfig(search_url, asp=True, country="US"))
    data = parse_walmart(response)
    log.success(f"scraped {len(data)} products from Walmart")
    return data
async def scrape_amazon(search_query: str) -> List[Dict]:
    """scrape Amazon search pages"""
    def parse_amazon(response: ScrapeApiResponse) -> List[Dict]:
        """parse Amazon search pages"""
        selector = response.selector
        data = []
        product_box = selector.xpath("//div[contains(@class, 'search-results')]/div[@data-component-type='s-search-result']")
        product_id = product_box.xpath(".//div[@data-cy='title-recipe']/h2/a[contains(@class, 'a-link-normal')]/@href").get().split("/dp/")[-1].split("/")[0]
        title = product_box.xpath(".//div[@data-cy='title-recipe']/h2/a/span/text()").get()
        price = product_box.xpath(".//span[@class='a-price']/span/text()").get()
        price = float(price.replace("$", "")) if price else None
        rate = product_box.xpath(".//span[contains(@aria-label, 'stars')]/@aria-label").re_first(r"(\d+\.*\d*) out")
        review_count = product_box.xpath(".//div[contains(@data-csa-c-content-id, 'ratings-count')]/span/@aria-label").get()
        data.append({
                "link": f"https://www.amazon.com/dp/{product_id}",
                "title": title,
                "price": price,
                "rate": float(rate) if rate else None,
                "review_count": int(review_count.replace(',','')) if review_count else None,
            })
        return data
    
    search_url = "https://www.amazon.com/s?k=" + urllib.parse.quote_plus(search_query)
    response = await scrapfly.async_scrape(ScrapeConfig(
        search_url, asp=True, country="US", proxy_pool="public_residential_pool",
        render_js=True, retry=True))
    data = parse_amazon(response)
    log.success(f"scraped {len(data)} products from Amazon")
    return data
async def scrape_bestbuy(search_query: str) -> List[Dict]:
    """scrape BestBuy search pages"""
    def parse_bestbuy(response: ScrapeApiResponse) -> List[Dict]:
        """parse BestBuy search pages"""
        selector = response.selector
        data = []
        product_box = selector.xpath("//ol[contains(@class, 'sku-item-list')]/li[@class='sku-item']")
        product_id = product_box.xpath(".//h4[@class='sku-title']/a/@href").get().split("?skuId=")[-1]
        title = product_box.xpath(".//h4[@class='sku-title']/a/text()").get()
        price = product_box.xpath(".//div[contains(@class, 'priceView')]/span/text()").get()
        price = float(price.replace("$", "")) if price else None
        rate = product_box.xpath(".//div[contains(@class, 'ratings-reviews')]/p/text()").get()
        review_count = product_box.xpath(".//span[@class='c-reviews ']/text()").get()
        data.append({
                "link": f"https://www.bestbuy.com/site/{product_id}.p",
                "title": title,
                "price": price,
                "rate": float(rate.split()[1]) if rate else None,
                "review_count": int(review_count[1:-1].replace(",", "")) if review_count else None
            })
        return data
    
    search_url = "https://www.bestbuy.com/site/searchpage.jsp?st=" + urllib.parse.quote_plus(search_query)
    response = await scrapfly.async_scrape(ScrapeConfig(search_url, asp=True, country="US"))
    data = parse_bestbuy(response)
    log.success(f"scraped {len(data)} products from BestBuy")
    return data
def generate_insights(data: Dict):
    """analyze the data for insight values"""
    def calculate_average(lst):
        # Calculate the averages
        non_none_values = [value for value in lst if value is not None]
        return round(sum(non_none_values) / len(non_none_values), 2) if non_none_values else None
    # calculate average prices, rates, and review counts for each competitor
    average_prices = {
        retailer: calculate_average([product["price"] for product in products])
        for retailer, products in data.items()
    }
    average_rates = {
        retailer: calculate_average([product["rate"] for product in products])
        for retailer, products in data.items()
    }
    average_review_counts = {
        retailer: calculate_average([product["review_count"] for product in products])
        for retailer, products in data.items()
    }
    # calculate the lowest priced product and the product with the highest number of reviews for each retailer
    lowest_priced_products = {
        retailer: min(products, key=lambda x: x["price"])
        for retailer, products in data.items()
    }
    highest_reviewed_products = {
        retailer: max(products, key=lambda x: x.get("review_count", 0) if x.get("review_count") is not None else 0)
        for retailer, products in data.items()
    }
    insights = {
        "Average prices": average_prices,
        "Average rates": average_rates,
        "Average review counts": average_review_counts,
        "Lowest priced products": lowest_priced_products,
        "Highest reviewed products": highest_reviewed_products,
    }
    # save the insights to a JSON file
    with open("insights.json", "w") as json_file:
        json.dump(insights, json_file, indent=2, ensure_ascii=False)
async def track_competitor_prices(
        search_query: str
    ):
    """scrape products from different competitors"""
    data = {}
    data["walmart"] = await scrape_walmart(
        search_query=search_query
    )
    data["amazon"] = await scrape_amazon(
        search_query=search_query
    )
    data["bestbuy"] = await scrape_bestbuy(
        search_query=search_query
    )
    product_count = sum(len(products) for products in data.values())
    log.success(f"successfully scraped {product_count} products")
    # save the results into a JSON file
    
    # create the insights file
    generate_insights(data)
    with open("data.json", "w", encoding="utf-8") as file:
        json.dump(data, file, indent=2, ensure_ascii=False)
# main competitor price tracking function function
async def run():
    log.info("----- Scheduler has started -----")
    await track_competitor_prices(
        search_query="PS5 digital edition"
    )
    log.success("----- Scheduler has finished -----")
async def main():
    while True:
        # run the script every 3 hours
        await run()
        await asyncio.sleep(3 * 3600)    
if __name__=="__main__":
    asyncio.run(main())
Bypass Scraping Blocking With ScrapFly
Our code about web scraping for tracking competitor prices relies on requesting popular websites with a high protection level. Moreover, web scraping prices often require requesting a high volume of web pages. So, attempting to scale our scrapers leads to guaranteed blocking:
from httpx import Client
# create HTTP client with headers that look like a real web browser
client = Client(
    headers={
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "en-US,en;q=0.9,lt;q=0.8,et;q=0.7,de;q=0.6",
    },
    follow_redirects=True,
    http2=True
)
url = "https://www.amazon.com/s?rh=n%3A20972796011"
response = client.get(url)
print(response)
"<Response [503 Service Unavailable]>" # Amazon detected the request
This is where Scrapfly can help out!
ScrapFly provides web scraping, screenshot, and extraction APIs for data collection at scale.
- Anti-bot protection bypass - scrape web pages without blocking!
- Rotating residential proxies - prevent IP address and geographic blocks.
- JavaScript rendering - scrape dynamic web pages through cloud browsers.
- Full browser automation - control browsers to scroll, input and click on objects.
- Format conversion - scrape as HTML, JSON, Text, or Markdown.
- Python and Typescript SDKs, as well as Scrapy and no-code tool integrations.
Let's use ScrapFly to bypass the blocked request sent to Amazon. All we have to do is replace httpx with the ScrapFly client, enable anti-scraping protection using the asp parameter and select and a proxy country:
# standard web scraping code
import httpx
response = httpx.get("https://www.amazon.com/s?rh=n%3A20972796011")
# in ScrapFly, it becomes this 👇
from scrapfly import ScrapeConfig, ScrapflyClient, ScrapeApiResponse
scrapfly = ScrapflyClient(key="Your ScrapFly API key")
api_response: ScrapeApiResponse = scrapfly.scrape(
    ScrapeConfig(
        # target website URL
        url="https://www.amazon.com/s?rh=n%3A20972796011",
        # Bypass anti-scraping protection
        asp=True,        
        # select a proxy pool (residential or datacenter)
        proxy_pool="public_residential_pool",
        # Set the proxy location to a specific country
        country="US",        
        # enable JavaScript rendering if needed, similar to headless browsers
        render_js=True,
    )
)
# Print the website's status code
print(api_response.upstream_status_code)
"200"
# get the HTML from the response
html = api_response.scrape_result['content']
# use the built-in Prasel selector
selector = api_response.selector
FAQ
To wrap up this guide, let's have a look at some frequently asked questions about tracking competitor prices.
Is it possible to track historical product prices on e-commerce websites?
Yes. Using web scraping, you can scrape specific product pages for price data, schedule the scraper to request it every certain amount of time and then track prices by comparing the price values. We have covered tracking historical price data in a previous guide.
Can I monitor competitor prices in different languages and currencies?
Yes, you can change the web scraping language and currency by changing the requests' configuration, such as proxy location, headers and cookies. For more details, refer to our previous guide on web scraping localization.
Why does my scraper see a different price than the one I see in my browser?
Prices can be dynamically configured for specific regions and browser fingerprints. For the most accurate results ensure the scraper is scraping with the same configuration (IP address location, user agent, etc.) as the browser.
Summary
In this article, went through a step-by-step guide on tracking competitor prices using Python. We started by scraping product prices from Walmart, Amazon and BestBuy using httpx and Parsel. Then, we used Python to get insights into product pricing data and compare different competitors' performance. Finally, we scheduled our monitoring competitor prices tool using asyncio to keep the data up-to-date.