124 lines
3.1 KiB
Go
124 lines
3.1 KiB
Go
package main
|
|
|
|
import (
|
|
"bssapp-backend/db"
|
|
"bssapp-backend/queries"
|
|
"context"
|
|
"database/sql"
|
|
"log"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
func startProductPricingFxDeltaScheduler(pgDB *sql.DB) {
|
|
enabled := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_FX_DELTA_ENABLED")))
|
|
if enabled == "0" || enabled == "false" || enabled == "off" {
|
|
log.Println("Product pricing FX delta scheduler disabled")
|
|
return
|
|
}
|
|
if pgDB == nil {
|
|
return
|
|
}
|
|
|
|
intervalMin := 1
|
|
if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_FX_DELTA_INTERVAL_MIN")); raw != "" {
|
|
if parsed, err := strconv.Atoi(raw); err == nil && parsed >= 1 {
|
|
intervalMin = parsed
|
|
}
|
|
}
|
|
batchSize := 200
|
|
if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_FX_DELTA_BATCH_SIZE")); raw != "" {
|
|
if parsed, err := strconv.Atoi(raw); err == nil && parsed >= 10 && parsed <= 2000 {
|
|
batchSize = parsed
|
|
}
|
|
}
|
|
|
|
var running int32 = 0
|
|
|
|
runOnce := func(reason string) {
|
|
if db.PgDB == nil {
|
|
return
|
|
}
|
|
if !atomic.CompareAndSwapInt32(&running, 0, 1) {
|
|
log.Printf("[PricingFxDelta] skip (%s): already running", reason)
|
|
return
|
|
}
|
|
defer atomic.StoreInt32(&running, 0)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
|
|
defer cancel()
|
|
|
|
totalClaimed := 0
|
|
totalWritten := 0
|
|
for {
|
|
// Claim a batch.
|
|
tx, err := pgDB.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
log.Printf("[PricingFxDelta] begin_tx_error (%s): %v", reason, err)
|
|
return
|
|
}
|
|
items, err := queries.ClaimPriceRecalcQueue(ctx, tx, batchSize)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
log.Printf("[PricingFxDelta] claim_error (%s): %v", reason, err)
|
|
return
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
log.Printf("[PricingFxDelta] claim_commit_error (%s): %v", reason, err)
|
|
return
|
|
}
|
|
if len(items) == 0 {
|
|
break
|
|
}
|
|
|
|
totalClaimed += len(items)
|
|
codes := make([]string, 0, len(items))
|
|
for _, it := range items {
|
|
if it.ProductCode != "" {
|
|
codes = append(codes, it.ProductCode)
|
|
}
|
|
}
|
|
|
|
written, _, err := queries.PublishDerivedPricesFromAnchor(ctx, pgDB, codes, "", false)
|
|
if err != nil {
|
|
// Mark all failed.
|
|
tx2, _ := pgDB.BeginTx(ctx, nil)
|
|
if tx2 != nil {
|
|
for _, it := range items {
|
|
_ = queries.MarkPriceRecalcQueueFailed(ctx, tx2, it.ID, it.Attempts, err.Error())
|
|
}
|
|
_ = tx2.Commit()
|
|
}
|
|
log.Printf("[PricingFxDelta] publish_error (%s): claimed=%d err=%v", reason, len(items), err)
|
|
return
|
|
}
|
|
totalWritten += written
|
|
|
|
// Mark all done (even if some were skipped due to missing anchor).
|
|
tx3, _ := pgDB.BeginTx(ctx, nil)
|
|
if tx3 != nil {
|
|
for _, it := range items {
|
|
_ = queries.MarkPriceRecalcQueueDone(ctx, tx3, it.ID)
|
|
}
|
|
_ = tx3.Commit()
|
|
}
|
|
}
|
|
|
|
log.Printf("[PricingFxDelta] ok (%s): claimed=%d sdprc_written=%d interval_min=%d batch_size=%d", reason, totalClaimed, totalWritten, intervalMin, batchSize)
|
|
}
|
|
|
|
go func() {
|
|
time.Sleep(2 * time.Second)
|
|
runOnce("startup")
|
|
|
|
ticker := time.NewTicker(time.Duration(intervalMin) * time.Minute)
|
|
defer ticker.Stop()
|
|
for range ticker.C {
|
|
runOnce("scheduled")
|
|
}
|
|
}()
|
|
}
|