Staged Pipeline Architecture

The staged pipeline is the heart of Aveloxis’s production collection system. It decouples API collection from relational persistence to eliminate database contention at scale.


Why staging?

The contention problem

When collecting data from GitHub/GitLab, every issue, PR, event, and message references a contributor. In a direct-write model (like Augur’s), each worker must:

  1. Look up the contributor in the database

  2. Create the contributor if it does not exist

  3. Use the contributor’s ID as a foreign key for the entity

With 400K+ repos and multiple workers collecting simultaneously, the contributors table becomes a massive contention point. Multiple workers try to insert the same contributor at the same time, leading to deadlocks, serialization failures, and degraded throughput.

The staging solution

The staged pipeline splits collection into two phases:

  1. Stage (concurrent, no contention) – Raw API responses are written as JSONB to a staging table. No FK lookups, no contributor resolution. Workers blast data concurrently with zero contention.

  2. Process (single-threaded per repo) – Staged data is drained in batches, contributors are resolved in bulk with an in-memory cache, and entities are upserted in FK dependency order.

This design means that even with 24 workers collecting 24 repos simultaneously, no two workers ever contend on the same database row.
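The two phases can be sketched with an in-memory stand-in for the staging table. This is only an illustration under stated assumptions: stagedRow, stagingTable, and stageConcurrently are hypothetical names, and the mutex protects the Go slice only; it does not model any database locking.

```go
package main

import (
	"fmt"
	"sync"
)

// stagedRow is a hypothetical stand-in for one staging row.
type stagedRow struct {
	RepoID     int64
	EntityType string
	Payload    string // raw JSON in the real pipeline
}

// stagingTable is an in-memory stand-in for the staging table.
type stagingTable struct {
	mu   sync.Mutex
	rows []stagedRow
}

// insert appends rows without resolving anything, mirroring phase 1.
func (t *stagingTable) insert(rows ...stagedRow) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.rows = append(t.rows, rows...)
}

// drain removes and returns every row staged for one repo, mirroring phase 2.
func (t *stagingTable) drain(repoID int64) []stagedRow {
	t.mu.Lock()
	defer t.mu.Unlock()
	var out, rest []stagedRow
	for _, r := range t.rows {
		if r.RepoID == repoID {
			out = append(out, r)
		} else {
			rest = append(rest, r)
		}
	}
	t.rows = rest
	return out
}

// stageConcurrently runs one goroutine per repo; each only appends raw rows.
func stageConcurrently(t *stagingTable, repoIDs []int64, rowsPerRepo int) {
	var wg sync.WaitGroup
	for _, id := range repoIDs {
		wg.Add(1)
		go func(repoID int64) {
			defer wg.Done()
			for i := 0; i < rowsPerRepo; i++ {
				t.insert(stagedRow{RepoID: repoID, EntityType: "issue", Payload: "{}"})
			}
		}(id)
	}
	wg.Wait()
}

func main() {
	table := &stagingTable{}
	repos := []int64{1, 2, 3}
	stageConcurrently(table, repos, 4)

	// Phase 2: each repo is drained sequentially by a single caller.
	for _, id := range repos {
		fmt.Printf("repo %d: processed %d staged rows\n", id, len(table.drain(id)))
	}
}
```

The point of the sketch is the shape of the work: phase 1 is append-only, and all resolution is deferred to the sequential drain of a single repo.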


Staging writer

Batch flushing

The staging writer collects API responses in memory and flushes them to the aveloxis_ops.staging table in batches.

  • Default batch size: 1000 rows (configurable via collection.batch_size)

  • Actual flush size: 500 rows per batch (stagingFlushSize); processing also drains staged rows in 500-row batches

  • Each row: One JSONB document containing a single entity or envelope

Callers MUST call Flush(ctx) before handing control to the processor

StagingWriter.Stage only sends rows to Postgres automatically when the in-memory pgx.Batch reaches stagingFlushSize = 500. For any workflow that stages fewer than 500 items (typically gap fill and open-item refresh), the buffered rows do not reach the database until Flush(ctx) is called explicitly. The processor reads aveloxis_ops.staging directly, so if the flush is skipped the processor sees an empty table, and the buffered rows are discarded when the StagingWriter goes out of scope.

This bug affected fillIssueGaps, fillPRGaps, refreshIssues, and refreshPRs before v0.16.11: each built its own StagingWriter, staged well under 500 items, then called Processor.ProcessRepo directly. Logs reported "gap fill completed filled=N" while zero rows reached the database. The fix is to always flush the writer before processing and to return or log any flush error. The main-path StagedCollector.CollectRepo was never affected because it flushes as part of its normal phase sequencing.
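The failure mode is easy to reproduce with a small in-memory model. The sketch below is not the real StagingWriter: bufferedWriter, its method signatures, and visibleRows are hypothetical, and a plain slice stands in for aveloxis_ops.staging. Only the 500-row threshold comes from the description above.

```go
package main

import (
	"context"
	"fmt"
)

// flushSize mirrors the stagingFlushSize = 500 threshold.
const flushSize = 500

// bufferedWriter is a hypothetical stand-in for StagingWriter.
// sink plays the role of the staging table that the processor reads.
type bufferedWriter struct {
	pending []string
	sink    *[]string
}

// Stage buffers one row and auto-flushes only when the buffer reaches flushSize.
func (w *bufferedWriter) Stage(ctx context.Context, row string) error {
	w.pending = append(w.pending, row)
	if len(w.pending) >= flushSize {
		return w.Flush(ctx)
	}
	return nil
}

// Flush moves every buffered row into the sink and empties the buffer.
func (w *bufferedWriter) Flush(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("flush staging buffer: %w", err)
	}
	*w.sink = append(*w.sink, w.pending...)
	w.pending = w.pending[:0]
	return nil
}

// visibleRows stages n rows and reports how many the processor could see
// before and after the explicit Flush.
func visibleRows(n int) (before, after int, err error) {
	ctx := context.Background()
	var table []string
	w := &bufferedWriter{sink: &table}
	for i := 0; i < n; i++ {
		if err := w.Stage(ctx, "{}"); err != nil {
			return 0, 0, err
		}
	}
	before = len(table)
	if err := w.Flush(ctx); err != nil {
		return before, before, err
	}
	return before, len(table), nil
}

func main() {
	before, after, err := visibleRows(12)
	if err != nil {
		fmt.Println("flush failed:", err)
		return
	}
	fmt.Printf("before Flush: %d rows, after Flush: %d rows\n", before, after)
}
```

With 12 staged rows, nothing is visible until the explicit Flush. Checking the error returned by Flush matters for the same reason: a silently failed flush looks exactly like the original bug.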

The staging table

CREATE TABLE IF NOT EXISTS aveloxis_ops.staging (
    staging_id  BIGSERIAL PRIMARY KEY,
    repo_id     BIGINT NOT NULL,
    entity_type TEXT NOT NULL,
    data        JSONB NOT NULL,
    created_at  TIMESTAMPTZ DEFAULT NOW()
);

The entity_type column determines how the data JSONB is interpreted during processing.
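A minimal sketch of that dispatch, assuming hypothetical entity_type values ("issue", "release") and illustrative JSON field names; the real processor's type names and payload shapes may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stagingRow mirrors the staging columns that matter for dispatch.
type stagingRow struct {
	RepoID     int64
	EntityType string
	Data       json.RawMessage
}

// describe decodes Data according to EntityType and returns a short summary.
// The entity_type strings and JSON fields used here are assumptions.
func describe(row stagingRow) (string, error) {
	switch row.EntityType {
	case "issue":
		var env struct {
			Issue struct {
				Number int `json:"number"`
			} `json:"issue"`
			Labels []json.RawMessage `json:"labels"`
		}
		if err := json.Unmarshal(row.Data, &env); err != nil {
			return "", fmt.Errorf("decode issue envelope: %w", err)
		}
		return fmt.Sprintf("issue #%d with %d labels", env.Issue.Number, len(env.Labels)), nil
	case "release":
		var rel struct {
			TagName string `json:"tag_name"`
		}
		if err := json.Unmarshal(row.Data, &rel); err != nil {
			return "", fmt.Errorf("decode release: %w", err)
		}
		return "release " + rel.TagName, nil
	default:
		return "", fmt.Errorf("unknown entity_type %q", row.EntityType)
	}
}

func main() {
	rows := []stagingRow{
		{RepoID: 1, EntityType: "issue", Data: []byte(`{"issue":{"number":7},"labels":[{},{}]}`)},
		{RepoID: 1, EntityType: "release", Data: []byte(`{"tag_name":"v1.0.0"}`)},
	}
	for _, row := range rows {
		summary, err := describe(row)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(summary)
	}
}
```

Keeping the payload as json.RawMessage until the type is known means one table can hold every entity shape without schema changes.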


Envelope types

Issues and PRs are staged as envelope types that bundle the parent entity with all its children in a single JSONB row. This is a key design decision that avoids multiple passes over staged data.

stagedIssue

A staged issue envelope contains:

{
  "issue": { ... },
  "labels": [ ... ],
  "assignees": [ ... ]
}

When processed:

  1. The issue is upserted, returning its issue_id

  2. Labels are upserted with that issue_id

  3. Assignees are upserted with that issue_id
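The three steps above can be sketched as follows. The envelope keys (issue, labels, assignees) come from the layout shown; the fields inside each entity, the store type, and processIssueEnvelope are hypothetical, with maps standing in for the relational tables.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Field names inside each entity are illustrative assumptions.
type issue struct {
	Number int    `json:"number"`
	Title  string `json:"title"`
}

type label struct {
	Name string `json:"name"`
}

type assignee struct {
	Login string `json:"login"`
}

// stagedIssue matches the three top-level keys of the envelope.
type stagedIssue struct {
	Issue     issue      `json:"issue"`
	Labels    []label    `json:"labels"`
	Assignees []assignee `json:"assignees"`
}

// store is an in-memory stand-in for the issue, label, and assignee tables.
type store struct {
	nextID    int64
	issueIDs  map[int]int64      // issue number -> issue_id
	labels    map[int64][]string // issue_id -> label names
	assignees map[int64][]string // issue_id -> assignee logins
}

func newStore() *store {
	return &store{
		issueIDs:  map[int]int64{},
		labels:    map[int64][]string{},
		assignees: map[int64][]string{},
	}
}

// upsertIssue returns the existing issue_id for this number or allocates one.
func (s *store) upsertIssue(i issue) int64 {
	if id, ok := s.issueIDs[i.Number]; ok {
		return id
	}
	s.nextID++
	s.issueIDs[i.Number] = s.nextID
	return s.nextID
}

// processIssueEnvelope writes the parent first, then attaches every child to
// the returned issue_id without any further lookup.
func processIssueEnvelope(s *store, raw []byte) (int64, error) {
	var env stagedIssue
	if err := json.Unmarshal(raw, &env); err != nil {
		return 0, fmt.Errorf("decode issue envelope: %w", err)
	}
	issueID := s.upsertIssue(env.Issue)

	names := make([]string, 0, len(env.Labels))
	for _, l := range env.Labels {
		names = append(names, l.Name)
	}
	s.labels[issueID] = names

	logins := make([]string, 0, len(env.Assignees))
	for _, a := range env.Assignees {
		logins = append(logins, a.Login)
	}
	s.assignees[issueID] = logins
	return issueID, nil
}

func main() {
	raw := []byte(`{"issue":{"number":7,"title":"Crash on start"},
		"labels":[{"name":"bug"},{"name":"p1"}],
		"assignees":[{"login":"octocat"}]}`)
	s := newStore()
	id, err := processIssueEnvelope(s, raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(id, s.labels[id], s.assignees[id])
}
```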

stagedPR

A staged PR envelope contains:

{
  "pull_request": { ... },
  "labels": [ ... ],
  "assignees": [ ... ],
  "reviewers": [ ... ],
  "reviews": [ ... ],
  "commits": [ ... ],
  "files": [ ... ],
  "head_meta": { ... },
  "base_meta": { ... }
}

When processed:

  1. The PR is upserted, returning its pull_request_id

  2. Labels, assignees, reviewers, reviews, commits, and files are upserted with that ID

  3. Head and base metadata are upserted with that ID

Why envelopes?

Without envelopes, processing would require:

  1. Process all PRs, getting their database IDs

  2. For each PR label, look up the PR’s database ID

  3. Repeat for every child entity type

Envelopes avoid this lookup step entirely – the parent and all children are processed together, and the parent’s database ID flows directly to the children.


Processing order

Staged data is processed in strict dependency order to satisfy foreign key constraints:

1. Contributors
   └── Must exist before any entity that references cntrb_id

2. Issues (with labels and assignees via envelope)
   └── References repo_id and cntrb_id (reporter, closer)

3. Pull Requests (with all children via envelope)
   └── References repo_id and cntrb_id (author)

4. Events (issue events + PR events)
   └── References issue_id or pull_request_id, plus cntrb_id

5. Messages (issue comments, PR comments, review comments)
   └── References issue_id or pull_request_id, plus cntrb_id

6. Metadata (repo info, releases, clone stats)
   └── References repo_id only

Each entity type is drained completely before the next type begins. Within each type, data is processed in 500-row batches.
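As a rough illustration of "drain each type fully, in 500-row batches", the sketch below walks a fixed order and splits each group into batches. The group names and the helpers batches and drainInOrder are hypothetical; only the ordering and the batch size come from the description above.

```go
package main

import "fmt"

// processingOrder lists the entity groups in FK dependency order.
var processingOrder = []string{
	"contributors", "issues", "pull_requests", "events", "messages", "metadata",
}

// batchSize is the number of staged rows handled per batch.
const batchSize = 500

// batches splits n rows into [start, end) index ranges of at most size rows.
func batches(n, size int) [][2]int {
	var out [][2]int
	for start := 0; start < n; start += size {
		end := start + size
		if end > n {
			end = n
		}
		out = append(out, [2]int{start, end})
	}
	return out
}

// drainInOrder returns the sequence of "group:rows" steps that would run for
// the given staged row counts. A group is finished before the next one starts.
func drainInOrder(staged map[string]int) []string {
	var steps []string
	for _, group := range processingOrder {
		for _, b := range batches(staged[group], batchSize) {
			steps = append(steps, fmt.Sprintf("%s:%d", group, b[1]-b[0]))
		}
	}
	return steps
}

func main() {
	staged := map[string]int{"messages": 700, "issues": 1200, "contributors": 30}
	for _, step := range drainInOrder(staged) {
		fmt.Println(step)
	}
}
```

Iterating over the ordered slice rather than the map is what guarantees the dependency order, since Go map iteration order is unspecified.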


Error isolation

The staged pipeline uses per-row error isolation:

  • If a single issue fails to upsert (e.g., due to a data constraint violation), a warning is logged but processing continues with the next issue.

  • If a single child entity within an envelope fails (e.g., one PR label out of 20), the other children are still processed.

  • Only catastrophic errors (database connection lost, disk full) abort the entire processing run.

This design means that one malformed record from the API does not prevent thousands of good records from being stored.
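One way to express this policy in Go is to separate row-level failures from fatal ones with a sentinel error. The sketch below is an assumption about how such a loop could look, not the project's code: errFatal, processRows, and demoUpsert are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errFatal marks failures that should abort the whole run (hypothetical sentinel).
var errFatal = errors.New("fatal storage error")

// processRows applies upsert to every row. Row-level failures are logged and
// counted; an error wrapping errFatal stops the run immediately.
func processRows(rows []string, upsert func(string) error) (stored, skipped int, err error) {
	for _, row := range rows {
		upsertErr := upsert(row)
		switch {
		case upsertErr == nil:
			stored++
		case errors.Is(upsertErr, errFatal):
			return stored, skipped, fmt.Errorf("aborting run: %w", upsertErr)
		default:
			log.Printf("warning: skipping row %q: %v", row, upsertErr)
			skipped++
		}
	}
	return stored, skipped, nil
}

// demoUpsert simulates a store: "bad" violates a constraint, "down" loses
// the connection, and everything else succeeds.
func demoUpsert(row string) error {
	switch row {
	case "bad":
		return errors.New("constraint violation")
	case "down":
		return fmt.Errorf("connection lost: %w", errFatal)
	default:
		return nil
	}
}

func main() {
	stored, skipped, err := processRows([]string{"a", "bad", "b"}, demoUpsert)
	fmt.Println(stored, skipped, err)

	stored, skipped, err = processRows([]string{"a", "down", "b"}, demoUpsert)
	fmt.Println(stored, skipped, err)
}
```

The same pattern applies one level down for children inside an envelope: log and skip the failing child, keep going with its siblings.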


Restart and resume

The staged pipeline is designed for safe restart at any point:

On shutdown

  • Active API calls finish (graceful shutdown)

  • Queue locks are released (repos go back to queued status)

  • Any data already in the staging table is preserved

On startup

  • Leftover staging data from the previous run is processed first

  • This means data already fetched from the API is not lost

  • After staging is drained, normal queue polling resumes

Idempotent processing

All upserts use ON CONFLICT clauses, so processing the same staged data twice is harmless. Duplicate entities are silently merged.
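The effect of ON CONFLICT can be modelled with a map keyed by the unique constraint. This is only a model under assumptions: issueRow, issueKey, and the (repo, number) key are hypothetical and do not describe the actual schema or conflict targets.

```go
package main

import "fmt"

// issueRow is a hypothetical, simplified issue record.
type issueRow struct {
	RepoID int64
	Number int
	Title  string
}

// issueKey plays the role of the unique constraint named in ON CONFLICT.
type issueKey struct {
	RepoID int64
	Number int
}

// issueTable is an in-memory stand-in for a table with a unique key.
type issueTable map[issueKey]issueRow

// upsert inserts the row, or overwrites the existing row with the same key,
// in the spirit of INSERT ... ON CONFLICT (...) DO UPDATE.
func (t issueTable) upsert(r issueRow) {
	t[issueKey{RepoID: r.RepoID, Number: r.Number}] = r
}

// replay applies the same staged rows the given number of times.
func replay(t issueTable, staged []issueRow, times int) {
	for i := 0; i < times; i++ {
		for _, r := range staged {
			t.upsert(r)
		}
	}
}

func main() {
	staged := []issueRow{
		{RepoID: 1, Number: 1, Title: "First"},
		{RepoID: 1, Number: 2, Title: "Second"},
	}
	table := issueTable{}
	replay(table, staged, 1)
	fmt.Println("rows after one pass:", len(table))
	replay(table, staged, 2)
	fmt.Println("rows after replaying twice more:", len(table))
}
```

Because the key decides identity, replaying leftover staging data after a restart converges to the same table contents.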


Performance characteristics

Staging phase (Phase 1)

  • Bottleneck: API rate limits

  • Database load: Minimal (bulk JSONB inserts, no FK lookups)

  • Concurrency: Full – all workers can stage simultaneously

  • Memory: Low – data is flushed in batches

Processing phase (Phase 2)

  • Bottleneck: Contributor resolution cache misses

  • Database load: Moderate (upserts with FK lookups)

  • Concurrency: Single-threaded per repo (no cross-repo contention)

  • Memory: Moderate – contributor cache grows with distinct contributors

Contributor cache

The in-memory contributor cache maps:

  • Platform user ID -> cntrb_id UUID

  • Email -> cntrb_id UUID

  • Login -> cntrb_id UUID

On a cache hit, no database round-trip is needed. The cache is write-through: new contributors are inserted into both the cache and the database. The cache persists across repos within the same process lifetime.
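A minimal sketch of such a cache, assuming a lookup order of platform ID, then email, then login (the order is an assumption). The identity and contributorCache types are hypothetical, cntrb_id values are placeholder strings rather than real UUIDs, and the database insert is simulated by a counter.

```go
package main

import "fmt"

// identity holds the three keys a contributor can be found by.
type identity struct {
	PlatformID int64
	Email      string
	Login      string
}

// contributorCache maps each key type to a cntrb_id.
type contributorCache struct {
	byPlatformID map[int64]string
	byEmail      map[string]string
	byLogin      map[string]string
	dbWrites     int // counts simulated database inserts
}

func newContributorCache() *contributorCache {
	return &contributorCache{
		byPlatformID: map[int64]string{},
		byEmail:      map[string]string{},
		byLogin:      map[string]string{},
	}
}

// lookup tries each non-empty key in turn and reports whether any matched.
func (c *contributorCache) lookup(id identity) (string, bool) {
	if id.PlatformID != 0 {
		if v, ok := c.byPlatformID[id.PlatformID]; ok {
			return v, true
		}
	}
	if id.Email != "" {
		if v, ok := c.byEmail[id.Email]; ok {
			return v, true
		}
	}
	if id.Login != "" {
		if v, ok := c.byLogin[id.Login]; ok {
			return v, true
		}
	}
	return "", false
}

// remember indexes the cntrb_id under every non-empty key.
func (c *contributorCache) remember(id identity, cntrbID string) {
	if id.PlatformID != 0 {
		c.byPlatformID[id.PlatformID] = cntrbID
	}
	if id.Email != "" {
		c.byEmail[id.Email] = cntrbID
	}
	if id.Login != "" {
		c.byLogin[id.Login] = cntrbID
	}
}

// resolve returns the cached cntrb_id, or on a miss performs the simulated
// insert and stores the result in the cache (write-through).
func (c *contributorCache) resolve(id identity) string {
	if v, ok := c.lookup(id); ok {
		c.remember(id, v) // index any keys seen for the first time
		return v
	}
	c.dbWrites++
	cntrbID := fmt.Sprintf("cntrb-%04d", c.dbWrites) // placeholder, not a UUID
	c.remember(id, cntrbID)
	return cntrbID
}

func main() {
	cache := newContributorCache()
	first := cache.resolve(identity{PlatformID: 42, Login: "octocat"})
	second := cache.resolve(identity{Login: "octocat", Email: "octo@example.com"})
	fmt.Println(first == second, cache.dbWrites)
}
```

Because one cache instance is shared for the process lifetime, a contributor resolved while processing one repo is a hit for every later repo.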


Comparison with the direct pipeline

| Aspect                 | Staged Pipeline (serve)           | Direct Pipeline (collect)        |
|------------------------|-----------------------------------|----------------------------------|
| Write pattern          | JSONB staging -> batch processing | Direct upserts during collection |
| Contributor resolution | Bulk, with in-memory cache        | Inline, per-entity               |
| Concurrency            | Multiple workers, no contention   | Single-threaded                  |
| Restart safety         | Staging data preserved            | Must re-collect from API         |
| Best for               | Production (400K+ repos)          | Ad-hoc testing (1-10 repos)      |


Next steps