package queries

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"time"

	"github.com/lib/pq"
)

// EnqueuePriceRecalc enqueues product codes for delta FX publish.
//
// Each code is whitespace-trimmed; empty codes and codes repeated within
// productCodes are skipped. A blank reason is stored as "manual".
//
// It is safe to call repeatedly: a row that collides with a unique index on
// the queue (expected to be the partial unique index over pending/processing
// rows) is silently skipped by ON CONFLICT DO NOTHING. The conflict must be
// handled inside the statement rather than by catching SQLSTATE 23505,
// because PostgreSQL aborts the surrounding transaction on any statement
// error and would reject every following insert on tx.
//
// It returns the number of rows actually inserted. On error, the count of
// rows inserted before the failure is returned together with the error.
func EnqueuePriceRecalc(ctx context.Context, tx *sql.Tx, productCodes []string, reason string) (int, error) {
	if len(productCodes) == 0 {
		return 0, nil
	}

	reason = strings.TrimSpace(reason)
	if reason == "" {
		reason = "manual"
	}

	const insertSQL = " INSERT INTO mk_price_recalc_queue (" +
		" product_code, pricing_parameter_id, reason, status, attempts," +
		" available_at, queued_at, processed_at, last_error, created_at, updated_at" +
		" )" +
		" VALUES ($1, NULL, $2, 'pending', 0, now(), now(), NULL, '', now(), now())" +
		" ON CONFLICT DO NOTHING "

	seen := make(map[string]struct{}, len(productCodes))
	inserted := 0
	for _, raw := range productCodes {
		code := strings.TrimSpace(raw)
		if code == "" {
			continue
		}
		if _, dup := seen[code]; dup {
			continue
		}
		seen[code] = struct{}{}

		res, err := tx.ExecContext(ctx, insertSQL, code, reason)
		if err != nil {
			return inserted, err
		}
		// Zero affected rows means the insert was skipped as a duplicate.
		n, err := res.RowsAffected()
		if err != nil {
			return inserted, err
		}
		if n > 0 {
			inserted++
		}
	}
	return inserted, nil
}

// PriceRecalcQueueItem is one mk_price_recalc_queue row as returned by
// ClaimPriceRecalcQueue.
type PriceRecalcQueueItem struct {
	ID          int64  // id column of the queue row
	ProductCode string // product_code column, whitespace-trimmed
	Attempts    int    // attempts column at the time the row was claimed
}

// ClaimPriceRecalcQueue claims up to limit pending items for processing (SKIP LOCKED).
func ClaimPriceRecalcQueue(ctx context.Context, tx *sql.Tx, limit int) ([]PriceRecalcQueueItem, error) { if limit <= 0 { limit = 100 } rows, err := tx.QueryContext(ctx, ` WITH picked AS ( SELECT id FROM mk_price_recalc_queue WHERE status = 'pending' AND available_at <= now() ORDER BY queued_at LIMIT $1 FOR UPDATE SKIP LOCKED ) UPDATE mk_price_recalc_queue q SET status = 'processing', updated_at = now() FROM picked WHERE q.id = picked.id RETURNING q.id, q.product_code, q.attempts; `, limit) if err != nil { return nil, err } defer rows.Close() out := make([]PriceRecalcQueueItem, 0, limit) for rows.Next() { var it PriceRecalcQueueItem if err := rows.Scan(&it.ID, &it.ProductCode, &it.Attempts); err != nil { return nil, err } it.ProductCode = strings.TrimSpace(it.ProductCode) out = append(out, it) } return out, rows.Err() } func MarkPriceRecalcQueueDone(ctx context.Context, tx *sql.Tx, id int64) error { _, err := tx.ExecContext(ctx, ` UPDATE mk_price_recalc_queue SET status='done', processed_at = now(), updated_at = now(), last_error='' WHERE id=$1; `, id) return err } func MarkPriceRecalcQueueFailed(ctx context.Context, tx *sql.Tx, id int64, attempts int, errText string) error { errText = strings.TrimSpace(errText) if len(errText) > 900 { errText = errText[:900] } // Exponential-ish backoff: 5m, 15m, 60m. delay := 5 * time.Minute if attempts >= 1 { delay = 15 * time.Minute } if attempts >= 2 { delay = 60 * time.Minute } _, err := tx.ExecContext(ctx, ` UPDATE mk_price_recalc_queue SET status='failed', attempts = attempts + 1, processed_at = now(), updated_at = now(), last_error=$2, available_at = now() + $3::interval WHERE id=$1; `, id, errText, fmt.Sprintf("%d seconds", int(delay.Seconds()))) return err } // MarkPriceRecalcQueueDoneByProductCodes marks pending/processing rows as done for given product codes. // This is useful when an immediate publish path completes successfully and we want to avoid a second run. 
func MarkPriceRecalcQueueDoneByProductCodes(ctx context.Context, tx *sql.Tx, productCodes []string) (int64, error) { if len(productCodes) == 0 { return 0, nil } clean := make([]string, 0, len(productCodes)) seen := map[string]struct{}{} for _, raw := range productCodes { code := strings.TrimSpace(raw) if code == "" { continue } if _, ok := seen[code]; ok { continue } seen[code] = struct{}{} clean = append(clean, code) } if len(clean) == 0 { return 0, nil } res, err := tx.ExecContext(ctx, ` UPDATE mk_price_recalc_queue SET status='done', processed_at = now(), updated_at = now(), last_error='' WHERE product_code = ANY($1) AND status IN ('pending','processing'); `, pq.Array(clean)) if err != nil { return 0, err } ra, _ := res.RowsAffected() return ra, nil }