Background processing

Job queue

The pkg/queue package provides a durable, SQLite-backed job queue with in-process workers. Jobs survive process restarts because they're stored in the database.

import "github.com/stanza-go/framework/pkg/queue"

Creating a queue

q := queue.New(db,
    queue.WithWorkers(2),                      // concurrent workers (default: 1)
    queue.WithPollInterval(1 * time.Second),   // how often workers check for jobs (default: 1s)
    queue.WithMaxAttempts(3),                   // default retry count (default: 3)
    queue.WithRetryDelay(30 * time.Second),    // base delay between retries (default: 30s)
    queue.WithLogger(logger),
)

Registering handlers

Register handlers for each job type before starting the queue:

q.Register("send-email", func(ctx context.Context, payload []byte) error {
    var email struct {
        To      string `json:"to"`
        Subject string `json:"subject"`
        Body    string `json:"body"`
    }
    if err := json.Unmarshal(payload, &email); err != nil {
        return fmt.Errorf("decode send-email payload: %w", err)
    }
    return sendEmail(ctx, email.To, email.Subject, email.Body)
})

q.Register("generate-report", func(ctx context.Context, payload []byte) error {
    return generateReport(ctx, payload)
})

The context passed to a handler is cancelled when the queue is stopping, so long-running handlers should watch ctx.Done(), clean up, and return promptly.
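A handler that works through several steps can check the context between steps. The following is a minimal sketch of that pattern and is not part of pkg/queue; processItems, demoCancellation, and the work function are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// processItems is a hypothetical helper that a job handler could call.
// It checks ctx before each item so it stops promptly once the context
// is cancelled, and reports how many items were finished.
func processItems(ctx context.Context, items []string, work func(string) error) (int, error) {
	done := 0
	for _, item := range items {
		select {
		case <-ctx.Done():
			return done, ctx.Err()
		default:
		}
		if err := work(item); err != nil {
			return done, fmt.Errorf("process %q: %w", item, err)
		}
		done++
	}
	return done, nil
}

// demoCancellation cancels the context while the second item is being
// processed. It returns the number of completed items and whether the
// returned error was context.Canceled.
func demoCancellation() (int, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	count := 0
	work := func(string) error {
		count++
		if count == 2 {
			cancel() // simulate the queue stopping mid-job
		}
		return nil
	}

	done, err := processItems(ctx, []string{"a", "b", "c", "d"}, work)
	return done, errors.Is(err, context.Canceled)
}

func main() {
	done, cancelled := demoCancellation()
	fmt.Printf("processed=%d cancelled=%v\n", done, cancelled)
}
```

Returning ctx.Err() from the handler reports the interruption as an error, so the job is treated as failed rather than completed.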


Enqueuing jobs

payload, _ := json.Marshal(map[string]string{
    "to":      "user@example.com",
    "subject": "Welcome!",
    "body":    "Thanks for signing up.",
})

// Enqueue immediately
jobID, err := q.Enqueue(ctx, "send-email", payload)

// Enqueue with delay (jobID and err are reused from the first call)
jobID, err = q.Enqueue(ctx, "send-email", payload,
    queue.Delay(5 * time.Minute),
)

// Enqueue with custom max attempts
jobID, err = q.Enqueue(ctx, "send-email", payload,
    queue.MaxAttempts(5),
)

// Enqueue on a specific named queue
jobID, err = q.Enqueue(ctx, "generate-report", payload,
    queue.OnQueue("reports"),
)
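Because the producer builds the payload as JSON and the handler decodes it again, one option is to share a single struct between the two sides so the field names cannot drift apart. This is a sketch only; EmailPayload, encodeEmail, and decodeEmail are hypothetical names and not part of pkg/queue.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// EmailPayload is a hypothetical type shared by the code that enqueues
// "send-email" jobs and the handler that processes them.
type EmailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
	Body    string `json:"body"`
}

// encodeEmail produces the []byte payload to pass when enqueuing.
func encodeEmail(p EmailPayload) ([]byte, error) {
	return json.Marshal(p)
}

// decodeEmail is what a handler would call on its payload argument.
// It rejects payloads that cannot be parsed or that lack a recipient.
func decodeEmail(data []byte) (EmailPayload, error) {
	var p EmailPayload
	if err := json.Unmarshal(data, &p); err != nil {
		return EmailPayload{}, fmt.Errorf("decode email payload: %w", err)
	}
	if p.To == "" {
		return EmailPayload{}, errors.New("decode email payload: missing recipient")
	}
	return p, nil
}

func main() {
	data, err := encodeEmail(EmailPayload{
		To:      "user@example.com",
		Subject: "Welcome!",
		Body:    "Thanks for signing up.",
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}

	p, err := decodeEmail(data)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println("recipient:", p.To)
}
```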

Job lifecycle

pending → running → completed
                  → failed (retries remaining → pending)
                  → failed (no retries → dead)
pending → cancelled

Status      Meaning
pending     Waiting to be picked up by a worker
running     Currently being processed
completed   Finished successfully
failed      Handler returned an error
dead        Exhausted all retry attempts
cancelled   Cancelled before execution

Failed jobs are retried automatically with linear backoff, using the retry delay (WithRetryDelay, 30s by default) as the base. Once a job has used all of its attempts, it moves to dead.
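The retry rule can be illustrated with a small sketch. It assumes that "linear backoff" means the delay grows as attempt number times the base delay; the exact formula used by pkg/queue is not stated here, so treat linearBackoff and nextStatus as hypothetical helpers rather than the package's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// linearBackoff returns attempt * base.
// ASSUMPTION: this is one common reading of "linear backoff"; the
// queue's real formula may differ.
func linearBackoff(base time.Duration, attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	return time.Duration(attempt) * base
}

// nextStatus models the lifecycle after a worker has run a job:
// success gives "completed", failure with attempts left returns the
// job to "pending", and failure on the last attempt gives "dead".
func nextStatus(handlerFailed bool, attempts, maxAttempts int) string {
	switch {
	case !handlerFailed:
		return "completed"
	case attempts < maxAttempts:
		return "pending"
	default:
		return "dead"
	}
}

func main() {
	base := 30 * time.Second
	maxAttempts := 3

	// Simulate a job whose handler fails on every attempt.
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		status := nextStatus(true, attempt, maxAttempts)
		if status == "pending" {
			fmt.Printf("attempt %d failed, retry in %s\n", attempt, linearBackoff(base, attempt))
		} else {
			fmt.Printf("attempt %d failed, job is %s\n", attempt, status)
		}
	}
}
```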


Lifecycle integration

func provideQueue(lc *lifecycle.Lifecycle, db *sqlite.DB, logger *log.Logger) *queue.Queue {
    q := queue.New(db, queue.WithWorkers(2), queue.WithLogger(logger))

    q.Register("send-email", handleSendEmail)
    q.Register("generate-report", handleReport)

    lc.Append(lifecycle.Hook{
        OnStart: q.Start,
        OnStop:  q.Stop,
    })

    return q
}

Start creates the jobs table if needed and launches worker goroutines. Stop signals workers to finish, waits for in-flight jobs, and respects the context deadline.
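The "wait for in-flight jobs, but respect the context deadline" behaviour is a common shutdown pattern. The sketch below shows one way to write it with a sync.WaitGroup; it is an illustration under that assumption, not the actual implementation of Stop, and waitWithDeadline and demoStop are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitWithDeadline blocks until every worker tracked by wg has
// finished or ctx is done, whichever happens first.
func waitWithDeadline(ctx context.Context, wg *sync.WaitGroup) error {
	finished := make(chan struct{})
	go func() {
		wg.Wait()
		close(finished)
	}()

	select {
	case <-finished:
		return nil
	case <-ctx.Done():
		// Workers may still be running; the caller decided not to wait longer.
		return ctx.Err()
	}
}

// demoStop simulates one in-flight job that takes jobDuration and a
// shutdown that waits at most timeout. It reports whether the job
// finished before the deadline.
func demoStop(jobDuration, timeout time.Duration) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(jobDuration) // stand-in for real work
	}()

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return waitWithDeadline(ctx, &wg) == nil
}

func main() {
	fmt.Println("fast job drained:", demoStop(10*time.Millisecond, time.Second))
	fmt.Println("slow job drained:", demoStop(time.Second, 10*time.Millisecond))
}
```

In practice this means the context given to the stop hook should allow enough time for the slowest handler to finish or to notice cancellation.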


Monitoring

Stats

stats, err := q.Stats()
fmt.Printf("pending=%d running=%d completed=%d failed=%d dead=%d\n",
    stats.Pending, stats.Running, stats.Completed, stats.Failed, stats.Dead)

List jobs

jobs, err := q.Jobs(queue.Filter{
    Status: queue.StatusFailed,
    Limit:  20,
    Offset: 0,
})

for _, j := range jobs {
    fmt.Printf("[%d] %s — %s (attempts: %d/%d)\n",
        j.ID, j.Type, j.Status, j.Attempts, j.MaxAttempts)
}

Filter by queue name, job type, or status. Default limit is 50.

Count jobs

JobCount returns the total number of jobs matching a filter, ignoring Limit and Offset. Useful for pagination totals:

total, err := q.JobCount(queue.Filter{
    Status: queue.StatusFailed,
})
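To show how a total feeds into pagination, the sketch below derives the Limit and Offset for each page from a total and a page size. The page type and paginate function are hypothetical helpers for illustration; only the Limit and Offset field names come from the Filter shown above.

```go
package main

import "fmt"

// page holds the Limit and Offset to use when requesting one page.
type page struct {
	Number int
	Limit  int
	Offset int
}

// paginate splits total items into pages of at most limit items.
// It returns nil when there is nothing to show or limit is invalid.
func paginate(total, limit int) []page {
	if total <= 0 || limit <= 0 {
		return nil
	}
	// Ceiling division gives the number of pages.
	pages := make([]page, 0, (total+limit-1)/limit)
	for offset := 0; offset < total; offset += limit {
		pages = append(pages, page{
			Number: len(pages) + 1,
			Limit:  limit,
			Offset: offset,
		})
	}
	return pages
}

func main() {
	// For example, 45 failed jobs shown 20 per page.
	for _, p := range paginate(45, 20) {
		fmt.Printf("page %d: Limit=%d Offset=%d\n", p.Number, p.Limit, p.Offset)
	}
}
```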

Get a single job

job, err := q.Job(42)
fmt.Printf("type=%s status=%s error=%s\n", job.Type, job.Status, job.LastError)

Management

Retry a failed job

err := q.Retry(42)
// Resets status to pending, increments max_attempts by 1

Works on both failed and dead jobs.

Cancel a pending job

err := q.Cancel(42)
// Only works on pending jobs

Purge old jobs

deleted, err := q.Purge(24 * time.Hour)
// Deletes completed and cancelled jobs older than 24 hours

Typically called from a cron job to keep the table clean.
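As a rough sketch of periodic cleanup, the loop below runs a task on a fixed interval until its context is cancelled. It uses time.Ticker from the standard library as a stand-in for a cron scheduler, and runEvery and demoPurgeLoop are hypothetical names; in an application the task would wrap the purge call, as noted in the comment.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery calls task once per interval until ctx is cancelled.
// Errors are passed to onError so one failed run does not stop the loop.
func runEvery(ctx context.Context, interval time.Duration, task func() error, onError func(error)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := task(); err != nil {
				onError(err)
			}
		}
	}
}

// demoPurgeLoop runs a counting task on a short interval and stops
// the loop after the third run. It returns how many times the task ran.
func demoPurgeLoop() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	runs := 0
	// In an application the task might look like:
	//   func() error { _, err := q.Purge(24 * time.Hour); return err }
	task := func() error {
		runs++
		if runs == 3 {
			cancel()
		}
		return nil
	}

	runEvery(ctx, 5*time.Millisecond, task, func(err error) {
		fmt.Println("purge failed:", err)
	})
	return runs
}

func main() {
	fmt.Println("purge runs:", demoPurgeLoop())
}
```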
