Scraping politely

A lot of projects require scraping websites. I usually write a scraper, run it, it fetches all of the data, and then fails in some final step before writing it anywhere. Then I curse a bit and try to fix my program without being sure what the responses actually looked like. Then I rerun my script, crossing my fingers that I don’t go over any rate limits.

This isn’t optimal, so I’ve finally come up with a better system for this. My requirements are:

  1. Only download a page once.
  2. …for a given time period (e.g., a day). If I rerun after that time period, download the page again.
  3. Make everything human-readable. I want to be able to easily find the response for a given request and visually inspect it.
  4. Basic rate limiting support.
  5. Not reinvent the wheel.

So basically, if I request http://httpbin.org/anything?foo=bar I want it to save the response to a file like ./.db/cache/2021-07-31/httpbin.org_anything_foo_bar. Then I can cat the file and see the response (or delete it to “clear” the cache). However, URLs can be much longer than legal filenames (and the human-readable scheme above could cause collisions), so I’m going to compromise and store the response in file with an opaque hash for a name (e.g., ./.db/cache/2021-07-31/e23403ee51adae9260d7810e2f49f0f2098d8a25c3581440d25d20d02e00ccb9) and then have a CSV file in the directory that maps request URL -> hash. It’s not quite as user-friendly as being able to just visually examine the filename, but I can just do:

$ cat ./.db/cache/2021-07-31/cache_map.csv | grep 'foo=bar'
e23403ee51adae9260d7810e2f49f0f2098d8a25c3581440d25d20d02e00ccb9,http://httpbin.org/anything?foo=bar

I’m using Python, so for not reinventing the wheel, I decided to use requests-cache. The requests-cache package actually has an option to write responses to the filesystem, but I wanted some custom behavior: 1) the cache_map.csv file as described above and 2) naming cache directories by date. Thus, I implemented a custom storage layer for requests-cache to use.

requests-cache represents storage as a dict: each URL is hashed and then requests-cache calls the getter or setter for that hash, depending on if it’s reading or writing. Thus, to implement custom storage, I just have to implement the dict interface to read/write to the filesystem, plus keep my cache_map.csv up-to-date:

class FilesystemStorage(requests_cache.backends.BaseStorage):

    def __init__(self, **kwargs):
        # I'm using APIs that return JSON, so it's easiest to
        # use the built-in JSON serializer.
        super().__init__(serializer='json', **kwargs)

        # A cache a day keeps the bugs at bay.
        today = datetime.datetime.today().strftime('%Y-%m-%d')
        self._storage_dir = os.path.join('.db/cache', today)
        if not os.path.isdir(self._storage_dir):
            os.makedirs(self._storage_dir, exist_ok=True)

        # The map of filename hashes -> URLs.
        self._cache_map = os.path.join(self._storage_dir, 'cache_map.csv')
        # Load any existing cache.
        self._cache = self._LoadCacheMap()

    def _LoadCacheMap(self) -> Dict[str, str]:
        if not os.path.exists(self._cache_map):
            return {}
        # Using pandas is overkill, but are you even a data
        # scientist if you don't?
        return pd.read_csv(self._cache_map, index_col='filename')['url'].to_dict()

    # Dict implementation.

    def __getitem__(self, key: str) -> requests_cache.CachedResponse:
        if key not in self._cache:
            raise KeyError
        k = os.path.join(self._storage_dir, key)
        with open(k, mode='rb') as fh:
            content = fh.read()
        # I want to be able to get the URL from the response,
        # so adding it here.
        url = self._cache[key]
        return requests_cache.CachedResponse(content, url=url)

    def __setitem__(self, key: str, value: requests_cache.CachedResponse):
        # Note that `key` is already hashed, so we use `value`'s
        # URL attribute to get the human-readable URL.
        k = os.path.join(self._storage_dir, key)
        with open(k, mode='wt') as fh:
            json.dump(value.json(), fh)
        # Update cache map
        self._cache[key] = value.url
        # Write the cache back to the file system.
        (
            pd.Series(self._cache, name='url')
            .rename_axis('filename')
            .to_frame()
            .to_csv(self._cache_map)
        )

    # I don't plan on using these, so didn't both implementing them.
    def __delitem__(self, key):
        pass
    
    def __iter__(self):
        pass
    
    def __len__(self) -> int:
        return len(self._cache)

Now I add a simple cache class to use this custom storage:

class FilesystemCache(requests_cache.backends.BaseCache):
    """Stores a map of URL to filename."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        storage = FilesystemStorage(**kwargs)
        self.redirects = storage
        self.responses = storage

Note that I’m using the same instance of my cache for both responses and redirects. This isn’t optimal if I were actually expecting redirects, but I’m not and my storage layer is designed to be a singleton (as implemented, multiple instances would clobber each other).

Now I create a request class that uses my custom cache.

import requests_cache
from typing import Any, Dict

from lib import custom_cache

class Requester(object):

    def __init__(self):
        self._client = requests_cache.CachedSession(
            backend=custom_cache.FilesystemCache())

    def DoRequest(self, url: str) -> Dict[str, Any]:
        resp = self._client.get(url, headers=_GetHeader())
        body = resp.json()
        # The API I'm using always has a 'data' field in valid
        # responses, YMMV.
        if 'data' not in body:
            raise ValueError('Unexpected response: %s' % resp.text)
        return body

This reads auth info from environment variables:

def _GetHeader() -> Dict[str, str]:
    return {'Authorization': 'Bearer %s' % _GetBearerToken()}

def _GetBearerToken() -> str:
   bearer_token = os.getenv('bearer_token')
   if not bearer_token:
       raise RuntimeError('No bearer token found, try `source setup.env`')
   return bearer_token

Finally, I want to support rate limiting. I used the ratelimit package for this. ratelimit is based on the Twitter API, which rate limits on 15-minute intervals. So if I was hitting an endpoint that allowed 10 requests/minute (10*15 = 150 requests per 15 minutes) then I could write:

@ratelimit.sleep_and_retry
@ratelimit.limits(calls=150)
def DoApiCall(self, url) -> Dict[str, Any]:
    return self._requester.DoRequest(url)

This will block the program’s main thread if this function is called more frequently than the allowed rate limit (which may not be what you want, check the ratelimit docs for other options).

The downside of this implementation is that it still rate limits, even if you’re hitting the cache. You could get around this by checking the cache contents in Requester and then only conditionally calling DoApiCall, but this is left as an exercise for the reader 😉

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: