Merge remote-tracking branch 'origin/master'

This commit is contained in:
M_Kececi
2026-06-22 15:21:58 +03:00
parent f5bebe6f25
commit a839cae840
3 changed files with 179 additions and 3 deletions

View File

@@ -1352,6 +1352,7 @@ func main() {
startTranslationSyncScheduler(pgDB, db.MssqlDB) startTranslationSyncScheduler(pgDB, db.MssqlDB)
startBrandSyncScheduler(pgDB, db.MssqlDB) startBrandSyncScheduler(pgDB, db.MssqlDB)
startPricingParameterSyncScheduler(pgDB, db.MssqlDB) startPricingParameterSyncScheduler(pgDB, db.MssqlDB)
startProductPricingCalcScheduler(pgDB)
startProductPricingFxDeltaScheduler(pgDB) startProductPricingFxDeltaScheduler(pgDB)
startProductPricingFxFullScheduler(pgDB) startProductPricingFxFullScheduler(pgDB)

View File

@@ -0,0 +1,169 @@
package main
import (
"bssapp-backend/queries"
"context"
"database/sql"
"log"
"os"
"strconv"
"strings"
"sync/atomic"
"time"
)
func startProductPricingCalcScheduler(pgDB *sql.DB) {
enabled := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_ENABLED")))
if enabled == "0" || enabled == "false" || enabled == "off" {
log.Println("Product pricing calc scheduler disabled")
return
}
if pgDB == nil {
return
}
intervalHours := 24
if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_CALC_INTERVAL_HOURS")); raw != "" {
if parsed, err := strconv.Atoi(raw); err == nil && parsed >= 1 {
intervalHours = parsed
}
}
codeBatch := 1000
if raw := strings.TrimSpace(os.Getenv("PRODUCT_PRICING_CALC_PUBLISH_CODE_BATCH")); raw != "" {
if parsed, err := strconv.Atoi(raw); err == nil && parsed >= 100 && parsed <= 5000 {
codeBatch = parsed
}
}
runOnStartup := true
if raw := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_RUN_ON_STARTUP"))); raw == "0" || raw == "false" || raw == "off" {
runOnStartup = false
}
// Snapshot calculation should not update sdprc by default. sdprc is updated by
// the weekly full FX publisher and by the delta queue after manual price edits.
publishEnabled := false
if raw := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_PUBLISH_ENABLED"))); raw == "1" || raw == "true" || raw == "on" || raw == "yes" {
publishEnabled = true
}
forceFxRefresh := true
if raw := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_CALC_FORCE_FX_REFRESH"))); raw == "0" || raw == "false" || raw == "off" {
forceFxRefresh = false
}
var running int32
runOnce := func(reason string) {
if !atomic.CompareAndSwapInt32(&running, 0, 1) {
log.Printf("[ProductPricingCalcJob] skip (%s): already running", reason)
return
}
defer atomic.StoreInt32(&running, 0)
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Hour)
defer cancel()
started := time.Now()
result, err := queries.CalculateProductPricingSnapshots(ctx, pgDB, queries.ProductPricingSnapshotCalcRequest{
ForceFxRefresh: forceFxRefresh,
})
if err != nil {
log.Printf("[ProductPricingCalcJob] calculate_error (%s): err=%v duration=%s", reason, err, time.Since(started).Round(time.Second))
return
}
totalWritten := 0
totalSkipped := 0
totalProducts := 0
if publishEnabled {
written, skipped, products, err := publishAllProductPricingSnapshots(ctx, pgDB, codeBatch, forceFxRefresh)
if err != nil {
log.Printf("[ProductPricingCalcJob] publish_error (%s): calculated=%d err=%v duration=%s", reason, result.Calculated, err, time.Since(started).Round(time.Second))
return
}
totalWritten = written
totalSkipped = skipped
totalProducts = products
}
log.Printf("[ProductPricingCalcJob] ok (%s): requested=%d calculated=%d skipped=%d fx_date=%s snapshot_publish=%t products=%d sdprc_written=%d publish_skipped=%d interval_hours=%d duration=%s",
reason,
result.Requested,
result.Calculated,
result.Skipped,
result.RateDate,
publishEnabled,
totalProducts,
totalWritten,
totalSkipped,
intervalHours,
time.Since(started).Round(time.Second),
)
}
go func() {
if runOnStartup {
time.Sleep(5 * time.Second)
runOnce("startup")
}
ticker := time.NewTicker(time.Duration(intervalHours) * time.Hour)
defer ticker.Stop()
for range ticker.C {
runOnce("scheduled")
}
}()
}
func publishAllProductPricingSnapshots(ctx context.Context, pgDB *sql.DB, codeBatch int, forceFxRefresh bool) (int, int, int, error) {
totalWritten := 0
totalSkipped := 0
totalProducts := 0
lastCode := ""
for {
rows, err := pgDB.QueryContext(ctx, `
SELECT product_code
FROM mk_price_snapshot
WHERE COALESCE(NULLIF(BTRIM(product_code), ''), '') <> ''
AND product_code > $1
GROUP BY product_code
ORDER BY product_code
LIMIT $2
`, lastCode, codeBatch)
if err != nil {
return totalWritten, totalSkipped, totalProducts, err
}
codes := make([]string, 0, codeBatch)
for rows.Next() {
var code string
if err := rows.Scan(&code); err != nil {
_ = rows.Close()
return totalWritten, totalSkipped, totalProducts, err
}
code = strings.TrimSpace(code)
if code != "" {
codes = append(codes, code)
}
}
if err := rows.Err(); err != nil {
_ = rows.Close()
return totalWritten, totalSkipped, totalProducts, err
}
_ = rows.Close()
if len(codes) == 0 {
break
}
lastCode = codes[len(codes)-1]
written, skipped, err := queries.PublishDerivedPricesFromAnchor(ctx, pgDB, codes, "", forceFxRefresh)
if err != nil {
return totalWritten, totalSkipped, totalProducts, err
}
totalWritten += written
totalSkipped += skipped
totalProducts += len(codes)
}
return totalWritten, totalSkipped, totalProducts, nil
}

