package main import ( "bssapp-backend/db" "bssapp-backend/queries" "context" "database/sql" "log" "os" "strconv" "strings" "sync/atomic" "time" ) func startProductPricingFxDeltaScheduler(pgDB *sql.DB) { enabled := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_FX_DELTA_ENABLED"))) if enabled == "0" || enabled == "false" || enabled == "off" { log.Println("Product pricing FX delta scheduler disabled") return } if pgDB == nil { return } intervalMin := 1 if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_FX_DELTA_INTERVAL_MIN")); raw != "" { if parsed, err := strconv.Atoi(raw); err == nil && parsed >= 1 { intervalMin = parsed } } batchSize := 200 if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_FX_DELTA_BATCH_SIZE")); raw != "" { if parsed, err := strconv.Atoi(raw); err == nil && parsed >= 10 && parsed <= 2000 { batchSize = parsed } } var running int32 = 0 runOnce := func(reason string) { if db.PgDB == nil { return } if !atomic.CompareAndSwapInt32(&running, 0, 1) { log.Printf("[PricingFxDelta] skip (%s): already running", reason) return } defer atomic.StoreInt32(&running, 0) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) defer cancel() totalClaimed := 0 totalWritten := 0 for { // Claim a batch. tx, err := pgDB.BeginTx(ctx, nil) if err != nil { log.Printf("[PricingFxDelta] begin_tx_error (%s): %v", reason, err) return } items, err := queries.ClaimPriceRecalcQueue(ctx, tx, batchSize) if err != nil { _ = tx.Rollback() log.Printf("[PricingFxDelta] claim_error (%s): %v", reason, err) return } if err := tx.Commit(); err != nil { log.Printf("[PricingFxDelta] claim_commit_error (%s): %v", reason, err) return } if len(items) == 0 { break } totalClaimed += len(items) codes := make([]string, 0, len(items)) for _, it := range items { if it.ProductCode != "" { codes = append(codes, it.ProductCode) } } written, _, err := queries.PublishDerivedPricesFromAnchor(ctx, pgDB, codes, "", false) if err != nil { // Mark all failed. tx2, _ := pgDB.BeginTx(ctx, nil) if tx2 != nil { for _, it := range items { _ = queries.MarkPriceRecalcQueueFailed(ctx, tx2, it.ID, it.Attempts, err.Error()) } _ = tx2.Commit() } log.Printf("[PricingFxDelta] publish_error (%s): claimed=%d err=%v", reason, len(items), err) return } totalWritten += written // Mark all done (even if some were skipped due to missing anchor). tx3, _ := pgDB.BeginTx(ctx, nil) if tx3 != nil { for _, it := range items { _ = queries.MarkPriceRecalcQueueDone(ctx, tx3, it.ID) } _ = tx3.Commit() } } log.Printf("[PricingFxDelta] ok (%s): claimed=%d sdprc_written=%d interval_min=%d batch_size=%d", reason, totalClaimed, totalWritten, intervalMin, batchSize) } go func() { time.Sleep(2 * time.Second) runOnce("startup") ticker := time.NewTicker(time.Duration(intervalMin) * time.Minute) defer ticker.Stop() for range ticker.C { runOnce("scheduled") } }() }