Merge remote-tracking branch 'origin/master'
This commit is contained in:
174
svc/queries/product_pricing_recalc_queue.go
Normal file
174
svc/queries/product_pricing_recalc_queue.go
Normal file
@@ -0,0 +1,174 @@
|
||||
package queries
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/lib/pq"
|
||||
)
|
||||
|
||||
// EnqueuePriceRecalc enqueues product codes for delta FX publish.
|
||||
// It is safe to call repeatedly; duplicates in pending/processing are ignored.
|
||||
func EnqueuePriceRecalc(ctx context.Context, tx *sql.Tx, productCodes []string, reason string) (int, error) {
|
||||
if len(productCodes) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
reason = strings.TrimSpace(reason)
|
||||
if reason == "" {
|
||||
reason = "manual"
|
||||
}
|
||||
|
||||
seen := map[string]struct{}{}
|
||||
inserted := 0
|
||||
for _, raw := range productCodes {
|
||||
code := strings.TrimSpace(raw)
|
||||
if code == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[code]; ok {
|
||||
continue
|
||||
}
|
||||
seen[code] = struct{}{}
|
||||
|
||||
_, err := tx.ExecContext(ctx, `
|
||||
INSERT INTO mk_price_recalc_queue (
|
||||
product_code, pricing_parameter_id, reason, status, attempts,
|
||||
available_at, queued_at, processed_at, last_error,
|
||||
created_at, updated_at
|
||||
)
|
||||
VALUES ($1, NULL, $2, 'pending', 0, now(), now(), NULL, '', now(), now())
|
||||
`, code, reason)
|
||||
if err != nil {
|
||||
if pe, ok := err.(*pq.Error); ok && pe != nil && string(pe.Code) == "23505" {
|
||||
// Duplicate in pending/processing (partial unique index).
|
||||
continue
|
||||
}
|
||||
return inserted, err
|
||||
}
|
||||
inserted++
|
||||
}
|
||||
return inserted, nil
|
||||
}
|
||||
|
||||
type PriceRecalcQueueItem struct {
|
||||
ID int64
|
||||
ProductCode string
|
||||
Attempts int
|
||||
}
|
||||
|
||||
// ClaimPriceRecalcQueue claims up to limit pending items for processing (SKIP LOCKED).
|
||||
func ClaimPriceRecalcQueue(ctx context.Context, tx *sql.Tx, limit int) ([]PriceRecalcQueueItem, error) {
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
rows, err := tx.QueryContext(ctx, `
|
||||
WITH picked AS (
|
||||
SELECT id
|
||||
FROM mk_price_recalc_queue
|
||||
WHERE status = 'pending'
|
||||
AND available_at <= now()
|
||||
ORDER BY queued_at
|
||||
LIMIT $1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
UPDATE mk_price_recalc_queue q
|
||||
SET status = 'processing', updated_at = now()
|
||||
FROM picked
|
||||
WHERE q.id = picked.id
|
||||
RETURNING q.id, q.product_code, q.attempts;
|
||||
`, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := make([]PriceRecalcQueueItem, 0, limit)
|
||||
for rows.Next() {
|
||||
var it PriceRecalcQueueItem
|
||||
if err := rows.Scan(&it.ID, &it.ProductCode, &it.Attempts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
it.ProductCode = strings.TrimSpace(it.ProductCode)
|
||||
out = append(out, it)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func MarkPriceRecalcQueueDone(ctx context.Context, tx *sql.Tx, id int64) error {
|
||||
_, err := tx.ExecContext(ctx, `
|
||||
UPDATE mk_price_recalc_queue
|
||||
SET status='done',
|
||||
processed_at = now(),
|
||||
updated_at = now(),
|
||||
last_error=''
|
||||
WHERE id=$1;
|
||||
`, id)
|
||||
return err
|
||||
}
|
||||
|
||||
func MarkPriceRecalcQueueFailed(ctx context.Context, tx *sql.Tx, id int64, attempts int, errText string) error {
|
||||
errText = strings.TrimSpace(errText)
|
||||
if len(errText) > 900 {
|
||||
errText = errText[:900]
|
||||
}
|
||||
// Exponential-ish backoff: 5m, 15m, 60m.
|
||||
delay := 5 * time.Minute
|
||||
if attempts >= 1 {
|
||||
delay = 15 * time.Minute
|
||||
}
|
||||
if attempts >= 2 {
|
||||
delay = 60 * time.Minute
|
||||
}
|
||||
_, err := tx.ExecContext(ctx, `
|
||||
UPDATE mk_price_recalc_queue
|
||||
SET status='failed',
|
||||
attempts = attempts + 1,
|
||||
processed_at = now(),
|
||||
updated_at = now(),
|
||||
last_error=$2,
|
||||
available_at = now() + $3::interval
|
||||
WHERE id=$1;
|
||||
`, id, errText, fmt.Sprintf("%d seconds", int(delay.Seconds())))
|
||||
return err
|
||||
}
|
||||
|
||||
// MarkPriceRecalcQueueDoneByProductCodes marks pending/processing rows as done for given product codes.
|
||||
// This is useful when an immediate publish path completes successfully and we want to avoid a second run.
|
||||
func MarkPriceRecalcQueueDoneByProductCodes(ctx context.Context, tx *sql.Tx, productCodes []string) (int64, error) {
|
||||
if len(productCodes) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
clean := make([]string, 0, len(productCodes))
|
||||
seen := map[string]struct{}{}
|
||||
for _, raw := range productCodes {
|
||||
code := strings.TrimSpace(raw)
|
||||
if code == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[code]; ok {
|
||||
continue
|
||||
}
|
||||
seen[code] = struct{}{}
|
||||
clean = append(clean, code)
|
||||
}
|
||||
if len(clean) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
res, err := tx.ExecContext(ctx, `
|
||||
UPDATE mk_price_recalc_queue
|
||||
SET status='done',
|
||||
processed_at = now(),
|
||||
updated_at = now(),
|
||||
last_error=''
|
||||
WHERE product_code = ANY($1)
|
||||
AND status IN ('pending','processing');
|
||||
`, pq.Array(clean))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
ra, _ := res.RowsAffected()
|
||||
return ra, nil
|
||||
}
|
||||
Reference in New Issue
Block a user