feat(worker): worker pool + DB-backed task queue + retry (bookshelf-w5y.1) #22
Loading…
Add table
Add a link
Reference in a new issue
No description provided.
Delete branch "bd-bookshelf-w5y.1"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
Summary
internal/workerpackage: single-processPoolwithRegisterHandler,Start,Shutdown, panic recovery (terminal, no retry), wall-clock budget (1h fromcreated_at), in-process attempt counter (MaxAttempts=3), exponential backoff (0s/30s/5m), andUpdateProgresspassed to handlers as a func arg.tasks.sql) and hand-written helpers (tasks_extra.go):ClaimNextTask(dynamic IN clause + CAS for future cluster-safety),CASClaimTask,EnqueueTask,UpdateTaskProgress,MarkTaskCompleted,MarkTaskFailed,RequeueForRetry.tasks_claimed_total{type},tasks_processed_total{type,outcome},task_duration_seconds{type},tasks_in_flight.HA gap: The attempt counter is in-process only. A mid-retry crash resets the counter. The wall-clock budget (1h) acts as the compensating control — see
internal/worker/doc.go.Test plan
make testpasses locally (race detector enabled)make coveragepasses at 100% gatemake lintpasses (go vet + golangci-lint, 0 issues)Shutdowndrain verified (returns when context deadline hits even with in-flight handler)Closes bead bookshelf-w5y.1 on merge.
🤖 Generated with Claude Code
Security Review
Threat checks
Findings
1. SQL injection — CLEAN
tasks_extra.gobuilds the dynamicIN (?, ?, ...)clause by generating one"?"placeholder per element and appending eachtypes[i]value to the args slice. No user-controlled string is ever interpolated into the SQL text. All other queries are sqlc-generated with?placeholders throughout. Clean.2. task_options deserialization — CLEAN
task_optionsis stored astextand surfaced to handlers as a rawstring(Task.TaskOptions). There is no JSON deserialization at the framework layer. Handlers own parsing. No deserialization exploit surface in this PR.3. Status state machine — CLEAN
Every terminal transition (
MarkTaskCompleted,MarkTaskFailed) setscompleted_at = NOW()atomically in the same SQL UPDATE. The "budget expired before dispatch" path goes throughmarkFailedwhich also setscompleted_at. No path reaches acompleted/failedstatus with a NULLcompleted_at.4. Logging hygiene — CLEAN
task_optionsis never written to any log field inpool.go. Log calls only emittask_id,task_type,attempt,error, andprogress_pct. The fullTaskstruct (which carriesTaskOptions) is never passed to a logger. Clean per logging-standard.5. Race/duplicate execution — CLEAN
FOR UPDATE SKIP LOCKEDacquires an advisory row lock, preventing two workers from seeing the same row in the same transaction. After the lock is released,CASClaimTaskdoesUPDATE ... WHERE id = ? AND status = 'queued'— only the first worker to execute this wins; the second getsRowsAffected = 0and aborts without dispatching. Two-phase claim is correct.6. Wall-clock budget — NOTE (not a must-fix, but needs a doc line)
The budget is enforced via
context.WithDeadline. A handler that uses ctx-aware calls will be cancelled correctly. A handler that blocks on ctx-unaware I/O (e.g., a puretime.Sleep, a blocking syscall without ctx) will silently exceed the budget. The HA gap around the attempt counter is documented indoc.go, but this ctx-awareness requirement is not. Recommend adding one sentence todoc.goor theHandlertype godoc: "Handlers must respect ctx cancellation; blocking I/O that ignores ctx can exceed the wall-clock budget."7. Panic recovery / log injection — LOW RISK NOTE
When a handler panics, the pool logs
fmt.Sprintf("%v", r)as the"panic"slog field andstring(debug.Stack())as"stack". Inslog.NewJSONHandlermode both values are JSON-escaped and safe for log parsing. Inslog.NewTextHandlermode (used in tests withio.Discard) the values are unescaped but still structured key-value pairs, so an adversary would need DB write access and a crafted panic value containing slog text-format control sequences (e.g., embeddedkey=valuepairs with newlines). This is a theoretical concern for an internal worker with controlled task types; low risk. No action required unless the app switches to text-format logging in production.8. Prometheus label cardinality — NOTE
claimedTotal.WithLabelValues(row.Type)anddurationSeconds.WithLabelValues(row.Type)use the tasktypefield read from the DB row, not the set of registered handler keys. An operator or test helper with DB write access couldINSERT INTO tasks (type='arbitrary-string')and each unique type value would create a new Prometheus time series. Currently task types are application-controlled (enqueued by the app itself), so the cardinality is bounded in practice. However, if task enqueueing is ever exposed to external input (API, webhook), this becomes a cardinality-bomb risk. Recommend a guard: before recording the metric, check_, ok := p.handlers[row.Type]and fall back to"unknown"for unregistered types (which already get fast-failed anyway).9. Makefile — CLEAN
The change from
-p 8to-p 1serialises test package execution to prevent migration-lock contention among integration suites sharing a MySQL container.-raceis retained. No security boundary change.10. app_suite_test.go — CLEAN
The change pins
TRUNCATEandSET FOREIGN_KEY_CHECKSstatements to a single*sql.Connto ensure the session variable is scoped correctly. No credentials are added or leaked; the DSN is sourced from the existingBOOKSHELF_DSNenv var or a testcontainer.Verdict
⚠️ Notes
Two actionable notes (neither is a blocker for merge):
doc.goor theHandlergodoc documenting that handlers must honour ctx cancellation for the wall-clock budget to be enforced.typelabel to registered handler types (use"unknown"for anything else) to guard against future cardinality explosion if task enqueueing becomes user-facing.Code Review
Phase 0 - DEMO Verification
No standalone DEMO block was provided in the bead comments or PR body. The bead description specifies acceptance criteria that map directly to the integration tests (success path, retry-to-fail, panic recovery against real MySQL). CI is green and covers all three paths via
internal/worker/integration_test.go. Accepting CI as the DEMO signal per review instructions; noting the absence of a separate DEMO comment block.Spec Compliance
handleFailureatpool.go:310created_at);context.WithDeadlineatpool.go:240; pre-dispatch expiry guard atpool.go:233defer recover()atpool.go:265-280; panics go terminal toMarkTaskFailed, no retrytasks_claimed_total,tasks_processed_total,task_duration_seconds,tasks_in_flightCross-Cutting Concerns
1. Makefile -p 1 change - JUSTIFIED, minor concern remains
2. truncateTables pinned-conn fix - CORRECT
app_suite_test.go:88-116acquires*sql.Connviapool.Conn(ctx)and runsSET FOREIGN_KEY_CHECKS=0, allTRUNCATE TABLEstatements, andSET FOREIGN_KEY_CHECKS=1on that same pinned connection. The original*sql.DBpath could route the session variable and the truncate onto different physical connections. Fix is correct.3. ClaimNextTask race-safety - CORRECT (reasoning inline)
FOR UPDATE SKIP LOCKEDpresent in generatedClaimNextTaskByType(tasks.sql.go:22) and hand-writtenClaimNextTask(tasks_extra.go:43).updated_at <= NOW()filter present in both paths; backoff enforced.FOR UPDATE SKIP LOCKEDonly holds the lock for the SELECT duration; by the timeCASClaimTaskruns the lock is released. Two workers can race, but the false-return path inrunOne(pool.go:210-213) drops the task without dispatching it. The CAS is the actual double-dispatch guard. This is correct at-least-once semantics.4. Retry backoff - CORRECT
RequeueForRetrysetsupdated_at = ?(future timestamp);tasks.sql:47.ClaimNextTaskfiltersAND updated_at <= NOW();tasks_extra.go:42.retryAt = now + backoff - 1satpool.go:330; the -1s fudge handles zero-backoff + clock-skew correctly.5. Wall-clock budget - CORRECT
pool.go:233-237.context.WithDeadline(ctx, row.CreatedAt.Add(TaskBudget))atpool.go:240.context.DeadlineExceededtriggers terminal fail:pool.go:298-303.6. Panic recovery - CORRECT
defer recover()inside anonymous closure atpool.go:265;handlerErrset,didPanic = true.didPanicbranch atpool.go:291-295:failTaskcalled (MarkTaskFailed + metric), attempts cleared. No retry.debug.Stack()atpool.go:272.7. Project Conventions
Config, bound once inNewPool.Fnsuffix on any dep field or exported name.var-at-top consistently applied inpool.goandtasks_extra.go.success pathDescribe does setup + pool start entirely inBeforeEachwith an emptyJustBeforeEachno-op (pool_test.go:232-235). Pragmatic adaptation for async goroutine-driven code; not blocking.8. Test depth - VERIFIED
pool_test.go:357+ integrationintegration_test.go:170Shutdown drainatpool_test.go:772pool_test.go:307+ integrationintegration_test.go:159pool_test.go:440pool_test.go:479pool_test.go:554pool_test.go:597pool_test.go:987-1099Findings
Must-fix: none.
Important: none.
Minor (do not block):
pool_test.go:178-Expect(true).To(BeTrue())is a liveness probe with no explanatory comment. A comment like// reached without deadlock or panicwould clarify intent.Makefile-p 1drops the-p 8parallelism from bookshelf-rna. A follow-up bead to share a single DB container across suites viaBOOKSHELF_DSNwould restore CI speed.Describeblocks inpool_test.gorebuild the fullworker.Configinline rather than usingnewPool(). Necessary because claim/CAS behaviour varies per test, but the repetition is noticeable.Verdict
Ready to merge - all spec requirements met, concurrency and race-safety are correct, test coverage is comprehensive, project conventions are followed. Minor nits only.
c1217295bcf9475bc969f9475bc9697913e7f157