View File

@@ -18,9 +18,8 @@ import (
// - Recomputes derived currencies from anchor tiers and writes to sdprc for all products in mk_price_snapshot. // - Recomputes derived currencies from anchor tiers and writes to sdprc for all products in mk_price_snapshot.
func startProductPricingFxFullScheduler(pgDB *sql.DB) { func startProductPricingFxFullScheduler(pgDB *sql.DB) {
enabled := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_FX_FULL_ENABLED"))) enabled := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_FX_FULL_ENABLED")))
// Be conservative: require explicit opt-in. if enabled == "0" || enabled == "false" || enabled == "off" {
if enabled != "1" && enabled != "true" && enabled != "on" && enabled != "yes" { log.Println("Product pricing FX full scheduler disabled")
log.Println("Product pricing FX full scheduler disabled (set PRODUCT_PRICING_FX_FULL_ENABLED=1 to enable)")
return return
} }
if pgDB == nil { if pgDB == nil {
@@ -48,6 +47,10 @@ func startProductPricingFxFullScheduler(pgDB *sql.DB) {
codeBatch = n codeBatch = n
} }
} }
runOnStartup := false
if raw := strings.TrimSpace(strings.ToLower(os.Getenv("PRODUCT_PRICING_FX_FULL_RUN_ON_STARTUP"))); raw == "1" || raw == "true" || raw == "on" || raw == "yes" {
runOnStartup = true
}
var running int32 = 0 var running int32 = 0
@@ -133,6 +136,9 @@ LIMIT $2
go func() { go func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
if runOnStartup {
runOnce("startup_manual")
}
for { for {
now := time.Now() now := time.Now()
n := nextRun(now) n := nextRun(now)