Skip to the content.

Technical Implementation

A four-stage pipeline that turns retail leaflet websites into structured promotion data. Every stage reads from and writes to SQLite; every stage is independently runnable and resumable. Built around three explicit extension points so adding a new site or a new extraction backend doesn't require rewriting the pipeline.


Architecture overview

┌───────────┐   ┌─────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│  SCRAPE   │ → │  STORE  │ → │ DOWNLOAD │ → │ EXTRACT  │ → │ ANALYZE  │
│ sources/  │   │   db/   │   │ images.py│   │extractors│   │  your BI │
└───────────┘   └─────────┘   └──────────┘   └──────────┘   └──────────┘
   pluggable      schema.sql    resumable      pluggable     reads
   per-site       + 3 SQL views                              promotions

The whole pipeline is driven by three SQL views — catalogs_to_read, pages_to_download, pages_to_analyze — that turn “what work is left?” into a single query each stage can re-run idempotently.


Module layout

leaflets/
├── config.py            env-driven Settings dataclass
├── logging.py           stderr handler with timestamp + level + module
├── http.py              requests.Session + urllib3.Retry + polite delay
├── models.py            transport dataclasses (Store, CatalogStub, ...)
├── images.py            atomic .part-rename downloader, skips done files
├── pipeline.py          stage functions + run_all()
├── cli.py               argparse subcommands → pipeline functions
├── db/
│   ├── schema.sql       tables + views (idempotent)
│   ├── connection.py    context manager + init_db()
│   └── repository.py    every SQL statement lives here, parameterized
├── sources/
│   ├── base.py          Source ABC: fetch_stores / catalog_stubs / metadata
│   ├── wowdeals.py      first concrete implementation
│   ├── _template.py     copy-paste template for new sites
│   └── __init__.py      SOURCES registry dict
└── extractors/
    ├── base.py          Extractor ABC + ExtractedPromotion / ExtractionResult
    ├── prompts.py       system prompt + JSON schema (output_config.format)
    ├── claude_vision.py Claude Opus 4.7 implementation
    └── __init__.py      EXTRACTORS registry dict

Storage model

sources slug · name · base_url stores source_id · slug · name · url catalogs store_id · url · dates · pages_count pages catalog_id · page_number · url · local_path promotions page_id · brand · price · ... analysis_runs page_id · extractor · status · error VIEWS (no storage) catalogs_to_read WHERE metadata_fetched_at IS NULL pages_to_download WHERE local_path IS NULL
Table Purpose
sources Registered scrape sources (one row per site).
stores Retailers / vendors discovered from a source.
catalogs Individual leaflets with date range and page count.
pages Each leaflet page (URL + downloaded local path).
promotions Promotion rows extracted by the AI step.
analysis_runs Audit log of AI-extraction attempts per page.

Foreign keys with ON DELETE CASCADE everywhere, WAL journal mode, PRAGMA foreign_keys = ON on every connection.


The five stages, in detail

STAGE 1

sync-stores

Hits the source's landing page, parses the vendor list, upserts stores. Idempotent — re-running refreshes names/URLs without duplicating rows.

STAGE 2

sync-catalogs

For every known store, paginates the catalog list and inserts new catalog stubs (URL + name only, no metadata yet). Stops at the first empty page.

STAGE 3

sync-metadata

Iterates catalogs_to_read, fetches each catalog's detail page, parses date range and page count, inserts one row per leaflet page. Marks metadata_fetched_at.

STAGE 4

download-images

Iterates pages_to_download, fetches each JPG, writes to a .part file then atomically renames. Skips files already on disk with size > 0.

STAGE 5

extract

Iterates pages_to_analyze, sends each image to Claude with a cached system prompt and a JSON schema, parses the structured response, writes to promotions. Audit row in analysis_runs.

ANY ORDER

Resumable

Crash mid-run? No problem. The next run picks up exactly where it stopped — driven by view boundaries, not in-memory state.


Three extension points

The architecture is built around three explicit seams. Adding a feature usually touches exactly one of them:

Extension What you write Where it plugs in
New scrape source New module under sources/ implementing Source Add to SOURCES dict — pipeline picks it up automatically
New extractor New module under extractors/ implementing Extractor Add to EXTRACTORS dict — --extractor <slug> selects it
New schema field Add column to schema.sql + new repository function Pipeline + extractor read from typed dataclass, no inline SQL elsewhere to update

Deliberately not extension points yet: no plugin loader, no entry-points discovery, no DI container. Sources and extractors are added by editing one dict — the simplest mechanism that meets the requirement, no framework on top.


AI extraction

Anthropic API request model: claude-opus-4-7 system + cache_control: ephemeral image (base64) + extraction prompt output_config.format: json_schema Structured response { "confidence": 0.92, "promotions": [ { product, brand, ... price: 6.5, was_price: 8.0 }, { ... } ] }

Defaults (all env-configurable):

Setting Default Notes
LEAFLETS_EXTRACT_MODEL claude-opus-4-7 Switch to claude-sonnet-4-6 for cost-sensitive runs.
LEAFLETS_EXTRACT_THINKING disabled Set adaptive if accuracy is poor on dense pages.
LEAFLETS_EXTRACT_EFFORT high medium is fine for most pages.

Output schema (per row): product_name, brand, category, unit, price, original_price, discount_percentage, raw_text, extra_json. Constrained server-side via output_config.format with json_schema so the model cannot return a malformed payload.

The system prompt is sent with cache_control: { type: "ephemeral" }. As the extraction prompt grows it will trigger Opus 4.7’s prompt cache (4096-token minimum prefix) — at that point repeat extractions in the same window read the prompt at ~10% of the base input price.


Cross-cutting concerns

HTTP layer (http.py). One HttpClient instance per CLI invocation. urllib3.Retry for 429/5xx with exponential backoff. Polite delay between requests (LEAFLETS_REQUEST_DELAY, default 1s). Hard timeout on every call (LEAFLETS_HTTP_TIMEOUT, default 30s).

Connection management (db/connection.py). connect() is a context manager that commits on clean exit and rolls back on exception. The pipeline opens a fresh transaction per logical unit of work (one per store, one per catalog) so a partial failure doesn’t lose the previous successes.

Logging. Standard logging module with one stderr handler. Pipeline progress at INFO; HTTP and SQL at DEBUG. urllib3 chatter clamped to WARNING.

Configuration. Env vars only, all prefixed LEAFLETS_* (plus ANTHROPIC_API_KEY). .env.example is the contract; .env is gitignored. Everything has sensible defaults — leaflets init-db works zero-config.


Deployment

The repo ships with .github/workflows/sync.yml, a daily cron at 04:30 UTC. DB and images persist between runs as workflow artifacts; for production swap those upload steps for S3 / GCS, or move to a runner with persistent storage.

For self-hosted deployments the rough sizing per source per day:

Resource Estimate
HTTP requests A few hundred at 1 req/sec — minutes
Image downloads ~100–500 JPGs (1–5 MB each)
Storage growth ~1–2 GB per source per month
Extraction (Opus 4.7) per page ~$0.02–0.05 depending on effort/thinking

Quick start

git clone https://github.com/scripts-and-tables/py-leaflets.git
cd py-leaflets
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
cp .env.example .env

leaflets init-db
leaflets run-all --source wowdeals_ae
leaflets extract --extractor claude_vision --limit 10

Full reference: see the README on GitHub.


What’s deliberately out of scope (today)

Want to dig in?

Architecture, data model, and source code are public. Pull the repo and run it locally.

View on GitHub