Adding a collection phase

The aveloxis collection pipeline is a sequence of phases, each owning a piece of the per-repo data graph. This chapter explains the phase model and walks through adding a new one.

The pipeline shape

Per repo, the scheduler runs phases roughly in this order:

1. Prelim         — redirect/dead repo detection (collector/prelim.go)
2. Staged         — API data → JSONB staging → relational tables (collector/staged.go)
                     Sub-phases inside staged:
                       0. Repo metadata (repo_info)
                       1. Contributors
                       2. Issues (with labels + assignees)
                       3. PRs (with labels, assignees, reviewers, reviews, commits, files, meta)
                       4. Events (issue + PR timelines)
                       5. Messages (comments)
                       6. Releases
3. Facade         — bare git clone + git log walk (collector/facade.go)
4. Analysis       — dependency scan, libyear, scc complexity (collector/analysis.go)
4b. Scorecard     — OpenSSF Scorecard (collector/scorecard.go)
5. Commit Resolution — resolve git authors to platform users (collector/commit_resolver.go)
6. SBOM           — CycloneDX + SPDX generation (collector/sbom.go)
7. Vulnerability  — OSV.dev batch API (collector/vulnerability.go)

The scancode phase (license/copyright scanning) runs in a separate dedicated worker pool, NOT inline in this pipeline — see docs/architecture/scancode.md for why.
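One way to make "runs after some phases, before others" concrete is to have each phase declare which phases must already have run, then check the order. The sketch below is illustrative only. Phase, validateOrder, and defaultOrder are hypothetical names, not aveloxis code. Only dependencies the text states or directly implies are declared: analysis and commit resolution both consume the facade phase's git data.

```go
package main

import "fmt"

// Phase is a hypothetical description of one pipeline step.
type Phase struct {
	Name     string
	Requires []string // phases that must have run earlier in the same cycle
}

// validateOrder returns an error if any phase appears before a phase it requires.
func validateOrder(phases []Phase) error {
	seen := make(map[string]bool, len(phases))
	for _, p := range phases {
		for _, dep := range p.Requires {
			if !seen[dep] {
				return fmt.Errorf("phase %q requires %q to run first", p.Name, dep)
			}
		}
		seen[p.Name] = true
	}
	return nil
}

// defaultOrder mirrors the numbered list; only text-implied edges are declared.
var defaultOrder = []Phase{
	{Name: "prelim"},
	{Name: "staged"},
	{Name: "facade"},
	{Name: "analysis", Requires: []string{"facade"}},
	{Name: "scorecard"},
	{Name: "commit_resolution", Requires: []string{"facade"}},
	{Name: "sbom"},
	{Name: "vulnerability"},
}
```

Under this model, adding a phase means inserting it into the slice after everything listed in its Requires.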

When to add a phase vs extend an existing one

Add a phase when:

  • The work needs to happen at a specific point in the lifecycle (after some phases, before others).

  • It has its own error handling, its own retry / backoff, and its own “skip cleanly” semantics.

  • The output goes into its own set of tables (or its own columns) that don’t overlap with existing phases.

Extend an existing phase when:

  • You’re adding a column to data the existing phase already collects (a new field on repo_info, a new attribute on issues).

  • You’re tweaking the SQL or the API call shape.

  • The work happens in the same logical step.

Example of correct phase addition: the v0.21.0 scancode worker pool. It’s separate from analysis because it has its own cadence (180-day default vs every-cycle), its own concurrency (decoupled pool vs inline), and its own recovery semantics (PID-based orphan recovery).

Example of correct extension: v0.23.0’s repo-metadata backfill (description, primary_language, languages). It augments FetchRepoInfo rather than creating a new phase, because the data lives in repo_info (already populated by sub-phase 0 of the staged pipeline).

Patterns by phase type

“Per-repo, inline, runs every cycle”

Goes inside the staged pipeline. Add a method to StagedCollector that takes the staging writer, owner, repo, since timestamp, and the result-tracking struct. Call it from collectSequential or collectParallel at the right phase position.

Template:

// internal/collector/staged.go — add as a new method
func (sc *StagedCollector) collectThings(ctx context.Context, sw *db.StagingWriter,
        owner, repo string, since time.Time, result *CollectResult) {

    for thing, err := range sc.client.ListThings(ctx, owner, repo, since) {
        if err != nil {
            if isOptionalEndpointSkip(err) {
                sc.logger.Info("things endpoint unavailable, skipping",
                    "owner", owner, "repo", repo, "error", err)
                return  // not fatal — platform doesn't support this
            }
            sc.logger.Warn("list things failed",
                "owner", owner, "repo", repo, "error", err)
            result.Errors = append(result.Errors, err)
            return
        }
        if err := sw.Stage(ctx, db.EntityThing, db.StagedRow{
            RepoID: result.RepoID,
            Data:   mustMarshal(thing),
        }); err != nil {
            sc.logger.Warn("stage thing failed",
                "owner", owner, "repo", repo, "error", err)
            result.Errors = append(result.Errors, err)
            return
        }
        result.ThingsCount++
    }
}

Call from collectSequential:

sc.collectThings(ctx, sw, owner, repo, since, result)

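The staging writer batches inserts, so a phase method like collectThings never flushes on its own. The caller that owns sw must call Flush before the downstream Processor.ProcessRepo performs the relational writes (see “The staging writer contract” below).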

If the phase produces a NEW entity type, you’ll need to:

  1. Add a db.EntityThing constant.

  2. Add a corresponding row processor in internal/db/processor.go that reads staging rows of that type and INSERTs into the relational table.

  3. Add the relational table to schema.sql (see schema-migrations.md).

“Per-repo, runs after facade (needs git clone)”

Goes in internal/collector/analysis.go or alongside it as a sibling file. Pattern:

// internal/collector/my_analysis.go
type MyAnalyzer struct {
    store  *db.PostgresStore
    logger *slog.Logger
}

func NewMyAnalyzer(store *db.PostgresStore, logger *slog.Logger) *MyAnalyzer {
    return &MyAnalyzer{store: store, logger: logger}
}

func (a *MyAnalyzer) AnalyzeRepo(ctx context.Context, repoID int64, repo *model.Repo, cloneDir string) error {
    // cloneDir is the path to the bare clone the facade phase produced.
    // For full-clone analysis, you'll need to do your own `git clone` of cloneDir
    // into a temp directory.

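    // Do the analysis. runMyAnalysis is a hypothetical placeholder for your logic.
    result, err := runMyAnalysis(ctx, cloneDir)
    if err != nil {
        return fmt.Errorf("my analysis for repo %d: %w", repoID, err)
    }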

    // Write to the database.
    return a.store.SaveMyAnalysis(ctx, repoID, result)
}

Wire it into scheduler.go’s runFacadeAndAnalysis, after the existing analysis call.

“Background, runs on a periodic ticker, fleet-wide”

Like the v0.18.29 runEnrichment, the v0.19.2 runSearchResolve, or the v0.22.4 long-jobs watchdog. These don’t have a per-repo phase position; they run on their own schedule and process whatever rows match a query.

Pattern:

// internal/scheduler/scheduler.go — add the field + ticker

type Scheduler struct {
    // ...
    myThingInterval  time.Duration
    myThingBatchSize int
}

// In Run(), before the select loop:
myThingTicker := time.NewTicker(s.myThingInterval)
defer myThingTicker.Stop()

// ...then inside the select loop:
case <-myThingTicker.C:
    s.runMyThing(ctx)

// internal/scheduler/my_thing.go: the actual function
func (s *Scheduler) runMyThing(ctx context.Context) {
    candidates, err := s.store.GetThingsNeedingMyThing(ctx, s.myThingBatchSize)
    if err != nil {
        s.logger.Warn("get my-thing candidates failed", "error", err)
        return
    }
    for _, c := range candidates {
        err := s.doMyThing(ctx, c)

        // Stamp last_attempted_at on EVERY attempt, success or failure, so the
        // cooldown gate works (the v0.18.29 / v0.19.2 / v0.20.17 pattern; see CLAUDE.md).
        if stampErr := s.store.MarkThingAttempted(ctx, c.ID); stampErr != nil {
            s.logger.Warn("stamp my-thing attempt failed", "id", c.ID, "error", stampErr)
        }

        if err != nil {
            s.logger.Warn("my-thing failed", "id", c.ID, "error", err)
            continue
        }
        if err := s.store.MarkThingDone(ctx, c.ID); err != nil {
            s.logger.Warn("mark my-thing done failed", "id", c.ID, "error", err)
        }
    }
}

The cooldown discipline (mandatory): if your background task has the failure pattern “I tried X, it failed, I’ll try again next cycle, it fails again, repeat forever,” add a last_attempted_at TIMESTAMPTZ column AND a query gate WHERE last_attempted_at IS NULL OR last_attempted_at < NOW() - cooldown. Stamp the column on EVERY attempt — success, failure, even network errors. v0.18.29, v0.19.2, and v0.20.17 are all this pattern. CLAUDE.md has the full rationale; don’t reinvent it.

Add config knobs:

// internal/config/config.go — CollectionConfig
MyThingIntervalMinutes int `json:"my_thing_interval_minutes"`
MyThingBatchSize       int `json:"my_thing_batch_size"`
MyThingCooldownDays    int `json:"my_thing_cooldown_days"`

Wire them through scheduler.Config and main.go. The existing TestConfigurationDocsCoverEveryJSONField tripwire will fail CI if you forget to document them in docs/getting-started/configuration.md.

“Decoupled worker pool, not in the main pipeline”

Like scancode (v0.21.0). The pattern:

  • Dedicated files: internal/collector/my_worker.go and friends.

  • Own claim/release lifecycle via aveloxis_data.repos columns (my_locked_at, my_locked_pid, etc.).

  • Crash recovery via boot_id + PID.

  • Cadence gate: a my_last_run TIMESTAMPTZ column queried with IS NULL OR < NOW() - cadence.

  • Configurable workers + interval + cadence + shutdown grace.

This is heavy infrastructure. Don’t reach for it unless you have evidence the work is the wrong shape for the inline pipeline — typically because it’s slow enough (multiple minutes per repo) that it would starve other phases. See docs/architecture/scancode.md for the full template.
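Two pieces of that template are sketched below under stated assumptions: an atomic claim query and the orphan check behind “crash recovery via boot_id + PID”. The column names my_locked_boot_id and repo_id, the lock type, and both functions are hypothetical. The boot ID matters because PIDs get reused, especially across a reboot, so a stale lock's PID alone could match an unrelated live process. On Linux the kernel exposes a per-boot UUID at /proc/sys/kernel/random/boot_id.

```go
package main

import (
	"os"
	"strings"
)

// claimSQL atomically claims one due, unlocked repo. All my_* columns and
// repo_id are hypothetical; $1 = PID, $2 = boot ID, $3 = cadence in days.
const claimSQL = `
UPDATE aveloxis_data.repos
SET my_locked_at = NOW(), my_locked_pid = $1, my_locked_boot_id = $2
WHERE repo_id = (
    SELECT repo_id FROM aveloxis_data.repos
    WHERE my_locked_at IS NULL
      AND (my_last_run IS NULL OR my_last_run < NOW() - make_interval(days => $3))
    ORDER BY my_last_run NULLS FIRST
    LIMIT 1
    FOR UPDATE SKIP LOCKED)
RETURNING repo_id`

// lock is what a worker wrote when it claimed a repo.
type lock struct {
	PID    int
	BootID string
}

// currentBootID reads the Linux per-boot UUID; it returns "" where unavailable.
func currentBootID() string {
	b, err := os.ReadFile("/proc/sys/kernel/random/boot_id")
	if err != nil {
		return ""
	}
	return strings.TrimSpace(string(b))
}

// isOrphaned reports whether a lock's owner cannot still be running: either it
// was taken during a different boot, or its PID is no longer alive.
func isOrphaned(l lock, bootID string, pidAlive func(int) bool) bool {
	if l.BootID != bootID {
		return true
	}
	return !pidAlive(l.PID)
}
```

FOR UPDATE SKIP LOCKED lets concurrent workers claim different repos without blocking on each other. In production, pidAlive might be implemented on Unix by sending signal 0 with syscall.Kill.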

The staging writer contract

If your phase produces relational data, route it through db.StagingWriter. Why:

  • Atomic per-repo replacement semantics (the processor truncates+inserts per repo).

  • Batched INSERT (staging buffers 500 rows, then flushes — much faster than per-row INSERT).

  • Idempotency: re-running the phase doesn’t produce duplicates.

Pattern:

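sw := db.NewStagingWriter(store, logger)

for thing, err := range client.ListThings(ctx, owner, repo, since) {
    // ... error handling as in the collectThings template
    if stageErr := sw.Stage(ctx, db.EntityThing, db.StagedRow{
        RepoID: repoID,
        Data:   mustMarshal(thing),
    }); stageErr != nil {
        // ... log and append to result.Errors
    }
}

// Flush explicitly BEFORE processing (see v0.16.11). Don't rely on
// `defer sw.Flush(ctx)`: a deferred call runs only when the enclosing function
// returns, which is after any ProcessRepo call made in that same function.
sw.Flush(ctx)

// processor.ProcessRepo(ctx, repoID) drains staging into relational tables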

The v0.16.11 bug: StagingWriter.Stage buffers internally and only auto-flushes at the 500-row threshold. Four call sites in v0.16.10 forgot to call Flush at the end, silently dropping whatever was still buffered below that threshold. The fix added explicit Flush calls, and the pattern test TestStagingWriterCallersFlushBeforeProcess now enforces them.

Always call Flush before processor.ProcessRepo. No exceptions.
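This toy example shows why a missing final Flush loses rows. The batchWriter here is hypothetical, not the db.StagingWriter API, but it has the same auto-flush-at-threshold behavior described above. With 1,200 staged rows and a 500-row threshold, only two full batches reach storage until Flush runs.

```go
package main

// batchWriter is a toy model of a buffering staging writer: Stage buffers rows
// and auto-flushes only when the buffer reaches threshold.
type batchWriter struct {
	threshold int
	buf       []string
	flushFn   func(rows []string) // stands in for one batched INSERT
}

// Stage buffers a row and flushes only when a full batch has accumulated.
func (w *batchWriter) Stage(row string) {
	w.buf = append(w.buf, row)
	if len(w.buf) >= w.threshold {
		w.Flush()
	}
}

// Flush writes whatever is buffered. Without a final call, the trailing
// partial batch is never written.
func (w *batchWriter) Flush() {
	if len(w.buf) == 0 {
		return
	}
	w.flushFn(w.buf)
	w.buf = nil
}
```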

Error handling discipline

Phases must distinguish:

  • Skippable: 404, 403, 410, 204, NoContent. The endpoint doesn’t apply to this repo (issues disabled, permissions, etc.). Use isOptionalEndpointSkip(err) and return early WITHOUT appending to result.Errors.

  • Transient: 502/503/504, retry exhaustion, transient GraphQL errors. These are already retried by the HTTPClient layer; if one still bubbles out, treat it as transient and append it to result.Errors so buildOutcome can decide whether to force a full recollect on the next cycle.

  • Fatal: schema errors, auth errors, programmer errors. Bubble up; the scheduler logs ERROR and the job fails.

The platform.ClassifyError(err) helper returns an ErrorClass that does this routing. Use it.

Testing phases

Unit tests use a mock platform.Client + an in-memory or scratch PostgresStore. The harness pattern:

func TestCollectThings(t *testing.T) {
    ctx := context.Background()
    logger := slog.New(slog.NewTextHandler(io.Discard, nil))
    store := newScratchStore(t) // hypothetical helper: in-memory or scratch PostgresStore
    mockClient := &mockPlatformClient{
        things: []model.Thing{{ID: 1, Name: "alpha"}, {ID: 2, Name: "beta"}},
    }
    sc := NewStagedCollectorWithClient(mockClient, store, logger)
    result := &CollectResult{RepoID: 42}
    sw := db.NewStagingWriter(store, logger)

    sc.collectThings(ctx, sw, "owner", "repo", time.Time{}, result)

    sw.Flush(ctx)
    if len(result.Errors) != 0 {
        t.Fatalf("unexpected errors: %v", result.Errors)
    }
    if result.ThingsCount != 2 {
        t.Errorf("expected 2 things, got %d", result.ThingsCount)
    }
}

For integration tests against a real PostgreSQL via AVELOXIS_TEST_DB, see testing.md.

Order-of-operations checklist

When adding a phase:

  1. Decide the phase type (per-repo inline, background ticker, decoupled worker).

  2. Decide the data model — new entity type, new columns on existing, new table?

  3. Write the platform.Client method if the data comes from an external API (or extend existing).

  4. Write the failing test.

  5. Implement the phase function following the relevant template above.

  6. Wire into the scheduler / staged collector at the right phase position.

  7. Add config knobs with the Interval / BatchSize / Cooldown pattern if it’s a background task.

  8. Test, document, bump the version, and add a CLAUDE.md entry.

A real example to study

Read internal/scheduler/repo_metadata_backfill.go (~140 lines) — the v0.23.0 background phase that backfills repo_description and primary_language on existing rows. It’s the cleanest minimal example of the periodic-ticker pattern: config knob, candidate query, per-row processing with continue-on-error, deterministic exit when the candidate set is exhausted.

Then read internal/collector/scancode_worker.go (~800 lines) — the v0.21.0 decoupled-worker pattern. Heavier, but it’s the canonical example of a phase that needs its own lifecycle.

Between them, you’ve seen both ends of the phase-design spectrum.