package main

import (
	"bssapp-backend/db"
	"bssapp-backend/queries"
	"context"
	"database/sql"
	"log"
	"os"
	"strconv"
	"strings"
	"sync/atomic"
	"time"
)

// Weekly full FX publish job:
// - Runs once every Monday at a configured local time.
// - Recomputes derived currencies from anchor tiers and writes to sdprc for all products in mk_price_snapshot.
//
// startProductPricingFxFullScheduler starts a background goroutine that
// triggers the weekly publish. It returns immediately and never blocks the
// caller. The goroutine runs for the lifetime of the process.
//
// Configuration (environment variables):
//   - PRODUCT_PRICING_FX_FULL_ENABLED: must be one of "1", "true", "on", "yes"
//     (case-insensitive, surrounding whitespace ignored); otherwise the
//     scheduler is not started.
//   - PRODUCT_PRICING_FX_FULL_HHMM: "HH:MM" run time, default 06:00. The hour
//     and minute parts are validated independently; an invalid part keeps its
//     default.
//   - PRODUCT_PRICING_FX_FULL_CODE_BATCH: number of product codes per page,
//     accepted range 100..5000, default 1000.
//
// If pgDB is nil the scheduler is not started.
func startProductPricingFxFullScheduler(pgDB *sql.DB) {
	enabled := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_FX_FULL_ENABLED")))
	// Be conservative: require explicit opt-in.
	if enabled != "1" && enabled != "true" && enabled != "on" && enabled != "yes" {
		log.Println("Product pricing FX full scheduler disabled (set PRODUCT_PRICING_FX_FULL_ENABLED=1 to enable)")
		return
	}
	if pgDB == nil {
		return
	}

	// Default: Monday 06:00 local time.
	runHH := 6
	runMM := 0
	if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_FX_FULL_HHMM")); raw != "" {
		parts := strings.Split(raw, ":")
		if len(parts) == 2 {
			if h, err := strconv.Atoi(strings.TrimSpace(parts[0])); err == nil && h >= 0 && h <= 23 {
				runHH = h
			}
			if m, err := strconv.Atoi(strings.TrimSpace(parts[1])); err == nil && m >= 0 && m <= 59 {
				runMM = m
			}
		}
	}

	codeBatch := 1000
	if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_FX_FULL_CODE_BATCH")); raw != "" {
		if n, err := strconv.Atoi(raw); err == nil && n >= 100 && n <= 5000 {
			codeBatch = n
		}
	}

	// fetchBatch reads one page of distinct product codes that sort strictly
	// after cursor. It returns the trimmed, non-empty codes of the page, the
	// raw (untrimmed) value of the last row read, and the number of rows read.
	// ok is false when the page could not be read completely; the error has
	// already been logged in that case.
	fetchBatch := func(ctx context.Context, reason, cursor string) (codes []string, next string, fetched int, ok bool) {
		rows, err := pgDB.QueryContext(ctx, `
			SELECT product_code
			FROM mk_price_snapshot
			WHERE COALESCE(NULLIF(BTRIM(product_code), ''), '') <> ''
			  AND product_code > $1
			GROUP BY product_code
			ORDER BY product_code
			LIMIT $2
		`, cursor, codeBatch)
		if err != nil {
			log.Printf("[PricingFxFull] list_codes_error (%s): %v", reason, err)
			return nil, cursor, 0, false
		}
		// Close error is irrelevant for a read-only result set; iteration
		// errors are reported through rows.Err below.
		defer rows.Close()

		codes = make([]string, 0, codeBatch)
		next = cursor
		for rows.Next() {
			var raw string
			if err := rows.Scan(&raw); err != nil {
				log.Printf("[PricingFxFull] scan_code_error (%s): %v", reason, err)
				return nil, cursor, 0, false
			}
			fetched++
			// The cursor must be the value exactly as stored: the SQL
			// compares against the raw column, so a trimmed cursor could
			// re-read the same row forever or skip rows.
			next = raw
			if c := strings.TrimSpace(raw); c != "" {
				codes = append(codes, c)
			}
		}
		// rows.Next returns false both at end of data and on failure; without
		// this check a broken connection or an expired context would look
		// like a short (or final) page and the run would report success.
		if err := rows.Err(); err != nil {
			log.Printf("[PricingFxFull] list_codes_error (%s): %v", reason, err)
			return nil, cursor, 0, false
		}
		return codes, next, fetched, true
	}

	// running is 1 while a publish is in progress, so overlapping triggers
	// are skipped instead of running concurrently.
	var running int32

	// runOnce pages through all product codes and publishes derived prices
	// for each page. It stops at the first error and logs a summary only when
	// every page was processed.
	runOnce := func(reason string) {
		// NOTE(review): this checks the package-level db.PgDB handle rather
		// than the pgDB argument used for the queries below — presumably a
		// guard against the global connection being torn down; confirm.
		if db.PgDB == nil {
			return
		}
		if !atomic.CompareAndSwapInt32(&running, 0, 1) {
			log.Printf("[PricingFxFull] skip (%s): already running", reason)
			return
		}
		defer atomic.StoreInt32(&running, 0)

		// Upper bound for the whole run, covering every page and publish.
		ctx, cancel := context.WithTimeout(context.Background(), 4*time.Hour)
		defer cancel()

		totalCodes := 0
		totalWritten := 0
		totalSkipped := 0
		cursor := ""

		for {
			codes, next, fetched, ok := fetchBatch(ctx, reason, cursor)
			if !ok {
				return
			}
			// End of data is decided by rows read, not by usable codes: a
			// page may consist solely of values that trim to empty in Go.
			if fetched == 0 {
				break
			}
			cursor = next
			if len(codes) == 0 {
				continue
			}

			// Force FX refresh on the weekly run so Monday picks up the latest rates.
			written, skipped, err := queries.PublishDerivedPricesFromAnchor(ctx, pgDB, codes, "", true)
			if err != nil {
				log.Printf("[PricingFxFull] publish_error (%s): codes=%d err=%v", reason, len(codes), err)
				return
			}
			totalCodes += len(codes)
			totalWritten += written
			totalSkipped += skipped
		}

		log.Printf("[PricingFxFull] ok (%s): products=%d sdprc_written=%d skipped=%d weekday=%d hhmm=%02d:%02d",
			reason, totalCodes, totalWritten, totalSkipped, int(time.Now().Weekday()), runHH, runMM)
	}

	// nextRun returns the first Monday runHH:runMM, in now's location, that
	// is strictly after now.
	nextRun := func(now time.Time) time.Time {
		loc := now.Location()
		base := time.Date(now.Year(), now.Month(), now.Day(), runHH, runMM, 0, 0, loc)
		daysUntilMon := (int(time.Monday) - int(now.Weekday()) + 7) % 7
		candidate := base.AddDate(0, 0, daysUntilMon)
		// If today is Monday but the time has passed, schedule next Monday.
		if !candidate.After(now) {
			candidate = candidate.AddDate(0, 0, 7)
		}
		return candidate
	}

	go func() {
		// Short delay before the first schedule computation.
		time.Sleep(2 * time.Second)
		for {
			now := time.Now()
			n := nextRun(now)
			d := time.Until(n)
			// Defensive: never pass a negative duration to Sleep.
			if d < 0 {
				d = time.Minute
			}
			log.Printf("[PricingFxFull] scheduled next_at=%s in=%s", n.Format(time.RFC3339), d.Round(time.Second))
			time.Sleep(d)
			runOnce("weekly")
		}
	}()
}