Files
bssapp/svc/queries/product_pricing_recalc_queue.go
2026-06-17 21:57:02 +03:00

175 lines
4.2 KiB
Go

package queries
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/lib/pq"
)
// EnqueuePriceRecalc enqueues product codes for delta FX publish.
// It is safe to call repeatedly; duplicates in pending/processing are ignored.
func EnqueuePriceRecalc(ctx context.Context, tx *sql.Tx, productCodes []string, reason string) (int, error) {
if len(productCodes) == 0 {
return 0, nil
}
reason = strings.TrimSpace(reason)
if reason == "" {
reason = "manual"
}
seen := map[string]struct{}{}
inserted := 0
for _, raw := range productCodes {
code := strings.TrimSpace(raw)
if code == "" {
continue
}
if _, ok := seen[code]; ok {
continue
}
seen[code] = struct{}{}
_, err := tx.ExecContext(ctx, `
INSERT INTO mk_price_recalc_queue (
product_code, pricing_parameter_id, reason, status, attempts,
available_at, queued_at, processed_at, last_error,
created_at, updated_at
)
VALUES ($1, NULL, $2, 'pending', 0, now(), now(), NULL, '', now(), now())
`, code, reason)
if err != nil {
if pe, ok := err.(*pq.Error); ok && pe != nil && string(pe.Code) == "23505" {
// Duplicate in pending/processing (partial unique index).
continue
}
return inserted, err
}
inserted++
}
return inserted, nil
}
type PriceRecalcQueueItem struct {
ID int64
ProductCode string
Attempts int
}
// ClaimPriceRecalcQueue claims up to limit pending items for processing (SKIP LOCKED).
func ClaimPriceRecalcQueue(ctx context.Context, tx *sql.Tx, limit int) ([]PriceRecalcQueueItem, error) {
if limit <= 0 {
limit = 100
}
rows, err := tx.QueryContext(ctx, `
WITH picked AS (
SELECT id
FROM mk_price_recalc_queue
WHERE status = 'pending'
AND available_at <= now()
ORDER BY queued_at
LIMIT $1
FOR UPDATE SKIP LOCKED
)
UPDATE mk_price_recalc_queue q
SET status = 'processing', updated_at = now()
FROM picked
WHERE q.id = picked.id
RETURNING q.id, q.product_code, q.attempts;
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
out := make([]PriceRecalcQueueItem, 0, limit)
for rows.Next() {
var it PriceRecalcQueueItem
if err := rows.Scan(&it.ID, &it.ProductCode, &it.Attempts); err != nil {
return nil, err
}
it.ProductCode = strings.TrimSpace(it.ProductCode)
out = append(out, it)
}
return out, rows.Err()
}
func MarkPriceRecalcQueueDone(ctx context.Context, tx *sql.Tx, id int64) error {
_, err := tx.ExecContext(ctx, `
UPDATE mk_price_recalc_queue
SET status='done',
processed_at = now(),
updated_at = now(),
last_error=''
WHERE id=$1;
`, id)
return err
}
func MarkPriceRecalcQueueFailed(ctx context.Context, tx *sql.Tx, id int64, attempts int, errText string) error {
errText = strings.TrimSpace(errText)
if len(errText) > 900 {
errText = errText[:900]
}
// Exponential-ish backoff: 5m, 15m, 60m.
delay := 5 * time.Minute
if attempts >= 1 {
delay = 15 * time.Minute
}
if attempts >= 2 {
delay = 60 * time.Minute
}
_, err := tx.ExecContext(ctx, `
UPDATE mk_price_recalc_queue
SET status='failed',
attempts = attempts + 1,
processed_at = now(),
updated_at = now(),
last_error=$2,
available_at = now() + $3::interval
WHERE id=$1;
`, id, errText, fmt.Sprintf("%d seconds", int(delay.Seconds())))
return err
}
// MarkPriceRecalcQueueDoneByProductCodes marks pending/processing rows as done for given product codes.
// This is useful when an immediate publish path completes successfully and we want to avoid a second run.
func MarkPriceRecalcQueueDoneByProductCodes(ctx context.Context, tx *sql.Tx, productCodes []string) (int64, error) {
if len(productCodes) == 0 {
return 0, nil
}
clean := make([]string, 0, len(productCodes))
seen := map[string]struct{}{}
for _, raw := range productCodes {
code := strings.TrimSpace(raw)
if code == "" {
continue
}
if _, ok := seen[code]; ok {
continue
}
seen[code] = struct{}{}
clean = append(clean, code)
}
if len(clean) == 0 {
return 0, nil
}
res, err := tx.ExecContext(ctx, `
UPDATE mk_price_recalc_queue
SET status='done',
processed_at = now(),
updated_at = now(),
last_error=''
WHERE product_code = ANY($1)
AND status IN ('pending','processing');
`, pq.Array(clean))
if err != nil {
return 0, err
}
ra, _ := res.RowsAffected()
return ra, nil
}