From a839cae840f3d98499152630850129d403f422ca Mon Sep 17 00:00:00 2001 From: M_Kececi Date: Mon, 22 Jun 2026 15:21:58 +0300 Subject: [PATCH] Merge remote-tracking branch 'origin/master' --- svc/main.go | 1 + svc/product_pricing_calc_scheduler.go | 169 +++++++++++++++++++++++ svc/product_pricing_fx_full_scheduler.go | 12 +- 3 files changed, 179 insertions(+), 3 deletions(-) create mode 100644 svc/product_pricing_calc_scheduler.go diff --git a/svc/main.go b/svc/main.go index 64d1365..36025f8 100644 --- a/svc/main.go +++ b/svc/main.go @@ -1352,6 +1352,7 @@ func main() { startTranslationSyncScheduler(pgDB, db.MssqlDB) startBrandSyncScheduler(pgDB, db.MssqlDB) startPricingParameterSyncScheduler(pgDB, db.MssqlDB) + startProductPricingCalcScheduler(pgDB) startProductPricingFxDeltaScheduler(pgDB) startProductPricingFxFullScheduler(pgDB) diff --git a/svc/product_pricing_calc_scheduler.go b/svc/product_pricing_calc_scheduler.go new file mode 100644 index 0000000..d962cc9 --- /dev/null +++ b/svc/product_pricing_calc_scheduler.go @@ -0,0 +1,169 @@ +package main + +import ( + "bssapp-backend/queries" + "context" + "database/sql" + "log" + "os" + "strconv" + "strings" + "sync/atomic" + "time" +) + +func startProductPricingCalcScheduler(pgDB *sql.DB) { + enabled := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_ENABLED"))) + if enabled == "0" || enabled == "false" || enabled == "off" { + log.Println("Product pricing calc scheduler disabled") + return + } + if pgDB == nil { + return + } + + intervalHours := 24 + if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_CALC_INTERVAL_HOURS")); raw != "" { + if parsed, err := strconv.Atoi(raw); err == nil && parsed >= 1 { + intervalHours = parsed + } + } + + codeBatch := 1000 + if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_CALC_PUBLISH_CODE_BATCH")); raw != "" { + if parsed, err := strconv.Atoi(raw); err == nil && parsed >= 100 && parsed <= 5000 { + codeBatch = parsed + } + } + + runOnStartup := true + if raw := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_RUN_ON_STARTUP"))); raw == "0" || raw == "false" || raw == "off" { + runOnStartup = false + } + // Snapshot calculation should not update sdprc by default. sdprc is updated by + // the weekly full FX publisher and by the delta queue after manual price edits. + publishEnabled := false + if raw := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_PUBLISH_ENABLED"))); raw == "1" || raw == "true" || raw == "on" || raw == "yes" { + publishEnabled = true + } + forceFxRefresh := true + if raw := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_FORCE_FX_REFRESH"))); raw == "0" || raw == "false" || raw == "off" { + forceFxRefresh = false + } + + var running int32 + + runOnce := func(reason string) { + if !atomic.CompareAndSwapInt32(&running, 0, 1) { + log.Printf("[ProductPricingCalcJob] skip (%s): already running", reason) + return + } + defer atomic.StoreInt32(&running, 0) + + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Hour) + defer cancel() + + started := time.Now() + result, err := queries.CalculateProductPricingSnapshots(ctx, pgDB, queries.ProductPricingSnapshotCalcRequest{ + ForceFxRefresh: forceFxRefresh, + }) + if err != nil { + log.Printf("[ProductPricingCalcJob] calculate_error (%s): err=%v duration=%s", reason, err, time.Since(started).Round(time.Second)) + return + } + + totalWritten := 0 + totalSkipped := 0 + totalProducts := 0 + if publishEnabled { + written, skipped, products, err := publishAllProductPricingSnapshots(ctx, pgDB, codeBatch, forceFxRefresh) + if err != nil { + log.Printf("[ProductPricingCalcJob] publish_error (%s): calculated=%d err=%v duration=%s", reason, result.Calculated, err, time.Since(started).Round(time.Second)) + return + } + totalWritten = written + totalSkipped = skipped + totalProducts = products + } + + log.Printf("[ProductPricingCalcJob] ok (%s): requested=%d calculated=%d skipped=%d fx_date=%s snapshot_publish=%t products=%d sdprc_written=%d publish_skipped=%d interval_hours=%d duration=%s", + reason, + result.Requested, + result.Calculated, + result.Skipped, + result.RateDate, + publishEnabled, + totalProducts, + totalWritten, + totalSkipped, + intervalHours, + time.Since(started).Round(time.Second), + ) + } + + go func() { + if runOnStartup { + time.Sleep(5 * time.Second) + runOnce("startup") + } + + ticker := time.NewTicker(time.Duration(intervalHours) * time.Hour) + defer ticker.Stop() + for range ticker.C { + runOnce("scheduled") + } + }() +} + +func publishAllProductPricingSnapshots(ctx context.Context, pgDB *sql.DB, codeBatch int, forceFxRefresh bool) (int, int, int, error) { + totalWritten := 0 + totalSkipped := 0 + totalProducts := 0 + lastCode := "" + for { + rows, err := pgDB.QueryContext(ctx, ` +SELECT product_code +FROM mk_price_snapshot +WHERE COALESCE(NULLIF(BTRIM(product_code), ''), '') <> '' + AND product_code > $1 +GROUP BY product_code +ORDER BY product_code +LIMIT $2 +`, lastCode, codeBatch) + if err != nil { + return totalWritten, totalSkipped, totalProducts, err + } + + codes := make([]string, 0, codeBatch) + for rows.Next() { + var code string + if err := rows.Scan(&code); err != nil { + _ = rows.Close() + return totalWritten, totalSkipped, totalProducts, err + } + code = strings.TrimSpace(code) + if code != "" { + codes = append(codes, code) + } + } + if err := rows.Err(); err != nil { + _ = rows.Close() + return totalWritten, totalSkipped, totalProducts, err + } + _ = rows.Close() + + if len(codes) == 0 { + break + } + lastCode = codes[len(codes)-1] + + written, skipped, err := queries.PublishDerivedPricesFromAnchor(ctx, pgDB, codes, "", forceFxRefresh) + if err != nil { + return totalWritten, totalSkipped, totalProducts, err + } + totalWritten += written + totalSkipped += skipped + totalProducts += len(codes) + } + return totalWritten, totalSkipped, totalProducts, nil +} diff --git a/svc/product_pricing_fx_full_scheduler.go b/svc/product_pricing_fx_full_scheduler.go index e349fe2..2bab85e 100644 --- a/svc/product_pricing_fx_full_scheduler.go +++ b/svc/product_pricing_fx_full_scheduler.go @@ -18,9 +18,8 @@ import ( // - Recomputes derived currencies from anchor tiers and writes to sdprc for all products in mk_price_snapshot. func startProductPricingFxFullScheduler(pgDB *sql.DB) { enabled := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_FX_FULL_ENABLED"))) - // Be conservative: require explicit opt-in. - if enabled != "1" && enabled != "true" && enabled != "on" && enabled != "yes" { - log.Println("Product pricing FX full scheduler disabled (set PRODUCT_PRICING_FX_FULL_ENABLED=1 to enable)") + if enabled == "0" || enabled == "false" || enabled == "off" { + log.Println("Product pricing FX full scheduler disabled") return } if pgDB == nil { @@ -48,6 +47,10 @@ func startProductPricingFxFullScheduler(pgDB *sql.DB) { codeBatch = n } } + runOnStartup := false + if raw := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_FX_FULL_RUN_ON_STARTUP"))); raw == "1" || raw == "true" || raw == "on" || raw == "yes" { + runOnStartup = true + } var running int32 = 0 @@ -133,6 +136,9 @@ LIMIT $2 go func() { time.Sleep(2 * time.Second) + if runOnStartup { + runOnce("startup_manual") + } for { now := time.Now() n := nextRun(now)