package routes import ( "bssapp-backend/auth" "bssapp-backend/db" "bssapp-backend/internal/mailer" "bssapp-backend/queries" "bssapp-backend/utils" "context" "database/sql" "encoding/json" "fmt" "log" "log/slog" "math" "net/http" "strconv" "strings" "time" ) type productPricingSaveItem struct { ProductCode string `json:"product_code"` BasePriceUsd float64 `json:"base_price_usd"` BasePriceTry float64 `json:"base_price_try"` USD1 float64 `json:"usd1"` USD2 float64 `json:"usd2"` USD3 float64 `json:"usd3"` USD4 float64 `json:"usd4"` USD5 float64 `json:"usd5"` USD6 float64 `json:"usd6"` EUR1 float64 `json:"eur1"` EUR2 float64 `json:"eur2"` EUR3 float64 `json:"eur3"` EUR4 float64 `json:"eur4"` EUR5 float64 `json:"eur5"` EUR6 float64 `json:"eur6"` TRY1 float64 `json:"try1"` TRY2 float64 `json:"try2"` TRY3 float64 `json:"try3"` TRY4 float64 `json:"try4"` TRY5 float64 `json:"try5"` TRY6 float64 `json:"try6"` } type productPricingSavePayload struct { Items []productPricingSaveItem `json:"items"` } func resolveOrCreatePriceListHeaderID(ctx context.Context, tx *sql.Tx, priceGroup string, currency string, username string, logger *slog.Logger) (string, error) { priceGroup = strings.TrimSpace(priceGroup) currency = strings.ToUpper(strings.TrimSpace(currency)) if priceGroup == "" { return "", fmt.Errorf("empty price group") } if currency != "USD" && currency != "EUR" && currency != "TRY" { return "", fmt.Errorf("invalid currency") } // Try existing header for group+currency. var headerID string _ = tx.QueryRowContext(ctx, ` SELECT TOP (1) CONVERT(NVARCHAR(36), PriceListHeaderID) FROM dbo.trPriceListHeader WITH (UPDLOCK, HOLDLOCK) WHERE CompanyCode = 1 AND LTRIM(RTRIM(PriceGroupCode)) = @pg AND LTRIM(RTRIM(DocCurrencyCode)) = @cur ORDER BY ValidDate DESC, ValidTime DESC, LastUpdatedDate DESC; `, sql.Named("pg", priceGroup), sql.Named("cur", currency)).Scan(&headerID) headerID = strings.TrimSpace(headerID) if headerID != "" { logger.Info("save:mssql:header:resolved", "price_group", priceGroup, "currency", currency, "header_id", headerID, ) return headerID, nil } // Create header (PriceListNumber pattern: "1-"). // Note: PriceListNumber is unique (constraint seen as UQ_trPriceListHeader_1), so compute next and retry on collisions. isTaxIncluded := 0 if strings.HasPrefix(strings.ToUpper(priceGroup), "B2C-") { isTaxIncluded = 1 } var priceListNumber string var err error for attempt := 1; attempt <= 5; attempt++ { var nextSeq int64 if err2 := tx.QueryRowContext(ctx, ` SELECT ISNULL(MAX(CASE WHEN v.n >= 10000 THEN v.n END), 9999) + 1 FROM dbo.trPriceListHeader h WITH (UPDLOCK, HOLDLOCK) CROSS APPLY (VALUES ( SUBSTRING(LTRIM(RTRIM(h.PriceListNumber)), CHARINDEX('-', LTRIM(RTRIM(h.PriceListNumber))) + 1, 50) )) s(sfx) CROSS APPLY (VALUES ( CASE WHEN s.sfx NOT LIKE '%[^0-9]%' THEN CAST(s.sfx AS BIGINT) ELSE NULL END )) v(n) WHERE LTRIM(RTRIM(h.PriceListNumber)) LIKE '1-%' AND CHARINDEX('-', LTRIM(RTRIM(h.PriceListNumber))) > 0; `).Scan(&nextSeq); err2 != nil { // If we cannot compute the next sequence (SQL dialect/version), log and fall back to the starting point. logger.Error("save:mssql:header:nextseq:error", "price_group", priceGroup, "currency", currency, "attempt", attempt, "err", err2, ) nextSeq = 10000 } if nextSeq <= 0 { nextSeq = 10000 } if nextSeq < 10000 { nextSeq = 10000 } priceListNumber = fmt.Sprintf("1-%d", nextSeq) _, err = tx.ExecContext(ctx, ` DECLARE @HeaderID UNIQUEIDENTIFIER = NEWID(); INSERT INTO dbo.trPriceListHeader ( PriceListHeaderID, PriceListNumber, PriceListDate, PriceListTime, PriceListTypeCode, CompanyCode, PriceGroupCode, ValidDate, ValidTime, DocCurrencyCode, Description, IsTaxIncluded, IsCompleted, IsPrinted, IsLocked, IsConfirmed, ConfirmedUserName, ConfirmedDate, ApplicationCode, ApplicationID, CreatedUserName, CreatedDate, LastUpdatedUserName, LastUpdatedDate ) VALUES ( @HeaderID, @PriceListNumber, CONVERT(date, GETDATE()), '00:00:00', '', 1, @PriceGroupCode, CONVERT(date, GETDATE()), '00:00:00', @Currency, @Description, @IsTaxIncluded, 1, 0, 0, 1, @UserName, GETDATE(), 'Price', CONVERT(NVARCHAR(36), @HeaderID), @UserName, GETDATE(), @UserName, GETDATE() ); `, sql.Named("PriceListNumber", priceListNumber), sql.Named("PriceGroupCode", priceGroup), sql.Named("Currency", currency), sql.Named("Description", priceGroup), sql.Named("IsTaxIncluded", isTaxIncluded), sql.Named("UserName", username), ) if err == nil { break } low := strings.ToLower(err.Error()) if strings.Contains(low, "uq_trpricelistheader_1") || strings.Contains(low, "duplicate key") { logger.Warn("save:mssql:header:create:collision", "price_group", priceGroup, "currency", currency, "price_list_number", priceListNumber, "attempt", attempt, "err", err, ) time.Sleep(time.Duration(20*attempt) * time.Millisecond) continue } return "", fmt.Errorf("create trPriceListHeader failed for PriceGroupCode=%s currency=%s: %w", priceGroup, currency, err) } if err != nil { return "", fmt.Errorf("create trPriceListHeader failed for PriceGroupCode=%s currency=%s: %w", priceGroup, currency, err) } // Re-read header id. err = tx.QueryRowContext(ctx, ` SELECT TOP (1) CONVERT(NVARCHAR(36), PriceListHeaderID) FROM dbo.trPriceListHeader WITH (NOLOCK) WHERE CompanyCode = 1 AND LTRIM(RTRIM(PriceGroupCode)) = @pg AND LTRIM(RTRIM(DocCurrencyCode)) = @cur ORDER BY CreatedDate DESC, LastUpdatedDate DESC; `, sql.Named("pg", priceGroup), sql.Named("cur", currency)).Scan(&headerID) if err != nil { return "", fmt.Errorf("create header ok but cannot re-read header id: %w", err) } headerID = strings.TrimSpace(headerID) if headerID == "" { return "", fmt.Errorf("create header ok but header id is empty") } logger.Info("save:mssql:header:created", "price_group", priceGroup, "currency", currency, "header_id", headerID, "price_list_number", priceListNumber, ) return headerID, nil } func PostProductPricingSaveHandler(pg *sql.DB, ml *mailer.GraphMailer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { started := time.Now() traceID := utils.TraceIDFromRequest(r) w.Header().Set("X-Trace-ID", traceID) claims, ok := auth.GetClaimsFromContext(r.Context()) if !ok || claims == nil { http.Error(w, "unauthorized", http.StatusUnauthorized) return } var payload productPricingSavePayload if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { http.Error(w, "invalid payload", http.StatusBadRequest) return } if len(payload.Items) == 0 { w.Header().Set("Content-Type", "application/json; charset=utf-8") _ = json.NewEncoder(w).Encode(map[string]any{"success": true, "saved": 0}) return } // Basic validation early. for _, it := range payload.Items { if strings.TrimSpace(it.ProductCode) == "" { http.Error(w, "product_code is required", http.StatusBadRequest) return } if it.BasePriceUsd < 0 || it.BasePriceTry < 0 { http.Error(w, "base prices must be >= 0", http.StatusBadRequest) return } } ctx, cancel := context.WithTimeout(r.Context(), 10*time.Minute) defer cancel() ctx = utils.ContextWithTraceID(ctx, traceID) logger := utils.SlogFromContext(ctx).With("handler", "product-pricing.save", "trace_id", traceID, "user", claims.Username, "user_id", claims.ID) mssql := db.GetDB() if mssql == nil { http.Error(w, "mssql not connected", http.StatusInternalServerError) return } pgTx, err := pg.BeginTx(ctx, nil) if err != nil { http.Error(w, "pg transaction start error", http.StatusInternalServerError) return } defer pgTx.Rollback() msTx, err := mssql.BeginTx(ctx, nil) if err != nil { http.Error(w, "mssql transaction start error", http.StatusInternalServerError) return } defer msTx.Rollback() // Serialize writes to pricing tables in PG to avoid contention with other pricing jobs. if _, err := pgTx.ExecContext(ctx, `SELECT pg_advisory_xact_lock(2001, 1)`); err != nil { http.Error(w, "pg advisory lock error", http.StatusInternalServerError) return } savedPG := 0 savedMSSQL := 0 missingPG := 0 missingMSSQL := 0 // Load mapping tables once. pgMap := map[string]map[int]int{} // currency -> level -> sdprcgrp_id nebimMap := map[string]map[int]string{} // currency -> level -> price_group_code { rows, err := pgTx.QueryContext(ctx, ` SELECT currency, level_no, COALESCE(sdprcgrp_id, 0) FROM mk_price_target_map_pg WHERE is_active = TRUE `) if err == nil { for rows.Next() { var cur string var level int var grp int if err := rows.Scan(&cur, &level, &grp); err != nil { _ = rows.Close() http.Error(w, "pg map scan error", http.StatusInternalServerError) return } cur = strings.ToUpper(strings.TrimSpace(cur)) if cur == "" || level <= 0 || level > 6 || grp <= 0 { continue } // In this setup sdprcgrp_id is expected to be 1..6. Guard against stale/invalid mappings. if grp < 1 || grp > 6 { continue } if pgMap[cur] == nil { pgMap[cur] = map[int]int{} } pgMap[cur][level] = grp } _ = rows.Close() } } { rows, err := pgTx.QueryContext(ctx, ` SELECT currency, level_no, COALESCE(NULLIF(BTRIM(price_group_code), ''), '') FROM mk_price_target_map_nebim WHERE is_active = TRUE `) if err == nil { for rows.Next() { var cur string var level int var code string if err := rows.Scan(&cur, &level, &code); err != nil { _ = rows.Close() http.Error(w, "nebim map scan error", http.StatusInternalServerError) return } cur = strings.ToUpper(strings.TrimSpace(cur)) code = strings.TrimSpace(code) if cur == "" || level <= 0 || level > 6 || code == "" { continue } if nebimMap[cur] == nil { nebimMap[cur] = map[int]string{} } nebimMap[cur][level] = code } _ = rows.Close() } } changed := make(map[string]struct{}, len(payload.Items)) // In-request cache to avoid repeating expensive dim resolution work. // Key: "|" where token is uppercased/trimmed. dimTokenLocalCache := make(map[string]int64, 256) type dimCombo struct { Dim1 int64 Dim3 sql.NullInt64 } type sdprcWriteRow struct { Currency string `json:"currency"` SdprcGrpID int `json:"sdprcgrp_id"` Dim1 int64 `json:"dim1"` Dim3 *int64 `json:"dim3"` Price float64 `json:"price"` } // sdprc has a unique constraint on the business key (tier/currency/dims). // If input rows contain duplicates for the same key (can happen due to tier/group mapping), // we must dedupe before bulk inserting to avoid 500s like "duplicate key violates uq_sdprc_2". dedupeSdprcWriteRows := func(productCode string, in []sdprcWriteRow) []sdprcWriteRow { if len(in) <= 1 { return in } type key struct { Cur string Grp int D1 int64 D3 int64 // 0 => NULL } idx := make(map[key]int, len(in)) out := make([]sdprcWriteRow, 0, len(in)) for _, r := range in { cur := strings.ToUpper(strings.TrimSpace(r.Currency)) d3k := int64(0) if r.Dim3 != nil && *r.Dim3 > 0 { d3k = *r.Dim3 } k := key{Cur: cur, Grp: r.SdprcGrpID, D1: r.Dim1, D3: d3k} if i, ok := idx[k]; ok { out[i] = r // keep last continue } idx[k] = len(out) out = append(out, r) } if len(out) != len(in) { logger.Warn("save:pg:sdprc:dedupe", "product_code", strings.TrimSpace(productCode), "in_rows", len(in), "out_rows", len(out), ) } return out } loadDimCombosFromCache := func(productCode string) ([]dimCombo, error) { productCode = strings.TrimSpace(productCode) if productCode == "" { return nil, nil } rows, err := pgTx.QueryContext(ctx, ` SELECT dim1, dim3 FROM mk_mmitem_dim_combo WHERE product_code = $1 ORDER BY dim1, dim3_key `, productCode) if err != nil { return nil, err } defer rows.Close() out := make([]dimCombo, 0, 32) for rows.Next() { var d1 int64 var d3 sql.NullInt64 if err := rows.Scan(&d1, &d3); err != nil { return nil, err } if d1 <= 0 { continue } out = append(out, dimCombo{Dim1: d1, Dim3: d3}) } return out, rows.Err() } parseDimID := func(s string) (int64, bool) { s = strings.TrimSpace(s) if s == "" { return 0, false } // tolerate leading zeros like "001" s2 := strings.TrimLeft(s, "0") if s2 == "" { s2 = "0" } n, err := strconv.ParseInt(s2, 10, 64) if err != nil || n <= 0 { return 0, false } return n, true } type queryRower interface { QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row } resolveDimvalFromToken := func(q queryRower, column, token string) (int64, bool) { token = strings.ToUpper(normalizeDimParam(token)) if token == "" { return 0, false } cacheKey := column + "|" + token if v, ok := dimTokenLocalCache[cacheKey]; ok { return v, v > 0 } // Fast path: persistent token->id mapping table. { var id int64 if err := pgTx.QueryRowContext(ctx, ` SELECT dim_id FROM mk_dim_token_map WHERE dim_column = $1 AND token = $2 `, column, token).Scan(&id); err == nil && id > 0 { dimTokenLocalCache[cacheKey] = id return id, true } } patterns := buildNameLikePatterns(token) if len(patterns) == 0 { dimTokenLocalCache[cacheKey] = 0 return 0, false } query := fmt.Sprintf(` SELECT x.dimv FROM ( SELECT COALESCE(%s::text, '') AS dimv, COUNT(*) AS cnt FROM dfblob WHERE src_table='mmitem' AND typ='img' AND COALESCE(%s::text, '') <> '' AND ( UPPER(COALESCE(file_name,'')) LIKE $1 OR UPPER(COALESCE(file_name,'')) LIKE $2 OR UPPER(COALESCE(file_name,'')) LIKE $3 OR UPPER(COALESCE(file_name,'')) LIKE $4 OR UPPER(COALESCE(file_name,'')) LIKE $5 OR UPPER(COALESCE(file_name,'')) LIKE $6 ) GROUP BY COALESCE(%s::text, '') ) x ORDER BY x.cnt DESC, x.dimv LIMIT 1 `, column, column, column) var v string if err := q.QueryRowContext(ctx, query, patterns[0], patterns[1], patterns[2], patterns[3], patterns[4], patterns[5], ).Scan(&v); err != nil { dimTokenLocalCache[cacheKey] = 0 return 0, false } v = normalizeDimParam(v) if v == "" { dimTokenLocalCache[cacheKey] = 0 return 0, false } id, ok := parseDimID(v) if !ok { dimTokenLocalCache[cacheKey] = 0 return 0, false } // Persist for future requests (best-effort). _, _ = pgTx.ExecContext(ctx, ` INSERT INTO mk_dim_token_map (dim_column, token, dim_id, updated_at) VALUES ($1,$2,$3,now()) ON CONFLICT (dim_column, token) DO UPDATE SET dim_id = EXCLUDED.dim_id, updated_at = EXCLUDED.updated_at `, column, token, id) dimTokenLocalCache[cacheKey] = id return id, true } loadDimsFromMssqlStock := func(productCode string) ([]dimCombo, error) { started := time.Now() if db.MssqlDB == nil { return nil, fmt.Errorf("mssql not ready") } rows, err := db.MssqlDB.QueryContext(ctx, queries.GetProductVariantDimsForPricing, productCode) if err != nil { return nil, err } defer rows.Close() out := make([]dimCombo, 0, 32) seen := make(map[string]struct{}, 64) readRows := 0 resolvedDim1 := 0 resolvedDim3 := 0 for rows.Next() { readRows++ var colorCode, dim1Code, dim3Code string if err := rows.Scan(&colorCode, &dim1Code, &dim3Code); err != nil { return nil, err } // Resolve to PG dim ids. For this installation we align with mmitem_dim key: // - dim1 = color // - dim3 = itemdim3 (optional) // Size (ItemDim1Code) is not part of the key here. d1 := int64(0) if id, ok := resolveDimvalFromToken(pgTx, "dimval1", colorCode); ok { d1 = id resolvedDim1++ } else if id, ok := resolveDimvalFromToken(pgTx, "dimval1", dim3Code); ok { d1 = id resolvedDim1++ } else if id, ok := resolveDimvalFromToken(pgTx, "dimval1", dim1Code); ok { d1 = id resolvedDim1++ } if d1 <= 0 { continue } var d3 sql.NullInt64 // dim3 corresponds to mmitem_dim.val3 (ItemDim3Code). if id, ok := resolveDimvalFromToken(pgTx, "dimval1", dim3Code); ok { d3 = sql.NullInt64{Int64: id, Valid: true} resolvedDim3++ } key := fmt.Sprintf("%d|%d", d1, func() int64 { if d3.Valid { return d3.Int64 } return 0 }()) if _, ok := seen[key]; ok { continue } seen[key] = struct{}{} out = append(out, dimCombo{Dim1: d1, Dim3: d3}) } if err := rows.Err(); err != nil { return nil, err } logger.Info("save:pg:dims:mssql:resolved", "product_code", strings.TrimSpace(productCode), "rows_read", readRows, "dims", len(out), "resolved_dim1", resolvedDim1, "resolved_dim3", resolvedDim3, "duration_ms", time.Since(started).Milliseconds(), ) return out, nil } ensureMMItemDimRows := func(mmItemID int64, combos []dimCombo, extraVal3 map[string]int64) { if mmItemID <= 0 || len(combos) == 0 { return } // Best-effort: don't assume a specific unique constraint exists on mmitem_dim. for _, c := range combos { if c.Dim1 <= 0 { continue } v2 := int64(0) var v2any any = nil if c.Dim3.Valid && c.Dim3.Int64 > 0 { v2 = c.Dim3.Int64 v2any = v2 } // If we managed to resolve an "ItemDim3Code" id too, store it in val3 and mark mmdim_id=3. v3 := int64(0) if extraVal3 != nil { if vv, ok := extraVal3[fmt.Sprintf("%d|%d", c.Dim1, v2)]; ok && vv > 0 { v3 = vv } } mmdimID := int64(2) var v3any any = nil if v3 > 0 { mmdimID = 3 v3any = v3 } _, _ = pgTx.ExecContext(ctx, ` INSERT INTO mmitem_dim (mmitem_id, mmdim_id, val1, val2, val3, is_active, qty) SELECT $1, $2, $3, $4, $5, TRUE, 0 WHERE NOT EXISTS ( SELECT 1 FROM mmitem_dim WHERE mmitem_id = $1 AND mmdim_id = $2 AND val1 = $3 AND COALESCE(val2, 0) = COALESCE($4::bigint, 0) AND COALESCE(val3, 0) = COALESCE($5::bigint, 0) LIMIT 1 ); `, mmItemID, mmdimID, c.Dim1, v2any, v3any) } } loadDimsFromPgMMItemDim := func(mmItemID int64, productCode string) ([]dimCombo, error) { started := time.Now() if mmItemID <= 0 { return nil, fmt.Errorf("invalid mmitem_id") } rows, err := pgTx.QueryContext(ctx, ` SELECT mmdim_id, val1, val2, val3 FROM mmitem_dim WHERE mmitem_id = $1 AND COALESCE(is_active, TRUE) = TRUE `, mmItemID) if err != nil { return nil, err } defer rows.Close() out := make([]dimCombo, 0, 64) seen := make(map[string]struct{}, 128) readRows := 0 for rows.Next() { readRows++ var mmdimID sql.NullInt64 var v1 sql.NullInt64 var v2 sql.NullInt64 var v3 sql.NullInt64 if err := rows.Scan(&mmdimID, &v1, &v2, &v3); err != nil { return nil, err } if !v1.Valid || v1.Int64 <= 0 { continue } // Variant key in this installation: (val1=color, val3=itemdim3_if_any). Ignore val2 (size). d1 := v1.Int64 _ = mmdimID _ = v2 var d3 sql.NullInt64 if v3.Valid && v3.Int64 > 0 { d3 = sql.NullInt64{Int64: v3.Int64, Valid: true} } key := fmt.Sprintf("%d|%d", d1, func() int64 { if d3.Valid { return d3.Int64 } return 0 }()) if _, ok := seen[key]; ok { continue } seen[key] = struct{}{} out = append(out, dimCombo{Dim1: d1, Dim3: d3}) } if err := rows.Err(); err != nil { return nil, err } logger.Info("save:pg:dims:mmitem-dim:loaded", "product_code", strings.TrimSpace(productCode), "mmitem_id", mmItemID, "rows_read", readRows, "dims", len(out), "duration_ms", time.Since(started).Milliseconds(), ) return out, nil } upsertDimCombosCache := func(productCode string, dims []dimCombo) error { productCode = strings.TrimSpace(productCode) if productCode == "" || len(dims) == 0 { return nil } for _, d := range dims { _, err := pgTx.ExecContext(ctx, ` INSERT INTO mk_mmitem_dim_combo (product_code, dim1, dim3, updated_at) VALUES ($1,$2,$3,now()) ON CONFLICT (product_code, dim1, dim3_key) DO UPDATE SET updated_at = EXCLUDED.updated_at `, productCode, d.Dim1, func() any { if d.Dim3.Valid { return d.Dim3.Int64 } return nil }()) if err != nil { return err } } return nil } bulkAppendOnlyInsertSdprc := func(mmItemID int64, productCode string, rows []sdprcWriteRow) (int, error) { if mmItemID <= 0 { return 0, fmt.Errorf("invalid mmitem_id") } if len(rows) == 0 { return 0, nil } raw, err := json.Marshal(rows) if err != nil { return 0, err } q := ` WITH input AS ( SELECT * FROM jsonb_to_recordset($1::jsonb) AS x(currency text, sdprcgrp_id int, dim1 bigint, dim3 bigint, price float8) ), norm AS ( SELECT UPPER(NULLIF(BTRIM(currency), '')) AS currency, COALESCE(sdprcgrp_id, 0) AS sdprcgrp_id, COALESCE(dim1, 0) AS dim1, dim3 AS dim3, COALESCE(price, 0) AS price FROM input ), filtered AS ( SELECT * FROM norm WHERE currency IN ('USD','EUR','TRY') AND sdprcgrp_id BETWEEN 1 AND 6 AND dim1 > 0 AND price > 0 ), latest AS ( SELECT DISTINCT ON (s.sdprcgrp_id, s.crn, s.dim1, COALESCE(s.dim3, 0)) s.sdprcgrp_id, s.crn, s.dim1, s.dim3, s.prc FROM sdprc s WHERE s.mmitem_id = $2 AND (s.sdprcgrp_id, s.crn, s.dim1, COALESCE(s.dim3, 0)) IN ( SELECT sdprcgrp_id, currency, dim1, COALESCE(dim3, 0) FROM filtered ) ORDER BY s.sdprcgrp_id, s.crn, s.dim1, COALESCE(s.dim3, 0), s.zlins_dttm DESC, s.id DESC ), to_insert AS ( SELECT $2::bigint AS mmitem_id, f.sdprcgrp_id, f.currency AS crn, f.dim1, f.dim3, f.price AS prc FROM filtered f LEFT JOIN latest l ON l.sdprcgrp_id = f.sdprcgrp_id AND l.crn = f.currency AND l.dim1 = f.dim1 AND ((l.dim3 IS NULL AND f.dim3 IS NULL) OR l.dim3 = f.dim3) WHERE l.prc IS NULL OR l.prc IS DISTINCT FROM f.price ), ins AS ( INSERT INTO sdprc (mmitem_id, sdprcgrp_id, crn, dim1, dim3, prc, zlins_dttm) SELECT mmitem_id, sdprcgrp_id, crn, dim1, dim3, prc, now() FROM to_insert RETURNING 1 ) SELECT COUNT(*)::int FROM ins; ` var inserted int if err := pgTx.QueryRowContext(ctx, q, raw, mmItemID).Scan(&inserted); err != nil { return 0, err } if inserted > 0 { savedPG += inserted changed[productCode] = struct{}{} } return inserted, nil } // MSSQL memoization: reduce chatter for large batches. // header id cache key: "|" msHeaderIDCache := make(map[string]string, 64) // next sort cache key: "" msHeaderNextSort := make(map[string]int64, 64) type msLatestKey struct { Cur string PriceGroup string } loadLatestPricesForProduct := func(productCode string, pairs []msLatestKey) (map[string]float64, map[string]bool) { out := make(map[string]float64, len(pairs)) ok := make(map[string]bool, len(pairs)) productCode = strings.TrimSpace(productCode) if productCode == "" || len(pairs) == 0 { return out, ok } conds := make([]string, 0, len(pairs)) args := []any{sql.Named("ItemCode", productCode)} for i, p := range pairs { pg := strings.TrimSpace(p.PriceGroup) cur := strings.ToUpper(strings.TrimSpace(p.Cur)) if pg == "" || (cur != "USD" && cur != "EUR" && cur != "TRY") { continue } args = append(args, sql.Named(fmt.Sprintf("pg%d", i), pg), sql.Named(fmt.Sprintf("cur%d", i), cur), ) conds = append(conds, fmt.Sprintf("(LTRIM(RTRIM(PriceGroupCode)) = @pg%d AND LTRIM(RTRIM(DocCurrencyCode)) = @cur%d)", i, i), ) } if len(conds) == 0 { return out, ok } q := fmt.Sprintf(` SELECT PriceGroupCode, DocCurrencyCode, Price FROM ( SELECT LTRIM(RTRIM(PriceGroupCode)) AS PriceGroupCode, LTRIM(RTRIM(DocCurrencyCode)) AS DocCurrencyCode, CAST(Price AS FLOAT) AS Price, ROW_NUMBER() OVER ( PARTITION BY LTRIM(RTRIM(PriceGroupCode)), LTRIM(RTRIM(DocCurrencyCode)) ORDER BY ValidDate DESC, ValidTime DESC, LastUpdatedDate DESC ) AS rn FROM dbo.trPriceListLine WITH(NOLOCK) WHERE ItemTypeCode = 1 AND LTRIM(RTRIM(ItemCode)) = @ItemCode AND ISNULL(IsDisabled, 0) = 0 AND (%s) ) x WHERE rn = 1; `, strings.Join(conds, " OR ")) rows, err := msTx.QueryContext(ctx, q, args...) if err != nil { logger.Warn("save:mssql:latest:prefetch:error", "product_code", productCode, "err", err) return out, ok } defer rows.Close() for rows.Next() { var pg, cur string var price float64 if err := rows.Scan(&pg, &cur, &price); err != nil { logger.Warn("save:mssql:latest:prefetch:scan:error", "product_code", productCode, "err", err) return out, ok } pg = strings.TrimSpace(pg) cur = strings.ToUpper(strings.TrimSpace(cur)) k := cur + "|" + pg out[k] = price ok[k] = true } return out, ok } // Helper: append-only Nebim price list line (insert new row when price changes). // Resolve PriceListHeaderID from trPriceListHeader (source of truth). // If header does not exist for the given PriceGroupCode+Currency, create it, then insert lines under that header. upsertPriceListLine := func(productCode string, currency string, priceGroup string, price float64, latest map[string]float64, latestOK map[string]bool) (bool, error) { currency = strings.ToUpper(strings.TrimSpace(currency)) priceGroup = strings.TrimSpace(priceGroup) if price <= 0 { return false, nil } if currency != "USD" && currency != "EUR" && currency != "TRY" { return false, fmt.Errorf("invalid currency") } if priceGroup == "" { return false, fmt.Errorf("empty price group") } // Resolve or create header id for that group/currency (memoized). headerKey := currency + "|" + priceGroup headerID := strings.TrimSpace(msHeaderIDCache[headerKey]) if headerID == "" { var err error headerID, err = resolveOrCreatePriceListHeaderID(ctx, msTx, priceGroup, currency, claims.Username, logger) if err != nil { return false, err } msHeaderIDCache[headerKey] = headerID } // If latest line already has the same price, no-op (prefer prefetch map). if latest != nil && latestOK != nil && latestOK[headerKey] { if curLatest, ok := latest[headerKey]; ok && math.Abs(curLatest-price) < 1e-9 { return false, nil } } else { // Fallback: query latest for this key if not prefetched. var latestPrice sql.NullFloat64 _ = msTx.QueryRowContext(ctx, ` SELECT TOP (1) CAST(Price AS FLOAT) FROM dbo.trPriceListLine WITH(NOLOCK) WHERE ItemTypeCode = 1 AND LTRIM(RTRIM(ItemCode)) = @p1 AND LTRIM(RTRIM(DocCurrencyCode)) = @p2 AND LTRIM(RTRIM(PriceGroupCode)) = @p3 AND ISNULL(IsDisabled, 0) = 0 ORDER BY ValidDate DESC, ValidTime DESC, LastUpdatedDate DESC; `, sql.Named("p1", productCode), sql.Named("p2", currency), sql.Named("p3", priceGroup)).Scan(&latestPrice) if latestPrice.Valid && math.Abs(latestPrice.Float64-price) < 1e-9 { return false, nil } } // SortOrder: append inside header. nextSort := msHeaderNextSort[headerID] if nextSort <= 0 { _ = msTx.QueryRowContext(ctx, ` SELECT ISNULL(MAX(SortOrder), 0) + 1 FROM dbo.trPriceListLine WITH(NOLOCK) WHERE PriceListHeaderID = CONVERT(UNIQUEIDENTIFIER, @p1); `, sql.Named("p1", headerID)).Scan(&nextSort) if nextSort <= 0 { nextSort = 1 } } msHeaderNextSort[headerID] = nextSort + 1 // Insert minimal line. _, err := msTx.ExecContext(ctx, ` INSERT INTO dbo.trPriceListLine ( PriceListLineID, SortOrder, ItemTypeCode, ItemCode, ColorCode, ItemDim1Code, ItemDim2Code, ItemDim3Code, UnitOfMeasureCode, PaymentPlanCode, LineDescription, DocCurrencyCode, Price, IsDisabled, DisableDate, CompanyCode, PriceGroupCode, ValidDate, ValidTime, PriceListHeaderID, CreatedUserName, CreatedDate, LastUpdatedUserName, LastUpdatedDate ) VALUES ( NEWID(), @SortOrder, 1, @ItemCode, '', '', '', '', 'AD', '', '', @Currency, @Price, 0, '1900-01-01', 1, @PriceGroupCode, CONVERT(date, GETDATE()), '00:00:00', CONVERT(uniqueidentifier, @HeaderID), @UserName, GETDATE(), @UserName, GETDATE() ); `, sql.Named("SortOrder", nextSort), sql.Named("ItemCode", productCode), sql.Named("Currency", currency), sql.Named("Price", price), sql.Named("PriceGroupCode", priceGroup), sql.Named("HeaderID", headerID), sql.Named("UserName", claims.Username), ) if err != nil { return false, err } return true, nil } for _, it := range payload.Items { code := strings.TrimSpace(it.ProductCode) if code == "" { continue } var latestMap map[string]float64 var latestOK map[string]bool var mmItemID int64 if err := pgTx.QueryRowContext(ctx, `SELECT id FROM mmitem WHERE code = $1`, code).Scan(&mmItemID); err != nil { // If missing in PG, we can still save MSSQL tiers; PG write will be skipped. mmItemID = 0 } dims := []dimCombo{} // Prefer PG's own authoritative dim combo table (mmitem_dim). Cache is only a fast-path fallback. if mmItemID > 0 { // 1) Authoritative source: mmitem_dim (PG). if d, err := loadDimsFromPgMMItemDim(mmItemID, code); err == nil && len(d) > 0 { dims = d _ = upsertDimCombosCache(code, dims) // best-effort cache fill } // 2) Cache fallback (fast). cacheStarted := time.Now() if len(dims) == 0 { cached, cacheErr := loadDimCombosFromCache(code) if cacheErr == nil && len(cached) > 0 { dims = cached logger.Info("save:pg:dims:cache:hit", "product_code", code, "dims", len(dims), "duration_ms", time.Since(cacheStarted).Milliseconds(), ) } else if cacheErr != nil { logger.Error("save:pg:dims:cache-load:error", "product_code", code, "err", cacheErr) } else { logger.Info("save:pg:dims:cache:miss", "product_code", code, "duration_ms", time.Since(cacheStarted).Milliseconds(), ) } } // 3) Last resort: MSSQL stock tokens (legacy). if len(dims) == 0 { d, err := loadDimsFromMssqlStock(code) if err != nil { logger.Error("save:pg:dims:mssql:error", "product_code", code, "err", err) } else { dims = d _ = upsertDimCombosCache(code, dims) // If PG doesn't have mmitem_dim rows for this product yet, try to seed them. ensureMMItemDimRows(mmItemID, dims, nil) } } } // Tier prices in PG sdprc + Nebim price list lines (mapped). type tier struct { Cur string Level int Price float64 } tiers := []tier{ {"USD", 1, it.USD1}, {"USD", 2, it.USD2}, {"USD", 3, it.USD3}, {"USD", 4, it.USD4}, {"USD", 5, it.USD5}, {"USD", 6, it.USD6}, {"EUR", 1, it.EUR1}, {"EUR", 2, it.EUR2}, {"EUR", 3, it.EUR3}, {"EUR", 4, it.EUR4}, {"EUR", 5, it.EUR5}, {"EUR", 6, it.EUR6}, {"TRY", 1, it.TRY1}, {"TRY", 2, it.TRY2}, {"TRY", 3, it.TRY3}, {"TRY", 4, it.TRY4}, {"TRY", 5, it.TRY5}, {"TRY", 6, it.TRY6}, } // Prefetch MSSQL latest prices for all relevant pairs for this product. // This turns N tier "TOP 1" lookups into a single query per product. { msPairs := make([]msLatestKey, 0, 24) seen := make(map[string]struct{}, 32) addPair := func(cur, pg string) { cur = strings.ToUpper(strings.TrimSpace(cur)) pg = strings.TrimSpace(pg) if pg == "" { return } k := cur + "|" + pg if _, ok := seen[k]; ok { return } seen[k] = struct{}{} msPairs = append(msPairs, msLatestKey{Cur: cur, PriceGroup: pg}) } if it.BasePriceUsd > 0 { addPair("USD", "TM-USD") } if it.BasePriceTry > 0 { addPair("TRY", "TM-TRY") } for _, t := range tiers { if t.Price <= 0 { continue } nebimGrp := "" if nebimMap[t.Cur] != nil { nebimGrp = nebimMap[t.Cur][t.Level] } if nebimGrp == "" { continue } addPair(t.Cur, nebimGrp) } latestMap, latestOK = loadLatestPricesForProduct(code, msPairs) } // Base prices in Nebim price lists. { ch, err := upsertPriceListLine(code, "USD", "TM-USD", it.BasePriceUsd, latestMap, latestOK) if err != nil { logger.Error("save:mssql:base-usd:error", "product_code", code, "err", err) http.Error(w, "mssql base price save error: "+err.Error(), http.StatusInternalServerError) return } if ch { changed[code] = struct{}{} savedMSSQL++ } ch, err = upsertPriceListLine(code, "TRY", "TM-TRY", it.BasePriceTry, latestMap, latestOK) if err != nil { logger.Error("save:mssql:base-try:error", "product_code", code, "err", err) http.Error(w, "mssql base price save error: "+err.Error(), http.StatusInternalServerError) return } if ch { changed[code] = struct{}{} savedMSSQL++ } } // PG write: bulk append-only insert across dims (fast). if mmItemID > 0 && len(dims) > 0 { writeRows := make([]sdprcWriteRow, 0, len(dims)*len(tiers)) for _, t := range tiers { if t.Price <= 0 { continue } pgGrp := 0 if pgMap[t.Cur] != nil { pgGrp = pgMap[t.Cur][t.Level] } if pgGrp <= 0 { pgGrp = t.Level } for _, dc := range dims { var d3 *int64 if dc.Dim3.Valid { v := dc.Dim3.Int64 d3 = &v } writeRows = append(writeRows, sdprcWriteRow{ Currency: t.Cur, SdprcGrpID: pgGrp, Dim1: dc.Dim1, Dim3: d3, Price: t.Price, }) } } if len(writeRows) > 0 { writeRows = dedupeSdprcWriteRows(code, writeRows) startPG := time.Now() inserted, err := bulkAppendOnlyInsertSdprc(mmItemID, code, writeRows) if err != nil { logger.Error("save:pg:sdprc:bulk:error", "product_code", code, "dims", len(dims), "rows", len(writeRows), "err", err) http.Error(w, "postgres tier save error: "+err.Error(), http.StatusInternalServerError) return } logger.Info("save:pg:sdprc:bulk:ok", "product_code", code, "dims", len(dims), "rows", len(writeRows), "inserted", inserted, "duration_ms", time.Since(startPG).Milliseconds()) } } else { for _, t := range tiers { if t.Price > 0 { missingPG++ logger.Warn("save:pg:sdprc:skip:no-dims", "product_code", code, "currency", t.Cur, "level", t.Level) } } } // MSSQL tier writes (mapped). for _, t := range tiers { nebimGrp := "" if nebimMap[t.Cur] != nil { nebimGrp = nebimMap[t.Cur][t.Level] } if nebimGrp == "" { if t.Price > 0 { missingMSSQL++ } continue } msChanged, err := upsertPriceListLine(code, t.Cur, nebimGrp, t.Price, latestMap, latestOK) if err != nil { logger.Error("save:mssql:tier:error", "product_code", code, "currency", t.Cur, "level", t.Level, "price_group", nebimGrp, "err", err) http.Error(w, "mssql tier save error: "+err.Error(), http.StatusInternalServerError) return } if msChanged { changed[code] = struct{}{} savedMSSQL++ } } } // Delta queue: only products with an explicit price change record should be processed by delta jobs. { codes := make([]string, 0, len(changed)) for c := range changed { codes = append(codes, c) } if _, err := queries.EnqueuePriceRecalc(ctx, pgTx, codes, "manual_price_save"); err != nil { logger.Error("save:enqueue:error", "err", err) http.Error(w, "price recalc enqueue error: "+err.Error(), http.StatusInternalServerError) return } } if err := msTx.Commit(); err != nil { logger.Error("save:mssql:commit:error", "err", err) http.Error(w, "mssql commit error", http.StatusInternalServerError) return } if err := pgTx.Commit(); err != nil { logger.Error("save:pg:commit:error", "err", err) http.Error(w, "postgres commit error", http.StatusInternalServerError) return } // Post-commit pricing mail: only for actually changed products. if ml != nil && len(changed) > 0 { changedCodes := make([]string, 0, len(changed)) for c := range changed { changedCodes = append(changedCodes, c) } actor := claims.Username go sendPricingChangeMails(context.Background(), ml, changedCodes, actor) } // Immediate FX delta publish kick (best-effort): run right away for changed products. // Queue entries are still created for reliability; on success we mark them done to avoid a second pass. if len(changed) > 0 { changedCodes := make([]string, 0, len(changed)) for c := range changed { changedCodes = append(changedCodes, c) } go func(codes []string) { ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Minute) defer cancel2() written, fxDateYmd, err := queries.PublishDerivedPricesFromAnchor(ctx2, pg, codes, "", false) if err != nil { log.Printf("[PricingFxImmediate] publish_error codes=%d err=%v", len(codes), err) return } tx2, err := pg.BeginTx(ctx2, nil) if err == nil { _, _ = queries.MarkPriceRecalcQueueDoneByProductCodes(ctx2, tx2, codes) _ = tx2.Commit() } log.Printf("[PricingFxImmediate] ok codes=%d sdprc_written=%d fx_date_ymd=%d", len(codes), written, fxDateYmd) }(changedCodes) } logger.Info("save:done", "items", len(payload.Items), "saved_pg", savedPG, "saved_mssql", savedMSSQL, "missing_pg", missingPG, "missing_mssql", missingMSSQL, "duration_ms", time.Since(started).Milliseconds(), ) w.Header().Set("Content-Type", "application/json; charset=utf-8") _ = json.NewEncoder(w).Encode(map[string]any{ "success": true, "saved_pg": savedPG, "saved_mssql": savedMSSQL, "missing_pg": missingPG, "missing_mssql": missingMSSQL, }) } }