package main import ( "bssapp-backend/queries" "context" "database/sql" "log" "os" "strconv" "strings" "sync/atomic" "time" ) func startProductPricingCalcScheduler(pgDB *sql.DB) { enabled := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_ENABLED"))) if enabled == "0" || enabled == "false" || enabled == "off" { log.Println("Product pricing calc scheduler disabled") return } if pgDB == nil { return } intervalHours := 24 if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_CALC_INTERVAL_HOURS")); raw != "" { if parsed, err := strconv.Atoi(raw); err == nil && parsed >= 1 { intervalHours = parsed } } codeBatch := 1000 if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_CALC_PUBLISH_CODE_BATCH")); raw != "" { if parsed, err := strconv.Atoi(raw); err == nil && parsed >= 100 && parsed <= 5000 { codeBatch = parsed } } runOnStartup := true if raw := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_RUN_ON_STARTUP"))); raw == "0" || raw == "false" || raw == "off" { runOnStartup = false } // Snapshot calculation should not update sdprc by default. sdprc is updated by // the weekly full FX publisher and by the delta queue after manual price edits. publishEnabled := false if raw := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_PUBLISH_ENABLED"))); raw == "1" || raw == "true" || raw == "on" || raw == "yes" { publishEnabled = true } forceFxRefresh := true if raw := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_FORCE_FX_REFRESH"))); raw == "0" || raw == "false" || raw == "off" { forceFxRefresh = false } var running int32 runOnce := func(reason string) { if !atomic.CompareAndSwapInt32(&running, 0, 1) { log.Printf("[ProductPricingCalcJob] skip (%s): already running", reason) return } defer atomic.StoreInt32(&running, 0) ctx, cancel := context.WithTimeout(context.Background(), 4*time.Hour) defer cancel() started := time.Now() result, err := queries.CalculateProductPricingSnapshots(ctx, pgDB, queries.ProductPricingSnapshotCalcRequest{ ForceFxRefresh: forceFxRefresh, }) if err != nil { log.Printf("[ProductPricingCalcJob] calculate_error (%s): err=%v duration=%s", reason, err, time.Since(started).Round(time.Second)) return } totalWritten := 0 totalSkipped := 0 totalProducts := 0 if publishEnabled { written, skipped, products, err := publishAllProductPricingSnapshots(ctx, pgDB, codeBatch, forceFxRefresh) if err != nil { log.Printf("[ProductPricingCalcJob] publish_error (%s): calculated=%d err=%v duration=%s", reason, result.Calculated, err, time.Since(started).Round(time.Second)) return } totalWritten = written totalSkipped = skipped totalProducts = products } log.Printf("[ProductPricingCalcJob] ok (%s): requested=%d calculated=%d skipped=%d fx_date=%s snapshot_publish=%t products=%d sdprc_written=%d publish_skipped=%d interval_hours=%d duration=%s", reason, result.Requested, result.Calculated, result.Skipped, result.RateDate, publishEnabled, totalProducts, totalWritten, totalSkipped, intervalHours, time.Since(started).Round(time.Second), ) } go func() { if runOnStartup { time.Sleep(5 * time.Second) runOnce("startup") } ticker := time.NewTicker(time.Duration(intervalHours) * time.Hour) defer ticker.Stop() for range ticker.C { runOnce("scheduled") } }() } func publishAllProductPricingSnapshots(ctx context.Context, pgDB *sql.DB, codeBatch int, forceFxRefresh bool) (int, int, int, error) { totalWritten := 0 totalSkipped := 0 totalProducts := 0 lastCode := "" for { rows, err := pgDB.QueryContext(ctx, ` SELECT product_code FROM mk_price_snapshot WHERE COALESCE(NULLIF(BTRIM(product_code), ''), '') <> '' AND product_code > $1 GROUP BY product_code ORDER BY product_code LIMIT $2 `, lastCode, codeBatch) if err != nil { return totalWritten, totalSkipped, totalProducts, err } codes := make([]string, 0, codeBatch) for rows.Next() { var code string if err := rows.Scan(&code); err != nil { _ = rows.Close() return totalWritten, totalSkipped, totalProducts, err } code = strings.TrimSpace(code) if code != "" { codes = append(codes, code) } } if err := rows.Err(); err != nil { _ = rows.Close() return totalWritten, totalSkipped, totalProducts, err } _ = rows.Close() if len(codes) == 0 { break } lastCode = codes[len(codes)-1] written, skipped, err := queries.PublishDerivedPricesFromAnchor(ctx, pgDB, codes, "", forceFxRefresh) if err != nil { return totalWritten, totalSkipped, totalProducts, err } totalWritten += written totalSkipped += skipped totalProducts += len(codes) } return totalWritten, totalSkipped, totalProducts, nil }