How the pipeline moves money data from Gmail to Monarch, with Telegram in the loop.
UPI Ingestor is a personal finance automation pipeline built as a single Next.js application deployed on Vercel. It has no background workers or queues — the entire flow runs synchronously inside a single function invocation, triggered either by a daily cron job or a manual fetch from the dashboard.
Row Level Security policies enforce auth.uid() = user_id. Browser-facing routes use the publishable/anon key with SSR sessions; the cron job and pipeline use SUPABASE_SECRET_KEY via lib/supabase/admin.ts (server-only, never exposed to the client).
Vercel cron fires GET /api/cron/poll at 3:30 UTC daily (vercel.json), authenticated with CRON_SECRET. The handler lists all rows in gmail_connections using the secret-key admin client, then runs the pipeline per user. The UI can also trigger POST /api/gmail/fetch-now manually (session auth). Both call processUserTransactions(userId) in src/lib/pipeline.ts.
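When a CRON_SECRET environment variable is set, Vercel attaches it to cron invocations as an `Authorization: Bearer <secret>` header. The check at the top of the cron handler could look roughly like the sketch below. The helper name `isAuthorizedCronRequest` is hypothetical and is not taken from the codebase.

```typescript
// Hypothetical helper: validates the Authorization header that Vercel sends
// with cron invocations when CRON_SECRET is configured.
function isAuthorizedCronRequest(
  authHeader: string | null,
  cronSecret: string | undefined,
): boolean {
  // Fail closed when the secret is not configured, so a missing env var
  // can never match a header such as "Bearer undefined".
  if (!cronSecret) return false;
  return authHeader === `Bearer ${cronSecret}`;
}
```

A route handler would typically return a 401 response when this check fails, before touching gmail_connections. A 03:30 UTC daily schedule corresponds to the cron expression `30 3 * * *`.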
email-sources/gmail.ts
Loads the user's encrypted Gmail OAuth refresh token from gmail_connections, decrypts it, and calls the Gmail API to list messages matching the GMAIL_FETCH_LABEL (default: UPI) received within the last GMAIL_FETCH_DAYS_BACK days.
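One way to express the label and time-window filter is a Gmail search query built from the two settings. The sketch below assumes the query uses the standard `label:` and `newer_than:` search operators; the real module may filter differently (for example with label IDs or `after:` dates), and `buildGmailQuery` is a hypothetical name.

```typescript
// Hypothetical helper: turns GMAIL_FETCH_LABEL and GMAIL_FETCH_DAYS_BACK
// into a Gmail search query string.
function buildGmailQuery(label: string, daysBack: number): string {
  if (!Number.isInteger(daysBack) || daysBack < 1) {
    throw new RangeError("daysBack must be a positive integer");
  }
  const trimmed = label.trim();
  if (trimmed.length === 0) {
    throw new RangeError("label must not be empty");
  }
  // Example result: "label:UPI newer_than:7d"
  return `label:${trimmed} newer_than:${daysBack}d`;
}
```

The resulting string would be passed as the `q` parameter of the Gmail API's message list call.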
parsers/hdfc.ts
Each email body is matched against the HDFC parser using regex. Extracts amount, merchantRaw, bankRefId, occurredAt, and a normalized merchant key. Parse errors are reported via Telegram if a bot is linked. The parser registry (parsers/index.ts) is additive — new bank parsers can be registered without changing the pipeline.
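The additive registry can be pictured as a list of parser functions that each return either a parsed transaction or null. The sketch below is an illustration only: the regex targets an invented sample sentence, not HDFC's real email template, and the field types and the `parseSample` name are assumptions.

```typescript
// Field names follow the list above; the types are assumptions.
interface ParsedTransaction {
  amount: number;
  merchantRaw: string;
  bankRefId: string;
  occurredAt: Date;
  merchantKey: string;
}

// A parser returns null when the email is not in its format.
type EmailParser = (body: string) => ParsedTransaction | null;

// Hypothetical pattern for an invented sample email, NOT the real HDFC format.
const SAMPLE_PATTERN =
  /Rs\.?\s*([\d,]+(?:\.\d{1,2})?) debited .*? to (.+?) on (\d{4}-\d{2}-\d{2})\. Ref: (\d+)/;

const parseSample: EmailParser = (body) => {
  const m = SAMPLE_PATTERN.exec(body);
  if (!m) return null;
  const [, amount, merchant, date, ref] = m;
  return {
    amount: Number(amount.replace(/,/g, "")),
    merchantRaw: merchant,
    bankRefId: ref,
    occurredAt: new Date(`${date}T00:00:00Z`),
    // Simplified normalization: lowercase, collapse non-alphanumerics to spaces.
    merchantKey: merchant.toLowerCase().replace(/[^a-z0-9]+/g, " ").trim(),
  };
};

// Registering a new bank means appending to this array; callers are unchanged.
const parsers: EmailParser[] = [parseSample];

function parseEmail(body: string): ParsedTransaction | null {
  for (const parse of parsers) {
    const tx = parse(body);
    if (tx) return tx; // first parser that recognizes the email wins
  }
  return null;
}
```

Because the pipeline only calls `parseEmail`, adding a second bank is a one-line change to the `parsers` array.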
Each ParsedTransaction is checked against transactions by (user_id, bank_ref_id). Duplicates are skipped. New transactions are inserted with status: pending.
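The deduplication step can be illustrated in memory. This sketch stands in for the database lookup; the real pipeline checks the transactions table, and the type and function names here are hypothetical.

```typescript
interface NewTransaction {
  userId: string;
  bankRefId: string;
  amount: number;
}

interface StoredTransaction extends NewTransaction {
  status: "pending" | "needs_review" | "published" | "failed";
}

// Composite key mirroring the (user_id, bank_ref_id) check.
function dedupKey(tx: NewTransaction): string {
  return JSON.stringify([tx.userId, tx.bankRefId]);
}

// Returns only the rows that would be inserted, each with status "pending".
function selectNewTransactions(
  existing: StoredTransaction[],
  incoming: NewTransaction[],
): StoredTransaction[] {
  const seen = new Set(existing.map(dedupKey));
  const inserted: StoredTransaction[] = [];
  for (const tx of incoming) {
    const key = dedupKey(tx);
    if (seen.has(key)) continue; // duplicate: skip
    seen.add(key); // also guards against repeats within the same batch
    inserted.push({ ...tx, status: "pending" });
  }
  return inserted;
}
```

In a database-backed version, a unique constraint on the same pair of columns would give the same guarantee even if two runs overlap; whether the schema defines one is not stated here.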
categorizer/engine.ts
Pass 1 — Rules: user-defined rules in the rules table. Each rule has a match_type (regex / contains / equals), a target field (merchant / sender / body), and a pattern. Evaluated in ascending priority order; first match wins.
Pass 2 — Merchant mappings: the normalized merchant key is looked up in merchant_mappings. Mappings are learned when a user responds to a Telegram prompt. No match → human-in-the-loop flow.
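The two passes above can be sketched as a pure function. The property names, the case-insensitive comparison, and the use of a Map in place of the merchant_mappings table are assumptions made for illustration.

```typescript
type MatchType = "regex" | "contains" | "equals";
type RuleField = "merchant" | "sender" | "body";

interface Rule {
  priority: number;
  matchType: MatchType;
  field: RuleField;
  pattern: string;
  category: string;
}

type CategorizableTx = Record<RuleField, string> & { merchantKey: string };

// Case-insensitive matching is an assumption, not confirmed by the source.
function ruleMatches(rule: Rule, value: string): boolean {
  switch (rule.matchType) {
    case "regex":
      return new RegExp(rule.pattern, "i").test(value);
    case "contains":
      return value.toLowerCase().includes(rule.pattern.toLowerCase());
    case "equals":
      return value.toLowerCase() === rule.pattern.toLowerCase();
  }
}

// Returns a category, or null when the transaction needs human review.
function categorize(
  tx: CategorizableTx,
  rules: Rule[],
  mappings: Map<string, string>,
): string | null {
  // Pass 1: rules in ascending priority order; first match wins.
  const ordered = [...rules].sort((a, b) => a.priority - b.priority);
  for (const rule of ordered) {
    if (ruleMatches(rule, tx[rule.field])) return rule.category;
  }
  // Pass 2: learned merchant mapping keyed by the normalized merchant.
  return mappings.get(tx.merchantKey) ?? null;
}
```

A null result corresponds to the "No match" branch, where the transaction is handed to the Telegram review flow.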
Unknown transactions are marked needs_review. A Telegram inline keyboard is sent with the user's three most recently used categories plus a "Type new category" option. A row is inserted into pending_reviews (7-day TTL).
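The prompt's keyboard can be sketched as a Telegram `reply_markup` object. The `inline_keyboard` shape with `text` and `callback_data` fields is the Bot API's; the one-button-per-row layout and the `new:<txId>` payload for the free-text option are assumptions.

```typescript
interface InlineKeyboardButton {
  text: string;
  callback_data: string;
}

interface InlineKeyboardMarkup {
  inline_keyboard: InlineKeyboardButton[][];
}

// Builds one button per recent category (at most three) plus a free-text option.
function buildReviewKeyboard(
  txId: string,
  recentCategories: string[],
): InlineKeyboardMarkup {
  const rows: InlineKeyboardButton[][] = recentCategories
    .slice(0, 3)
    .map((category) => [
      { text: category, callback_data: `cat:${txId}:${category}` },
    ]);
  // Hypothetical payload for the "Type new category" button.
  rows.push([{ text: "Type new category", callback_data: `new:${txId}` }]);
  return { inline_keyboard: rows };
}
```

Telegram limits `callback_data` to 64 bytes, so long transaction IDs combined with long category names may need a shorter encoding.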
When the user taps a button, POST /api/telegram/webhook receives a callback_query with callback_data: cat:<txId>:<category>. The handler saves a new merchant_mapping and triggers republish.
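Parsing the `cat:<txId>:<category>` payload might look like the sketch below. It assumes the transaction ID contains no colon (true for UUIDs) and splits on the first colon after the prefix, so category names that contain colons survive. `parseCategoryCallback` is a hypothetical name.

```typescript
interface CategoryCallback {
  txId: string;
  category: string;
}

// Returns null for payloads that are not well-formed category callbacks.
function parseCategoryCallback(data: string): CategoryCallback | null {
  const prefix = "cat:";
  if (!data.startsWith(prefix)) return null;
  const rest = data.slice(prefix.length);
  const sep = rest.indexOf(":");
  // Reject a missing separator, an empty txId, or an empty category.
  if (sep <= 0 || sep === rest.length - 1) return null;
  return { txId: rest.slice(0, sep), category: rest.slice(sep + 1) };
}
```

Returning null instead of throwing lets the webhook ignore malformed or unrelated callbacks without failing the request.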
publishers/monarch/
Decrypts the user's stored Monarch credential, calls their GraphQL createTransaction mutation, and updates the transaction row with status: published and the returned published_id. On failure, status: failed is set and the error is stored in raw_payload.publish_error for manual retry.
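The status transition after a publish attempt can be sketched as a pure function over the transaction row. The `PublishResult` type and the function name are hypothetical; the column names follow the description above.

```typescript
type PublishResult =
  | { ok: true; publishedId: string }
  | { ok: false; error: string };

interface TransactionRow {
  id: string;
  status: "pending" | "needs_review" | "published" | "failed";
  published_id: string | null;
  raw_payload: Record<string, unknown>;
}

// Returns the updated row without mutating the input.
function applyPublishResult(
  row: TransactionRow,
  result: PublishResult,
): TransactionRow {
  if (result.ok) {
    return { ...row, status: "published", published_id: result.publishedId };
  }
  // Keep the error alongside the original payload so a retry can inspect it.
  return {
    ...row,
    status: "failed",
    raw_payload: { ...row.raw_payload, publish_error: result.error },
  };
}
```

Keeping the original payload intact on failure means a later republish can start from the same data.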
| Module | Role | Key exports |
|---|---|---|
| lib/pipeline.ts | Pipeline orchestrator | processUserTransactions(userId) |
| lib/email-sources/gmail.ts | Gmail API client; fetches messages matching the UPI label | fetchGmailTransactions(userId) |
| lib/parsers/hdfc.ts | Regex parser for HDFC UPI notification emails | parseHdfc(body) |
| lib/parsers/index.ts | Parser registry; tries all parsers and returns the first match | parseEmail(body) |
| lib/categorizer/engine.ts | Two-pass categorization: rules → merchant mappings → unknown | categorizeTransaction(supabase, userId, tx) |
| lib/categorizer/normalize.ts | Merchant name normalization | normalizeMerchant(raw) |
| lib/merchant-mappings.ts | CRUD helpers for the learned merchant→category mapping table | saveMerchantMapping() |
| lib/publishers/monarch/ | Monarch GraphQL publisher; handles credential decryption and mutation | publisher.publish(userId, tx) |
| lib/telegram/client.ts | Telegram Bot API wrapper for sending inline keyboard messages | sendTelegramMessage(chatId, text, markup) |
| lib/crypto/encryption.ts | AES-256-GCM encrypt/decrypt for credentials stored in Supabase | encrypt(plain) · decrypt(enc) |
| lib/supabase/admin.ts | Secret-key Supabase client for cron and pipeline (bypasses RLS server-side) | createAdminClient() |
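
For reference, an AES-256-GCM encrypt/decrypt pair built on Node's `node:crypto` module might look like the sketch below. The storage layout (base64 of IV, auth tag, then ciphertext) and the explicit `key` parameter are assumptions; the module's actual `encrypt(plain)` and `decrypt(enc)` presumably read the key from configuration.

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from "node:crypto";

const IV_BYTES = 12; // recommended IV size for GCM
const TAG_BYTES = 16; // default GCM auth tag size in Node

// Assumed layout: base64( iv || authTag || ciphertext ). The key must be 32 bytes.
function encrypt(plain: string, key: Buffer): string {
  const iv = randomBytes(IV_BYTES); // fresh IV per message
  const cipher = createCipheriv("aes-256-gcm", key, iv);
  const ciphertext = Buffer.concat([
    cipher.update(plain, "utf8"),
    cipher.final(),
  ]);
  const tag = cipher.getAuthTag();
  return Buffer.concat([iv, tag, ciphertext]).toString("base64");
}

// Throws if the key is wrong or the data was modified.
function decrypt(encoded: string, key: Buffer): string {
  const raw = Buffer.from(encoded, "base64");
  const iv = raw.subarray(0, IV_BYTES);
  const tag = raw.subarray(IV_BYTES, IV_BYTES + TAG_BYTES);
  const ciphertext = raw.subarray(IV_BYTES + TAG_BYTES);
  const decipher = createDecipheriv("aes-256-gcm", key, iv);
  decipher.setAuthTag(tag);
  return Buffer.concat([
    decipher.update(ciphertext),
    decipher.final(),
  ]).toString("utf8");
}
```

GCM authenticates as well as encrypts, so a tampered refresh token or credential fails at `decipher.final()` rather than decrypting to garbage.
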
| Method + Path | Auth | Purpose |
|---|---|---|
| GET /api/cron/poll | CRON_SECRET + secret key | Vercel cron trigger; lists all gmail_connections, runs full pipeline per user |
| POST /api/gmail/fetch-now | session | Manual pipeline trigger from the dashboard |
| POST /api/telegram/webhook | webhook secret | Receives Telegram callback queries; saves merchant mapping and republishes |
| GET /api/transactions | session | List transactions with optional ?status= filter |
| GET /api/transactions/:id | session | Single transaction detail |
| POST /api/transactions/:id/republish | session | Retry a failed Monarch publish |
| GET/POST /api/rules | session | List / create categorization rules |
| DELETE /api/rules/:id | session | Delete a rule |
| GET /api/auth/google | none | Initiates Google OAuth flow |
| POST /api/connect/telegram | session | Generates a Telegram link code |
| POST /api/connect/monarch | session | Stores encrypted Monarch credential |
| GET /api/connect/monarch/categories | session | Fetches the user's Monarch category list |
All tables live in the public schema on Supabase Postgres with Row Level Security. Every policy enforces auth.uid() = user_id.