175 lines
4.2 KiB
Go
175 lines
4.2 KiB
Go
package queries
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/lib/pq"
|
|
)
|
|
|
|
// EnqueuePriceRecalc enqueues product codes for delta FX publish.
|
|
// It is safe to call repeatedly; duplicates in pending/processing are ignored.
|
|
func EnqueuePriceRecalc(ctx context.Context, tx *sql.Tx, productCodes []string, reason string) (int, error) {
|
|
if len(productCodes) == 0 {
|
|
return 0, nil
|
|
}
|
|
reason = strings.TrimSpace(reason)
|
|
if reason == "" {
|
|
reason = "manual"
|
|
}
|
|
|
|
seen := map[string]struct{}{}
|
|
inserted := 0
|
|
for _, raw := range productCodes {
|
|
code := strings.TrimSpace(raw)
|
|
if code == "" {
|
|
continue
|
|
}
|
|
if _, ok := seen[code]; ok {
|
|
continue
|
|
}
|
|
seen[code] = struct{}{}
|
|
|
|
_, err := tx.ExecContext(ctx, `
|
|
INSERT INTO mk_price_recalc_queue (
|
|
product_code, pricing_parameter_id, reason, status, attempts,
|
|
available_at, queued_at, processed_at, last_error,
|
|
created_at, updated_at
|
|
)
|
|
VALUES ($1, NULL, $2, 'pending', 0, now(), now(), NULL, '', now(), now())
|
|
`, code, reason)
|
|
if err != nil {
|
|
if pe, ok := err.(*pq.Error); ok && pe != nil && string(pe.Code) == "23505" {
|
|
// Duplicate in pending/processing (partial unique index).
|
|
continue
|
|
}
|
|
return inserted, err
|
|
}
|
|
inserted++
|
|
}
|
|
return inserted, nil
|
|
}
|
|
|
|
type PriceRecalcQueueItem struct {
|
|
ID int64
|
|
ProductCode string
|
|
Attempts int
|
|
}
|
|
|
|
// ClaimPriceRecalcQueue claims up to limit pending items for processing (SKIP LOCKED).
|
|
func ClaimPriceRecalcQueue(ctx context.Context, tx *sql.Tx, limit int) ([]PriceRecalcQueueItem, error) {
|
|
if limit <= 0 {
|
|
limit = 100
|
|
}
|
|
rows, err := tx.QueryContext(ctx, `
|
|
WITH picked AS (
|
|
SELECT id
|
|
FROM mk_price_recalc_queue
|
|
WHERE status = 'pending'
|
|
AND available_at <= now()
|
|
ORDER BY queued_at
|
|
LIMIT $1
|
|
FOR UPDATE SKIP LOCKED
|
|
)
|
|
UPDATE mk_price_recalc_queue q
|
|
SET status = 'processing', updated_at = now()
|
|
FROM picked
|
|
WHERE q.id = picked.id
|
|
RETURNING q.id, q.product_code, q.attempts;
|
|
`, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
out := make([]PriceRecalcQueueItem, 0, limit)
|
|
for rows.Next() {
|
|
var it PriceRecalcQueueItem
|
|
if err := rows.Scan(&it.ID, &it.ProductCode, &it.Attempts); err != nil {
|
|
return nil, err
|
|
}
|
|
it.ProductCode = strings.TrimSpace(it.ProductCode)
|
|
out = append(out, it)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func MarkPriceRecalcQueueDone(ctx context.Context, tx *sql.Tx, id int64) error {
|
|
_, err := tx.ExecContext(ctx, `
|
|
UPDATE mk_price_recalc_queue
|
|
SET status='done',
|
|
processed_at = now(),
|
|
updated_at = now(),
|
|
last_error=''
|
|
WHERE id=$1;
|
|
`, id)
|
|
return err
|
|
}
|
|
|
|
func MarkPriceRecalcQueueFailed(ctx context.Context, tx *sql.Tx, id int64, attempts int, errText string) error {
|
|
errText = strings.TrimSpace(errText)
|
|
if len(errText) > 900 {
|
|
errText = errText[:900]
|
|
}
|
|
// Exponential-ish backoff: 5m, 15m, 60m.
|
|
delay := 5 * time.Minute
|
|
if attempts >= 1 {
|
|
delay = 15 * time.Minute
|
|
}
|
|
if attempts >= 2 {
|
|
delay = 60 * time.Minute
|
|
}
|
|
_, err := tx.ExecContext(ctx, `
|
|
UPDATE mk_price_recalc_queue
|
|
SET status='failed',
|
|
attempts = attempts + 1,
|
|
processed_at = now(),
|
|
updated_at = now(),
|
|
last_error=$2,
|
|
available_at = now() + $3::interval
|
|
WHERE id=$1;
|
|
`, id, errText, fmt.Sprintf("%d seconds", int(delay.Seconds())))
|
|
return err
|
|
}
|
|
|
|
// MarkPriceRecalcQueueDoneByProductCodes marks pending/processing rows as done for given product codes.
|
|
// This is useful when an immediate publish path completes successfully and we want to avoid a second run.
|
|
func MarkPriceRecalcQueueDoneByProductCodes(ctx context.Context, tx *sql.Tx, productCodes []string) (int64, error) {
|
|
if len(productCodes) == 0 {
|
|
return 0, nil
|
|
}
|
|
clean := make([]string, 0, len(productCodes))
|
|
seen := map[string]struct{}{}
|
|
for _, raw := range productCodes {
|
|
code := strings.TrimSpace(raw)
|
|
if code == "" {
|
|
continue
|
|
}
|
|
if _, ok := seen[code]; ok {
|
|
continue
|
|
}
|
|
seen[code] = struct{}{}
|
|
clean = append(clean, code)
|
|
}
|
|
if len(clean) == 0 {
|
|
return 0, nil
|
|
}
|
|
res, err := tx.ExecContext(ctx, `
|
|
UPDATE mk_price_recalc_queue
|
|
SET status='done',
|
|
processed_at = now(),
|
|
updated_at = now(),
|
|
last_error=''
|
|
WHERE product_code = ANY($1)
|
|
AND status IN ('pending','processing');
|
|
`, pq.Array(clean))
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
ra, _ := res.RowsAffected()
|
|
return ra, nil
|
|
}
|