Creating Search Engine for any Website using Web Scraping

In this tutorial we'll take a look at how to create a search engine for any website by web scraping its data, parsing and indexing sections of interest and wrapping it all up with an intuitive GUI.

We'll be using the lunr.js JavaScript search engine to serve our search index, and Python for data scraping and index generation.

As an example project, we'll scrape ScrapFly's documentation website and create a search engine for its documentation pages. In this exercise we'll learn about crawling, parsing index sections from HTML documents and putting all of this together as a search engine. Let's dive in!

quick demonstration on what we'll be creating

What is Lunrjs?

Lunr.js is a small, full-text search library for use in the browser. It indexes JSON documents and provides a simple search interface for retrieving documents that best match text queries.

illustration of lunr project

Since lunr uses JSON documents for its index and runs in the browser from a single JSON file, we can easily integrate it with web scraping!

We'll scrape HTML data from our source, parse it into a JSON structure and feed that into a Lunr.js front-end, creating our very own search engine!
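To make the idea of "indexing JSON documents" more concrete, here is a toy inverted index in plain Python. It is only a conceptual sketch: lunr's real pipeline (stemming, stop words, relevance scoring) is more sophisticated, and the function names and sample documents below are made up for illustration.

```python
import re
from collections import defaultdict
from typing import Dict, List


def tokenize(text: str) -> List[str]:
    """Lowercase the text and split it into alphanumeric tokens."""
    return re.findall(r"[a-z0-9]+", text.lower())


def build_toy_index(docs: List[dict], ref: str = "location", fields=("title", "text")):
    """Map every token to the documents containing it, with occurrence counts."""
    index: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for doc in docs:
        for field in fields:
            for token in tokenize(doc.get(field, "")):
                index[token][doc[ref]] += 1
    return index


def search(index, query: str) -> List[str]:
    """Return document refs ordered by how often the query tokens occur in them."""
    scores: Dict[str, int] = defaultdict(int)
    for token in tokenize(query):
        for doc_ref, count in index.get(token, {}).items():
            scores[doc_ref] += count
    return sorted(scores, key=lambda r: (-scores[r], r))


# hypothetical sample documents, shaped like the ones we'll build later
docs = [
    {"location": "/docs/project#introduction", "title": "Project | Introduction",
     "text": "Projects group scrape settings."},
    {"location": "/docs/scrape-api#proxy", "title": "Scrape API | Proxy",
     "text": "Proxy settings for the scrape API."},
]
index = build_toy_index(docs)
results = search(index, "proxy settings")
print(results)
```

The document that mentions both query words ranks first, which is the basic behaviour we expect from a full-text search library.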

Data Scraper

To collect our search engine data we'll first have to write a scraper that retrieves the pages we want to index. In this example, we'll use ScrapFly documentation pages: https://scrapfly.io/docs

For our scraper we'll use Python with a few community packages:

  • httpx package for our HTTP connections
  • parsel package for parsing HTML data for values we want to be considered in our index.
  • lunr package for building our lunr index.
  • loguru [optional] package for easy, pretty logs so we can follow along easier.

We can install these libraries via pip command:

$ pip install httpx parsel lunr loguru

Using Crawling

For collecting index data we'll be using a web scraping technique called crawling. Crawling is essentially a web scraping loop where our program continuously collects documents, finds more urls to scrape and repeats the process until nothing new is found.

illustration of crawling flow
crawling loop keeps going until it can't discover more new urls to follow
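Before bringing in any HTTP tooling, the loop itself can be sketched on an in-memory link graph. The `LINKS` mapping and the `crawl_graph` name are hypothetical stand-ins for real pages, and this sketch seeds the seen set with the start page so it is never revisited.

```python
from typing import Dict, List, Set

# Hypothetical link graph: page -> links found on that page.
LINKS: Dict[str, List[str]] = {
    "/docs": ["/docs/project", "/docs/scrape-api"],
    "/docs/project": ["/docs", "/docs/scrape-api"],
    "/docs/scrape-api": ["/docs/project", "/docs/faq"],
    "/docs/faq": [],
}


def crawl_graph(start: str, links: Dict[str, List[str]], max_depth: int = 10) -> List[str]:
    """Visit every page reachable from start, one depth level at a time."""
    seen: Set[str] = {start}
    to_crawl: List[str] = [start]
    visited: List[str] = []
    depth = 0
    while to_crawl and depth <= max_depth:
        found: Set[str] = set()
        for page in to_crawl:
            visited.append(page)               # "scrape" the page
            found.update(links.get(page, []))  # discover the links on it
        to_crawl = sorted(found - seen)        # keep only pages we haven't queued yet
        seen |= found
        depth += 1
    return visited


order = crawl_graph("/docs", LINKS)
print(order)
```

Each pass of the `while` loop corresponds to one batch of concurrent requests in a real crawler.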

In Python we can illustrate this process using our httpx and parsel tools:

import asyncio
from typing import List

import httpx
from loguru import logger as log
from parsel import Selector


def find_urls(resp: httpx.Response, xpath: str) -> set:
    """find crawlable urls in a response from an xpath"""
    found = set()
    urls = Selector(text=resp.text).xpath(xpath).getall()
    for url in urls:
        url = httpx.URL(resp.url).join(url.split("#")[0])
        if url.host != resp.url.host:
            log.debug(f"skipping url of a different hostname: {url.host}")
            continue
        found.add(str(url))
    return found


async def crawl(url, follow_xpath: str, session: httpx.AsyncClient, max_depth=10) -> List[httpx.Response]:
    """Crawl source with provided follow rules"""
    urls_seen = set()
    urls_to_crawl = [url]
    all_responses = []
    depth = 0
    while urls_to_crawl:
        # first we want to protect ourselves from accidental infinite crawl loops
        if depth > max_depth:
            log.error(
                f"max depth reached with {len(urls_to_crawl)} urls left in the crawl queue"
            )
            break
        log.info(f"scraping: {len(urls_to_crawl)} urls")
        responses = await asyncio.gather(*[session.get(url) for url in urls_to_crawl])
        found_urls = set()
        for resp in responses:
            all_responses.append(resp)
            found_urls = found_urls.union(find_urls(resp, xpath=follow_xpath))
        # find more urls to crawl that we haven't visited before:
        urls_to_crawl = found_urls.difference(urls_seen)
        urls_seen = urls_seen.union(found_urls)
        depth += 1
    log.info(f"found {len(all_responses)} responses")
    return all_responses

In the example above, we provide our crawler with crawling rules and a starting point. The asynchronous crawl loop then keeps scraping urls, one batch at a time, until it can't find any new ones (or until max_depth is reached).
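The link handling in find_urls (making links absolute, dropping the #fragment and skipping other hosts) can also be tried in isolation. Below is a minimal sketch using only the standard library's urllib.parse instead of httpx.URL; the `normalize_link` helper is a hypothetical name, not part of the crawler above.

```python
from typing import Optional
from urllib.parse import urldefrag, urljoin, urlparse


def normalize_link(page_url: str, href: str) -> Optional[str]:
    """Return an absolute, fragment-free url, or None for links to other hosts."""
    absolute = urljoin(page_url, href)          # resolve relative links
    absolute, _fragment = urldefrag(absolute)   # drop the "#..." part
    if urlparse(absolute).hostname != urlparse(page_url).hostname:
        return None                             # different website: don't follow
    return absolute


page = "https://scrapfly.io/docs"
same_site = normalize_link(page, "/docs/project#introduction")
other_site = normalize_link(page, "https://example.com/other")
print(same_site, other_site)
```

Dropping the fragment matters because `/docs/project#introduction` and `/docs/project#usage` are the same document, and we only want to download it once.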

Let's run the crawler against our example target, ScrapFly docs:

# Example use:
async def run():
    limits = httpx.Limits(max_connections=3)
    headers = {"User-Agent": "ScrapFly Blog article"}
    async with httpx.AsyncClient(limits=limits, headers=headers) as session:
        responses = await crawl(
            # our starting point url
            url="https://scrapfly.io/docs", 
            # xpath to discover urls to crawl
            follow_xpath="//ul[contains(@class,'nav')]//li/a/@href",
            session=session,
        )

if __name__ == "__main__":
    asyncio.run(run())

We can see that this crawler quickly collects 23 pages for us:

2022-05-26 | INFO     | __main__:crawl:33 - scraping: 1 urls
2022-05-26 | INFO     | __main__:crawl:33 - scraping: 22 urls
2022-05-26 | INFO     | __main__:crawl:43 - found 23 responses

With pages collected we can start parsing them for data we'll use in our search engine.
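The crawler above keeps every response it receives. Depending on the target, it may be worth dropping error pages and non-HTML files before parsing. Here is a minimal sketch of such a filter; the `keep_ok_html` helper is hypothetical, and the `SimpleNamespace` objects are stand-ins that only mimic the `status_code` and `headers` attributes of a real response.

```python
from types import SimpleNamespace
from typing import Iterable, List


def keep_ok_html(responses: Iterable) -> List:
    """Keep only successful responses that declare an HTML content type."""
    kept = []
    for resp in responses:
        content_type = resp.headers.get("content-type", "")
        if resp.status_code == 200 and "text/html" in content_type:
            kept.append(resp)
    return kept


# stand-in responses with made-up urls, for illustration only
fake_responses = [
    SimpleNamespace(status_code=200, url="https://scrapfly.io/docs",
                    headers={"content-type": "text/html; charset=utf-8"}),
    SimpleNamespace(status_code=404, url="https://scrapfly.io/docs/missing",
                    headers={"content-type": "text/html"}),
    SimpleNamespace(status_code=200, url="https://scrapfly.io/docs/file.pdf",
                    headers={"content-type": "application/pdf"}),
]
kept = keep_ok_html(fake_responses)
print([r.url for r in kept])
```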

HTML cleanup

Since our index stores HTML values, search results can show rich text such as highlights, links and pictures. For that to work, however, we first have to clean up the HTML so that unnecessary values don't pollute our index.

from httpx import Response
from parsel import Selector
from urllib.parse import urljoin

def get_clean_html_tree(
    resp: Response, remove_xpaths=(".//figure", ".//*[contains(@class,'carousel')]")
):
    """cleanup HTML tree from domain specific details like classes"""
    sel = Selector(text=resp.text)
    for remove_xp in remove_xpaths:
        for rm_node in sel.xpath(remove_xp):
            rm_node.remove()
    # keep "id" as well: the section parser relies on it to build #anchor links
    allowed_attributes = ["src", "href", "width", "height", "id"]
    for el in sel.xpath("//*"):
        for k in list(el.root.attrib):
            if k in allowed_attributes:
                continue
            el.root.attrib.pop(k)
        # turn all link to absolute
        if el.root.attrib.get("href"):
            el.root.attrib["href"] = urljoin(str(resp.url), el.root.attrib["href"])
        if el.root.attrib.get("src"):
            el.root.attrib["src"] = urljoin(str(resp.url), el.root.attrib["src"])
    return sel

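Here, we have our cleanup function which drops nodes we don't want indexed (figures and carousels), removes unnecessary HTML node attributes and turns relative links into absolute ones. For a quality search engine it's important to sanitize our data to prevent false positives.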
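To see the effect of this kind of cleanup on a tiny input, here is a standalone sketch of the attribute filtering and link rewriting steps. It is an assumption-laden simplification: it uses the standard library's xml.etree.ElementTree on a well-formed snippet instead of parsel, and the `clean_fragment` helper is a hypothetical name.

```python
import xml.etree.ElementTree as ET
from urllib.parse import urljoin

ALLOWED_ATTRIBUTES = {"src", "href", "width", "height"}


def clean_fragment(markup: str, base_url: str) -> str:
    """Strip non-allowed attributes and make href/src values absolute."""
    root = ET.fromstring(markup)  # requires well-formed markup, unlike parsel
    for el in root.iter():
        for key in list(el.attrib):
            if key not in ALLOWED_ATTRIBUTES:
                del el.attrib[key]
        for key in ("href", "src"):
            if el.attrib.get(key):
                el.attrib[key] = urljoin(base_url, el.attrib[key])
    return ET.tostring(root, encoding="unicode")


snippet = '<div class="box"><a class="btn" href="/docs/project">Project</a></div>'
cleaned = clean_fragment(snippet, "https://scrapfly.io/docs")
print(cleaned)
```

The class attributes are gone and the link now works from any page, which is what we want for HTML that will be displayed inside our search results.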

Index Sections

With our HTML documents collected we can start parsing this data for our search index.
For this part, let's split each page into sections which we'll use for index generation. This will let us create a much better index for our search engine.

For sections, let's split the page by headers:

illustration of page division by heading sections
we'll divide every page in sections separated by heading

So, a single page will produce several index targets. This is great for our web search engine, as headings usually indicate topics or subtopics, and we can usually link directly to that part of the page using the # fragment syntax.
For example, when we visit https://scrapfly.io/docs/project#introduction our browser automatically scrolls to the Introduction heading. This is controlled by the HTML node's id attribute.
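As a small illustration of how such a link can be put together, here is a hedged sketch of a location builder. The `build_location` helper and its slug fallback are assumptions made for this example; a made-up fragment will only scroll the browser if the page really contains a node with that id.

```python
from typing import Optional
from urllib.parse import quote


def build_location(page_url: str, heading_id: Optional[str], heading_text: str) -> str:
    """Point at the heading's id when it exists, else fall back to a slug of its text."""
    fragment = heading_id or quote(heading_text.strip().lower().replace(" ", "-"))
    return f"{page_url}#{fragment}"


with_id = build_location("https://scrapfly.io/docs/project", "introduction", "Introduction")
without_id = build_location("https://scrapfly.io/docs/project", None, "Getting Started")
print(with_id)
print(without_id)
```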

To split HTML by sections we can use a simple parsing algorithm in Python:

import re
from typing import List

from httpx import Response
from loguru import logger as log


def parse(responses: List[Response]) -> List[dict]:
    """parse responses for index documents"""
    log.info(f"parsing documents from {len(responses)} responses")
    documents = []
    for resp in responses:
        sel = get_clean_html_tree(resp)

        sections = []
        # some pages might have multiple article bodies:
        for article in sel.xpath("//article"):
            section = []
            for node in article.xpath("*"):
                # separate page by <hX> nodes
                if re.search(r"h\d", node.root.tag) and len(section) > 1:
                    sections.append(section)
                    section = [node]
                else:
                    section.append(node)
            if section:
                sections.append(section)

        page_title = sel.xpath("//h1/text()").get("").strip()
        for section in sections:
            data = {
                "title": f"{page_title} | "
                + "".join(section[0].xpath(".//text()").getall()).strip(),
                "text": "".join(s.get() for s in section[1:]).strip(),
            }
            url_with_id_pointer = (
                str(resp.url) + "#" + (section[0].xpath("@id").get() or data["title"])
            )
            data["location"] = url_with_id_pointer
            documents.append(data)
    return documents

The parsing code above splits our HTML tree by any heading element like h1, h2 etc. Each section is then turned into an index document made up of a title (the page title plus the hX node text), the HTML body of the section and a location url pointing at it.
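The splitting rule is easier to follow on plain data. The sketch below applies the same rule to a flat list of (tag, text) tuples, which are a hypothetical stand-in for the parsed HTML nodes; `split_by_headings` is an illustrative name.

```python
import re
from typing import List, Tuple

Node = Tuple[str, str]  # (tag name, text): a stand-in for real HTML nodes


def split_by_headings(nodes: List[Node]) -> List[List[Node]]:
    """Start a new section at every heading once the current one has content."""
    sections: List[List[Node]] = []
    section: List[Node] = []
    for node in nodes:
        tag = node[0]
        if re.fullmatch(r"h\d", tag) and len(section) > 1:
            sections.append(section)
            section = [node]
        else:
            section.append(node)
    if section:
        sections.append(section)
    return sections


nodes = [
    ("h1", "Project"),
    ("p", "Intro text"),
    ("h2", "Introduction"),
    ("p", "More text"),
    ("p", "Even more"),
    ("h2", "Usage"),
    ("p", "How to use"),
]
sections = split_by_headings(nodes)
for section in sections:
    print(section[0][1], "->", len(section) - 1, "body nodes")
```

Note the `len(section) > 1` condition: a heading that directly follows another heading is merged into the same section instead of producing an empty one.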

Building Index

We have our scraper and parser ready, so it's time to build our index. Our index will consist of JSON documents, which are the article sections we extracted previously:

[
  {
    "title": "title of the section",
    "text": "html value of the section",
    "location": "url of the page with the section's #id"
  },
...
]

There are a few ways to build our lunrjs index, but the simplest one is to use the lunr Python package:

import json
from typing import List

from loguru import logger as log
from lunr import lunr

def build_index(docs: List[dict]):
    """build lunrjs index from provided list of documents"""
    log.info(f"building index from {len(docs)} documents")
    config = {
        "lang": ["en"],
        "min_search_length": 1,
    }
    page_dicts = {"docs": docs, "config": config}
    idx = lunr(
        ref="location",
        fields=("title", "text"),
        documents=docs,
        languages=["en"],
    )
    page_dicts["index"] = idx.serialize()
    return json.dumps(page_dicts, sort_keys=True, separators=(",", ":"), indent=2)

This function takes in a list of documents and generates a lunr index. Let's give it a shot!

Putting everything together

We defined all of our components:

  • crawler which collects HTML documents.
  • parser which parses each HTML document by section.
  • index builder which turns section documents into one lunrjs JSON index.

Our final project code:

import asyncio
import json
import re
from typing import List
from urllib.parse import urljoin

import httpx
from httpx import Response
from loguru import logger as log
from lunr import lunr
from parsel import Selector


def find_urls(resp: httpx.Response, xpath: str) -> set:
    """find crawlable urls in a response from an xpath"""
    found = set()
    urls = Selector(text=resp.text).xpath(xpath).getall()
    for url in urls:
        url = httpx.URL(resp.url).join(url.split("#")[0])
        if url.host != resp.url.host:
            log.debug(f"skipping url of a different hostname: {url.host}")
            continue
        found.add(str(url))
    return found


async def crawl(url, follow_xpath: str, session: httpx.AsyncClient, max_depth=10) -> List[httpx.Response]:
    """Crawl source with provided follow rules"""
    urls_seen = set()
    urls_to_crawl = [url]
    all_responses = []
    depth = 0
    while urls_to_crawl:
        # first we want to protect ourselves from accidental infinite crawl loops
        if depth > max_depth:
            log.error(
                f"max depth reached with {len(urls_to_crawl)} urls left in the crawl queue"
            )
            break
        log.info(f"scraping: {len(urls_to_crawl)} urls")
        responses = await asyncio.gather(*[session.get(url) for url in urls_to_crawl])
        found_urls = set()
        for resp in responses:
            all_responses.append(resp)
            found_urls = found_urls.union(find_urls(resp, xpath=follow_xpath))
        # find more urls to crawl that we haven't visited before:
        urls_to_crawl = found_urls.difference(urls_seen)
        urls_seen = urls_seen.union(found_urls)
        depth += 1
    log.info(f"found {len(all_responses)} responses")
    return all_responses


def get_clean_html_tree(
    resp: Response, remove_xpaths=(".//figure", ".//*[contains(@class,'carousel')]")
):
    """cleanup HTML tree from domain specific details like classes"""
    sel = Selector(text=resp.text)
    for remove_xp in remove_xpaths:
        for rm_node in sel.xpath(remove_xp):
            rm_node.remove()
    # keep "id" as well: the section parser relies on it to build #anchor links
    allowed_attributes = ["src", "href", "width", "height", "id"]
    for el in sel.xpath("//*"):
        for k in list(el.root.attrib):
            if k in allowed_attributes:
                continue
            el.root.attrib.pop(k)
        # turn all link to absolute
        if el.root.attrib.get("href"):
            el.root.attrib["href"] = urljoin(str(resp.url), el.root.attrib["href"])
        if el.root.attrib.get("src"):
            el.root.attrib["src"] = urljoin(str(resp.url), el.root.attrib["src"])
    return sel


def parse(responses: List[Response]) -> List[dict]:
    """parse responses for index documents"""
    log.info(f"parsing documents from {len(responses)} responses")
    documents = []
    for resp in responses:
        sel = get_clean_html_tree(resp)

        sections = []
        # some pages might have multiple article bodies:
        for article in sel.xpath("//article"):
            section = []
            for node in article.xpath("*"):
                # separate page by <hX> nodes
                if re.search(r"h\d", node.root.tag) and len(section) > 1:
                    sections.append(section)
                    section = [node]
                else:
                    section.append(node)
            if section:
                sections.append(section)

        page_title = sel.xpath("//h1/text()").get("").strip()
        for section in sections:
            data = {
                "title": f"{page_title} | "
                + "".join(section[0].xpath(".//text()").getall()).strip(),
                "text": "".join(s.get() for s in section[1:]).strip(),
            }
            url_with_id_pointer = (
                str(resp.url) + "#" + (section[0].xpath("@id").get() or data["title"])
            )
            data["location"] = url_with_id_pointer
            documents.append(data)
    return documents


def build_index(docs: List[dict]):
    """build lunrjs index from provided list of documents"""
    log.info(f"building index from {len(docs)} documents")
    config = {
        "lang": ["en"],
        "min_search_length": 1,
    }
    page_dicts = {"docs": docs, "config": config}
    idx = lunr(
        ref="location",
        fields=("title", "text"),
        documents=docs,
        languages=["en"],
    )
    page_dicts["index"] = idx.serialize()
    return json.dumps(page_dicts, sort_keys=True, separators=(",", ":"), indent=2)


async def run():
    """
    example run function:
    establishes http session, crawls html documents, 
    turns them into index documents and compiles lunr index
    """
    limits = httpx.Limits(max_connections=3)
    timeout = httpx.Timeout(20.0)
    headers = {"User-Agent": "ScrapFly Blog article"}
    async with httpx.AsyncClient(
        limits=limits, headers=headers, timeout=timeout
    ) as session:
        responses = await crawl(
            # our starting point url
            url="https://scrapfly.io/docs",
            # xpath to discover urls to crawl
            follow_xpath="//ul[contains(@class,'nav')]//li/a/@href",
            session=session,
        )
        documents = parse(responses)
        with open("search_index.json", "w") as f:
            f.write(build_index(documents))


if __name__ == "__main__":
    asyncio.run(run())

If we run this code, a search_index.json file will be generated. It's not much use to us without a front-end, so let's put one up!
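Before wiring up a front-end, it can be handy to sanity-check the generated file. The sketch below checks the top-level keys written by build_index (docs, config, index) and the three fields of every document. The `check_payload` helper is hypothetical, and the sample payload is a hand-made stand-in for a real index.

```python
import json
from typing import List

REQUIRED_DOC_KEYS = ("title", "text", "location")


def check_payload(raw_json: str) -> List[str]:
    """Return a list of human-readable problems found in an index payload."""
    payload = json.loads(raw_json)
    problems = []
    for key in ("docs", "config", "index"):
        if key not in payload:
            problems.append(f"missing top-level key: {key}")
    for i, doc in enumerate(payload.get("docs", [])):
        missing = [k for k in REQUIRED_DOC_KEYS if not doc.get(k)]
        if missing:
            problems.append(f"doc {i} is missing: {', '.join(missing)}")
    return problems


# hand-made sample payload; the second document has an empty body on purpose
sample = json.dumps({
    "config": {"lang": ["en"], "min_search_length": 1},
    "docs": [
        {"title": "Project | Introduction", "text": "<p>...</p>",
         "location": "https://scrapfly.io/docs/project#introduction"},
        {"title": "Project | Usage", "text": "",
         "location": "https://scrapfly.io/docs/project#usage"},
    ],
    "index": {},
})
problems = check_payload(sample)
print(problems)
```

For the real file we could call `check_payload(open("search_index.json").read())` and expect an empty list, or a short list of sections worth inspecting.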

Front End Explorer

For our search front-end we've put together a simple viewer for the purpose of this tutorial, which can be found on our GitHub.

Let's clone this front-end and give it our generated index:

# enter the project directory where search_index.json was generated
$ cd docs-search
# clone our front-end
$ git clone https://github.com/Granitosaurus/simple-lunrjs-display
$ cd simple-lunrjs-display
# replace search_index.json with the one we generated
$ cp ../search_index.json .
# start a http server to see our search engine live!
$ python -m http.server --bind 127.0.0.1

Now, if we go to http://127.0.0.1:8000 we can explore our search!

every key click gives us instantaneous new results!

Use ScrapFly to Handle Dynamic Pages and Blocking

When crawling pages we often encounter two issues:

  • To see the full content of a dynamic page we need to render JavaScript.
  • Some pages might block our scraper from accessing them.

This is where Scrapfly can help you to scale up your dynamic page scrapers!

scrapfly middleware

ScrapFly provides web scraping, screenshot, and extraction APIs for data collection at scale.

Let's modify our scraper to use the ScrapFly API. For this, we'll be using the scrapfly-sdk Python package together with ScrapFly's anti-scraping protection bypass and JavaScript rendering features.
First, let's install scrapfly-sdk using pip:

$ pip install scrapfly-sdk

Then all we have to do is replace our httpx client with ScrapflyClient:


import asyncio
import json
import re
from typing import List
from urllib.parse import urljoin
from httpx import URL

from requests import Response
from loguru import logger as log
from lunr import lunr
from parsel import Selector
from scrapfly import ScrapflyClient, ScrapeConfig


def find_urls(resp: Response, xpath: str) -> set:
    """find crawlable urls in a response from an xpath"""
    found = set()
    urls = Selector(text=resp.text).xpath(xpath).getall()
    for url in urls:
        url = urljoin(resp.url, url.split("#")[0])
        if URL(url).host != URL(resp.url).host:
            # url is a plain string here, so wrap it in URL() to read the host
            log.debug(f"skipping url of a different hostname: {URL(url).host}")
            continue
        found.add(str(url))
    return found


async def crawl(url, follow_xpath: str, session: ScrapflyClient, max_depth=10) -> List[Response]:
    """Crawl source with provided follow rules"""
    urls_seen = set()
    urls_to_crawl = [url]
    all_responses = []
    depth = 0
    while urls_to_crawl:
        # first we want to protect ourselves from accidental infinite crawl loops
        if depth > max_depth:
            log.error(
                f"max depth reached with {len(urls_to_crawl)} urls left in the crawl queue"
            )
            break
        log.info(f"scraping: {len(urls_to_crawl)} urls")
        responses = await session.concurrent_scrape([
            ScrapeConfig(
                url=url, 
                # to render javascript for dynamic pages
                render_js=True,
                # enable anti bot protection bypass to avoid blocking
                asp=True
            )
            for url in urls_to_crawl
        ])
        responses = [scrapfly_response.upstream_result_into_response() for scrapfly_response in responses]
        found_urls = set()
        for resp in responses:
            all_responses.append(resp)
            found_urls = found_urls.union(find_urls(resp, xpath=follow_xpath))
        # find more urls to crawl that we haven't visited before:
        urls_to_crawl = found_urls.difference(urls_seen)
        urls_seen = urls_seen.union(found_urls)
        depth += 1
    log.info(f"found {len(all_responses)} responses")
    return all_responses


def get_clean_html_tree(
    resp: Response, remove_xpaths=(".//figure", ".//*[contains(@class,'carousel')]")
):
    """cleanup HTML tree from domain specific details like classes"""
    sel = Selector(text=resp.text)
    for remove_xp in remove_xpaths:
        for rm_node in sel.xpath(remove_xp):
            rm_node.remove()
    # keep "id" as well: the section parser relies on it to build #anchor links
    allowed_attributes = ["src", "href", "width", "height", "id"]
    for el in sel.xpath("//*"):
        for k in list(el.root.attrib):
            if k in allowed_attributes:
                continue
            el.root.attrib.pop(k)
        # turn all link to absolute
        if el.root.attrib.get("href"):
            el.root.attrib["href"] = urljoin(str(resp.url), el.root.attrib["href"])
        if el.root.attrib.get("src"):
            el.root.attrib["src"] = urljoin(str(resp.url), el.root.attrib["src"])
    return sel


def parse(responses: List[Response]) -> List[dict]:
    """parse responses for index documents"""
    log.info(f"parsing documents from {len(responses)} responses")
    documents = []
    for resp in responses:
        sel = get_clean_html_tree(resp)

        sections = []
        # some pages might have multiple article bodies:
        for article in sel.xpath("//article"):
            section = []
            for node in article.xpath("*"):
                # separate page by <hX> nodes
                if re.search(r"h\d", node.root.tag) and len(section) > 1:
                    sections.append(section)
                    section = [node]
                else:
                    section.append(node)
            if section:
                sections.append(section)

        page_title = sel.xpath("//h1/text()").get("").strip()
        for section in sections:
            data = {
                "title": f"{page_title} | "
                + "".join(section[0].xpath(".//text()").getall()).strip(),
                "text": "".join(s.get() for s in section[1:]).strip(),
            }
            url_with_id_pointer = (
                str(resp.url) + "#" + (section[0].xpath("@id").get() or data["title"])
            )
            data["location"] = url_with_id_pointer
            documents.append(data)
    return documents


def build_index(docs: List[dict]):
    """build lunrjs index from provided list of documents"""
    log.info(f"building index from {len(docs)} documents")
    config = {
        "lang": ["en"],
        "min_search_length": 1,
    }
    page_dicts = {"docs": docs, "config": config}
    idx = lunr(
        ref="location",
        fields=("title", "text"),
        documents=docs,
        languages=["en"],
    )
    page_dicts["index"] = idx.serialize()
    return json.dumps(page_dicts, sort_keys=True, separators=(",", ":"), indent=2)


async def run():
    """
    example run function:
    establishes http session, crawls html documents, 
    turns them into index documents and compiles lunr index
    """
    with ScrapflyClient(key="YOUR_SCRAPFLY_API_KEY", max_concurrency=2) as session:
        responses = await crawl(
            # our starting point url
            url="https://scrapfly.io/docs",
            # xpath to discover urls to crawl
            follow_xpath="//ul[contains(@class,'nav')]//li/a/@href",
            session=session,
        )
        documents = parse(responses)
        with open("search_index.json", "w") as f:
            f.write(build_index(documents))


if __name__ == "__main__":
    asyncio.run(run())

Summary

In this tutorial we used Python and the lunrjs library to create a search engine from web scraped data. We started by writing a crawler which scrapes all HTML pages of our source by following its links. Then, we learned about index creation by parsing HTML documents into sections, which we later fed into our lunrjs index generator to prebuild our index.

Using search engines like this is a great way to present web scraped data and create personal knowledge bases. It's even easier when using the ScrapFly API to render dynamic pages and avoid scraper blocking, so give it a shot!
