Technical Implementation
A four-stage pipeline that turns retail leaflet websites into structured promotion data. Every stage reads from and writes to SQLite; every stage is independently runnable and resumable. Built around three explicit extension points so adding a new site or a new extraction backend doesn't require rewriting the pipeline.
Architecture overview
┌───────────┐ ┌─────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ SCRAPE │ → │ STORE │ → │ DOWNLOAD │ → │ EXTRACT │ → │ ANALYZE │
│ sources/ │ │ db/ │ │ images.py│ │extractors│ │ your BI │
└───────────┘ └─────────┘ └──────────┘ └──────────┘ └──────────┘
pluggable schema.sql resumable pluggable reads
per-site + 3 SQL views promotions
The whole pipeline is driven by three SQL views — catalogs_to_read,
pages_to_download, pages_to_analyze — that turn “what work is left?” into
a single query each stage can re-run idempotently.
Module layout
leaflets/
├── config.py env-driven Settings dataclass
├── logging.py stderr handler with timestamp + level + module
├── http.py requests.Session + urllib3.Retry + polite delay
├── models.py transport dataclasses (Store, CatalogStub, ...)
├── images.py atomic .part-rename downloader, skips done files
├── pipeline.py stage functions + run_all()
├── cli.py argparse subcommands → pipeline functions
├── db/
│ ├── schema.sql tables + views (idempotent)
│ ├── connection.py context manager + init_db()
│ └── repository.py every SQL statement lives here, parameterized
├── sources/
│ ├── base.py Source ABC: fetch_stores / catalog_stubs / metadata
│ ├── wowdeals.py first concrete implementation
│ ├── _template.py copy-paste template for new sites
│ └── __init__.py SOURCES registry dict
└── extractors/
├── base.py Extractor ABC + ExtractedPromotion / ExtractionResult
├── prompts.py system prompt + JSON schema (output_config.format)
├── claude_vision.py Claude Opus 4.7 implementation
└── __init__.py EXTRACTORS registry dict
Storage model
| Table | Purpose |
|---|---|
sources |
Registered scrape sources (one row per site). |
stores |
Retailers / vendors discovered from a source. |
catalogs |
Individual leaflets with date range and page count. |
pages |
Each leaflet page (URL + downloaded local path). |
promotions |
Promotion rows extracted by the AI step. |
analysis_runs |
Audit log of AI-extraction attempts per page. |
Foreign keys with ON DELETE CASCADE everywhere, WAL journal mode,
PRAGMA foreign_keys = ON on every connection.
The five stages, in detail
sync-stores
Hits the source's landing page, parses the vendor list, upserts stores. Idempotent — re-running refreshes names/URLs without duplicating rows.
sync-catalogs
For every known store, paginates the catalog list and inserts new catalog stubs (URL + name only, no metadata yet). Stops at the first empty page.
sync-metadata
Iterates catalogs_to_read, fetches each catalog's detail page, parses date range and page count, inserts one row per leaflet page. Marks metadata_fetched_at.
download-images
Iterates pages_to_download, fetches each JPG, writes to a .part file then atomically renames. Skips files already on disk with size > 0.
extract
Iterates pages_to_analyze, sends each image to Claude with a cached system prompt and a JSON schema, parses the structured response, writes to promotions. Audit row in analysis_runs.
Resumable
Crash mid-run? No problem. The next run picks up exactly where it stopped — driven by view boundaries, not in-memory state.
Three extension points
The architecture is built around three explicit seams. Adding a feature usually touches exactly one of them:
| Extension | What you write | Where it plugs in |
|---|---|---|
| New scrape source | New module under sources/ implementing Source |
Add to SOURCES dict — pipeline picks it up automatically |
| New extractor | New module under extractors/ implementing Extractor |
Add to EXTRACTORS dict — --extractor <slug> selects it |
| New schema field | Add column to schema.sql + new repository function |
Pipeline + extractor read from typed dataclass, no inline SQL elsewhere to update |
Deliberately not extension points yet: no plugin loader, no entry-points discovery, no DI container. Sources and extractors are added by editing one dict — the simplest mechanism that meets the requirement, no framework on top.
AI extraction
Defaults (all env-configurable):
| Setting | Default | Notes |
|---|---|---|
LEAFLETS_EXTRACT_MODEL |
claude-opus-4-7 |
Switch to claude-sonnet-4-6 for cost-sensitive runs. |
LEAFLETS_EXTRACT_THINKING |
disabled |
Set adaptive if accuracy is poor on dense pages. |
LEAFLETS_EXTRACT_EFFORT |
high |
medium is fine for most pages. |
Output schema (per row): product_name, brand, category, unit,
price, original_price, discount_percentage, raw_text, extra_json.
Constrained server-side via output_config.format with json_schema so the
model cannot return a malformed payload.
The system prompt is sent with cache_control: { type: "ephemeral" }. As the
extraction prompt grows it will trigger Opus 4.7’s prompt cache (4096-token
minimum prefix) — at that point repeat extractions in the same window read the
prompt at ~10% of the base input price.
Cross-cutting concerns
HTTP layer (http.py). One HttpClient instance per CLI invocation.
urllib3.Retry for 429/5xx with exponential backoff. Polite delay between
requests (LEAFLETS_REQUEST_DELAY, default 1s). Hard timeout on every call
(LEAFLETS_HTTP_TIMEOUT, default 30s).
Connection management (db/connection.py). connect() is a context
manager that commits on clean exit and rolls back on exception. The pipeline
opens a fresh transaction per logical unit of work (one per store, one per
catalog) so a partial failure doesn’t lose the previous successes.
Logging. Standard logging module with one stderr handler. Pipeline
progress at INFO; HTTP and SQL at DEBUG. urllib3 chatter clamped to WARNING.
Configuration. Env vars only, all prefixed LEAFLETS_* (plus
ANTHROPIC_API_KEY). .env.example is the contract; .env is gitignored.
Everything has sensible defaults — leaflets init-db works zero-config.
Deployment
The repo ships with .github/workflows/sync.yml, a daily cron at 04:30 UTC.
DB and images persist between runs as workflow artifacts; for production swap
those upload steps for S3 / GCS, or move to a runner with persistent storage.
For self-hosted deployments the rough sizing per source per day:
| Resource | Estimate |
|---|---|
| HTTP requests | A few hundred at 1 req/sec — minutes |
| Image downloads | ~100–500 JPGs (1–5 MB each) |
| Storage growth | ~1–2 GB per source per month |
| Extraction (Opus 4.7) per page | ~$0.02–0.05 depending on effort/thinking |
Quick start
git clone https://github.com/scripts-and-tables/py-leaflets.git
cd py-leaflets
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
cp .env.example .env
leaflets init-db
leaflets run-all --source wowdeals_ae
leaflets extract --extractor claude_vision --limit 10
Full reference: see the README on GitHub.
What’s deliberately out of scope (today)
- Async. Sync code top to bottom. Sufficient for daily-cron throughput; revisit if you need 100+ parallel sources.
- Migration framework.
schema.sqlis idempotent but unversioned. Introduce Alembic when columns start being renamed. - Production storage. Currently file-based SQLite. Swap to managed Postgres + S3 for hosted offering.
- Analysis layer above
promotions. The table is populated; querying, dashboards, and search are the next layer up.
Want to dig in?
Architecture, data model, and source code are public. Pull the repo and run it locally.