Files
bssapp/svc/routes/product_pricing_save.go
2026-06-19 00:49:59 +03:00

1334 lines
37 KiB
Go

package routes
import (
"bssapp-backend/auth"
"bssapp-backend/db"
"bssapp-backend/internal/mailer"
"bssapp-backend/queries"
"bssapp-backend/utils"
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"log/slog"
"math"
"net/http"
"strconv"
"strings"
"time"
)
type productPricingSaveItem struct {
ProductCode string `json:"product_code"`
BasePriceUsd float64 `json:"base_price_usd"`
BasePriceTry float64 `json:"base_price_try"`
USD1 float64 `json:"usd1"`
USD2 float64 `json:"usd2"`
USD3 float64 `json:"usd3"`
USD4 float64 `json:"usd4"`
USD5 float64 `json:"usd5"`
USD6 float64 `json:"usd6"`
EUR1 float64 `json:"eur1"`
EUR2 float64 `json:"eur2"`
EUR3 float64 `json:"eur3"`
EUR4 float64 `json:"eur4"`
EUR5 float64 `json:"eur5"`
EUR6 float64 `json:"eur6"`
TRY1 float64 `json:"try1"`
TRY2 float64 `json:"try2"`
TRY3 float64 `json:"try3"`
TRY4 float64 `json:"try4"`
TRY5 float64 `json:"try5"`
TRY6 float64 `json:"try6"`
}
type productPricingSavePayload struct {
Items []productPricingSaveItem `json:"items"`
}
func resolveOrCreatePriceListHeaderID(ctx context.Context, tx *sql.Tx, priceGroup string, currency string, username string, logger *slog.Logger) (string, error) {
priceGroup = strings.TrimSpace(priceGroup)
currency = strings.ToUpper(strings.TrimSpace(currency))
if priceGroup == "" {
return "", fmt.Errorf("empty price group")
}
if currency != "USD" && currency != "EUR" && currency != "TRY" {
return "", fmt.Errorf("invalid currency")
}
// Try existing header for group+currency.
var headerID string
_ = tx.QueryRowContext(ctx, `
SELECT TOP (1) CONVERT(NVARCHAR(36), PriceListHeaderID)
FROM dbo.trPriceListHeader WITH (UPDLOCK, HOLDLOCK)
WHERE CompanyCode = 1
AND LTRIM(RTRIM(PriceGroupCode)) = @pg
AND LTRIM(RTRIM(DocCurrencyCode)) = @cur
ORDER BY ValidDate DESC, ValidTime DESC, LastUpdatedDate DESC;
`, sql.Named("pg", priceGroup), sql.Named("cur", currency)).Scan(&headerID)
headerID = strings.TrimSpace(headerID)
if headerID != "" {
logger.Info("save:mssql:header:resolved",
"price_group", priceGroup,
"currency", currency,
"header_id", headerID,
)
return headerID, nil
}
// Create header (PriceListNumber pattern: "1-<seq>").
// Note: PriceListNumber is unique (constraint seen as UQ_trPriceListHeader_1), so compute next and retry on collisions.
isTaxIncluded := 0
if strings.HasPrefix(strings.ToUpper(priceGroup), "B2C-") {
isTaxIncluded = 1
}
var priceListNumber string
var err error
for attempt := 1; attempt <= 5; attempt++ {
var nextSeq int64
if err2 := tx.QueryRowContext(ctx, `
SELECT ISNULL(MAX(CASE WHEN v.n >= 10000 THEN v.n END), 9999) + 1
FROM dbo.trPriceListHeader h WITH (UPDLOCK, HOLDLOCK)
CROSS APPLY (VALUES (
SUBSTRING(LTRIM(RTRIM(h.PriceListNumber)),
CHARINDEX('-', LTRIM(RTRIM(h.PriceListNumber))) + 1,
50)
)) s(sfx)
CROSS APPLY (VALUES (
CASE
WHEN s.sfx NOT LIKE '%[^0-9]%' THEN CAST(s.sfx AS BIGINT)
ELSE NULL
END
)) v(n)
WHERE LTRIM(RTRIM(h.PriceListNumber)) LIKE '1-%'
AND CHARINDEX('-', LTRIM(RTRIM(h.PriceListNumber))) > 0;
`).Scan(&nextSeq); err2 != nil {
// If we cannot compute the next sequence (SQL dialect/version), log and fall back to the starting point.
logger.Error("save:mssql:header:nextseq:error",
"price_group", priceGroup,
"currency", currency,
"attempt", attempt,
"err", err2,
)
nextSeq = 10000
}
if nextSeq <= 0 {
nextSeq = 10000
}
if nextSeq < 10000 {
nextSeq = 10000
}
priceListNumber = fmt.Sprintf("1-%d", nextSeq)
_, err = tx.ExecContext(ctx, `
DECLARE @HeaderID UNIQUEIDENTIFIER = NEWID();
INSERT INTO dbo.trPriceListHeader (
PriceListHeaderID,
PriceListNumber,
PriceListDate,
PriceListTime,
PriceListTypeCode,
CompanyCode,
PriceGroupCode,
ValidDate,
ValidTime,
DocCurrencyCode,
Description,
IsTaxIncluded,
IsCompleted,
IsPrinted,
IsLocked,
IsConfirmed,
ConfirmedUserName,
ConfirmedDate,
ApplicationCode,
ApplicationID,
CreatedUserName,
CreatedDate,
LastUpdatedUserName,
LastUpdatedDate
)
VALUES (
@HeaderID,
@PriceListNumber,
CONVERT(date, GETDATE()),
'00:00:00',
'',
1,
@PriceGroupCode,
CONVERT(date, GETDATE()),
'00:00:00',
@Currency,
@Description,
@IsTaxIncluded,
1,
0,
0,
1,
@UserName,
GETDATE(),
'Price',
CONVERT(NVARCHAR(36), @HeaderID),
@UserName,
GETDATE(),
@UserName,
GETDATE()
);
`, sql.Named("PriceListNumber", priceListNumber),
sql.Named("PriceGroupCode", priceGroup),
sql.Named("Currency", currency),
sql.Named("Description", priceGroup),
sql.Named("IsTaxIncluded", isTaxIncluded),
sql.Named("UserName", username),
)
if err == nil {
break
}
low := strings.ToLower(err.Error())
if strings.Contains(low, "uq_trpricelistheader_1") || strings.Contains(low, "duplicate key") {
logger.Warn("save:mssql:header:create:collision",
"price_group", priceGroup,
"currency", currency,
"price_list_number", priceListNumber,
"attempt", attempt,
"err", err,
)
time.Sleep(time.Duration(20*attempt) * time.Millisecond)
continue
}
return "", fmt.Errorf("create trPriceListHeader failed for PriceGroupCode=%s currency=%s: %w", priceGroup, currency, err)
}
if err != nil {
return "", fmt.Errorf("create trPriceListHeader failed for PriceGroupCode=%s currency=%s: %w", priceGroup, currency, err)
}
// Re-read header id.
err = tx.QueryRowContext(ctx, `
SELECT TOP (1) CONVERT(NVARCHAR(36), PriceListHeaderID)
FROM dbo.trPriceListHeader WITH (NOLOCK)
WHERE CompanyCode = 1
AND LTRIM(RTRIM(PriceGroupCode)) = @pg
AND LTRIM(RTRIM(DocCurrencyCode)) = @cur
ORDER BY CreatedDate DESC, LastUpdatedDate DESC;
`, sql.Named("pg", priceGroup), sql.Named("cur", currency)).Scan(&headerID)
if err != nil {
return "", fmt.Errorf("create header ok but cannot re-read header id: %w", err)
}
headerID = strings.TrimSpace(headerID)
if headerID == "" {
return "", fmt.Errorf("create header ok but header id is empty")
}
logger.Info("save:mssql:header:created",
"price_group", priceGroup,
"currency", currency,
"header_id", headerID,
"price_list_number", priceListNumber,
)
return headerID, nil
}
func PostProductPricingSaveHandler(pg *sql.DB, ml *mailer.GraphMailer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
started := time.Now()
traceID := utils.TraceIDFromRequest(r)
w.Header().Set("X-Trace-ID", traceID)
claims, ok := auth.GetClaimsFromContext(r.Context())
if !ok || claims == nil {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
var payload productPricingSavePayload
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
http.Error(w, "invalid payload", http.StatusBadRequest)
return
}
if len(payload.Items) == 0 {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
_ = json.NewEncoder(w).Encode(map[string]any{"success": true, "saved": 0})
return
}
// Basic validation early.
for _, it := range payload.Items {
if strings.TrimSpace(it.ProductCode) == "" {
http.Error(w, "product_code is required", http.StatusBadRequest)
return
}
if it.BasePriceUsd < 0 || it.BasePriceTry < 0 {
http.Error(w, "base prices must be >= 0", http.StatusBadRequest)
return
}
}
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Minute)
defer cancel()
ctx = utils.ContextWithTraceID(ctx, traceID)
logger := utils.SlogFromContext(ctx).With("handler", "product-pricing.save", "trace_id", traceID, "user", claims.Username, "user_id", claims.ID)
mssql := db.GetDB()
if mssql == nil {
http.Error(w, "mssql not connected", http.StatusInternalServerError)
return
}
pgTx, err := pg.BeginTx(ctx, nil)
if err != nil {
http.Error(w, "pg transaction start error", http.StatusInternalServerError)
return
}
defer pgTx.Rollback()
msTx, err := mssql.BeginTx(ctx, nil)
if err != nil {
http.Error(w, "mssql transaction start error", http.StatusInternalServerError)
return
}
defer msTx.Rollback()
// Serialize writes to pricing tables in PG to avoid contention with other pricing jobs.
if _, err := pgTx.ExecContext(ctx, `SELECT pg_advisory_xact_lock(2001, 1)`); err != nil {
http.Error(w, "pg advisory lock error", http.StatusInternalServerError)
return
}
savedPG := 0
savedMSSQL := 0
missingPG := 0
missingMSSQL := 0
// Load mapping tables once.
pgMap := map[string]map[int]int{} // currency -> level -> sdprcgrp_id
nebimMap := map[string]map[int]string{} // currency -> level -> price_group_code
{
rows, err := pgTx.QueryContext(ctx, `
SELECT currency, level_no, COALESCE(sdprcgrp_id, 0)
FROM mk_price_target_map_pg
WHERE is_active = TRUE
`)
if err == nil {
for rows.Next() {
var cur string
var level int
var grp int
if err := rows.Scan(&cur, &level, &grp); err != nil {
_ = rows.Close()
http.Error(w, "pg map scan error", http.StatusInternalServerError)
return
}
cur = strings.ToUpper(strings.TrimSpace(cur))
if cur == "" || level <= 0 || level > 6 || grp <= 0 {
continue
}
// In this setup sdprcgrp_id is expected to be 1..6. Guard against stale/invalid mappings.
if grp < 1 || grp > 6 {
continue
}
if pgMap[cur] == nil {
pgMap[cur] = map[int]int{}
}
pgMap[cur][level] = grp
}
_ = rows.Close()
}
}
{
rows, err := pgTx.QueryContext(ctx, `
SELECT currency, level_no, COALESCE(NULLIF(BTRIM(price_group_code), ''), '')
FROM mk_price_target_map_nebim
WHERE is_active = TRUE
`)
if err == nil {
for rows.Next() {
var cur string
var level int
var code string
if err := rows.Scan(&cur, &level, &code); err != nil {
_ = rows.Close()
http.Error(w, "nebim map scan error", http.StatusInternalServerError)
return
}
cur = strings.ToUpper(strings.TrimSpace(cur))
code = strings.TrimSpace(code)
if cur == "" || level <= 0 || level > 6 || code == "" {
continue
}
if nebimMap[cur] == nil {
nebimMap[cur] = map[int]string{}
}
nebimMap[cur][level] = code
}
_ = rows.Close()
}
}
changed := make(map[string]struct{}, len(payload.Items))
// In-request cache to avoid repeating expensive dim resolution work.
// Key: "<column>|<TOKEN>" where token is uppercased/trimmed.
dimTokenLocalCache := make(map[string]int64, 256)
type dimCombo struct {
Dim1 int64
Dim3 sql.NullInt64
}
type sdprcWriteRow struct {
Currency string `json:"currency"`
SdprcGrpID int `json:"sdprcgrp_id"`
Dim1 int64 `json:"dim1"`
Dim3 *int64 `json:"dim3"`
Price float64 `json:"price"`
}
// sdprc has a unique constraint on the business key (tier/currency/dims).
// If input rows contain duplicates for the same key (can happen due to tier/group mapping),
// we must dedupe before bulk inserting to avoid 500s like "duplicate key violates uq_sdprc_2".
dedupeSdprcWriteRows := func(productCode string, in []sdprcWriteRow) []sdprcWriteRow {
if len(in) <= 1 {
return in
}
type key struct {
Cur string
Grp int
D1 int64
D3 int64 // 0 => NULL
}
idx := make(map[key]int, len(in))
out := make([]sdprcWriteRow, 0, len(in))
for _, r := range in {
cur := strings.ToUpper(strings.TrimSpace(r.Currency))
d3k := int64(0)
if r.Dim3 != nil && *r.Dim3 > 0 {
d3k = *r.Dim3
}
k := key{Cur: cur, Grp: r.SdprcGrpID, D1: r.Dim1, D3: d3k}
if i, ok := idx[k]; ok {
out[i] = r // keep last
continue
}
idx[k] = len(out)
out = append(out, r)
}
if len(out) != len(in) {
logger.Warn("save:pg:sdprc:dedupe",
"product_code", strings.TrimSpace(productCode),
"in_rows", len(in),
"out_rows", len(out),
)
}
return out
}
parseDimID := func(s string) (int64, bool) {
s = strings.TrimSpace(s)
if s == "" {
return 0, false
}
// tolerate leading zeros like "001"
s2 := strings.TrimLeft(s, "0")
if s2 == "" {
s2 = "0"
}
n, err := strconv.ParseInt(s2, 10, 64)
if err != nil || n <= 0 {
return 0, false
}
return n, true
}
type queryRower interface {
QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}
resolveDimvalFromToken := func(q queryRower, column, token string) (int64, bool) {
token = strings.ToUpper(normalizeDimParam(token))
if token == "" {
return 0, false
}
cacheKey := column + "|" + token
if v, ok := dimTokenLocalCache[cacheKey]; ok {
return v, v > 0
}
// Fast path: persistent token->id mapping table.
{
var id int64
if err := pgTx.QueryRowContext(ctx, `
SELECT dim_id
FROM mk_dim_token_map
WHERE dim_column = $1 AND token = $2
`, column, token).Scan(&id); err == nil && id > 0 {
dimTokenLocalCache[cacheKey] = id
return id, true
}
}
patterns := buildNameLikePatterns(token)
if len(patterns) == 0 {
dimTokenLocalCache[cacheKey] = 0
return 0, false
}
query := fmt.Sprintf(`
SELECT x.dimv
FROM (
SELECT COALESCE(%s::text, '') AS dimv, COUNT(*) AS cnt
FROM dfblob
WHERE src_table='mmitem'
AND typ='img'
AND COALESCE(%s::text, '') <> ''
AND (
UPPER(COALESCE(file_name,'')) LIKE $1 OR
UPPER(COALESCE(file_name,'')) LIKE $2 OR
UPPER(COALESCE(file_name,'')) LIKE $3 OR
UPPER(COALESCE(file_name,'')) LIKE $4 OR
UPPER(COALESCE(file_name,'')) LIKE $5 OR
UPPER(COALESCE(file_name,'')) LIKE $6
)
GROUP BY COALESCE(%s::text, '')
) x
ORDER BY x.cnt DESC, x.dimv
LIMIT 1
`, column, column, column)
var v string
if err := q.QueryRowContext(ctx,
query,
patterns[0],
patterns[1],
patterns[2],
patterns[3],
patterns[4],
patterns[5],
).Scan(&v); err != nil {
dimTokenLocalCache[cacheKey] = 0
return 0, false
}
v = normalizeDimParam(v)
if v == "" {
dimTokenLocalCache[cacheKey] = 0
return 0, false
}
id, ok := parseDimID(v)
if !ok {
dimTokenLocalCache[cacheKey] = 0
return 0, false
}
// Persist for future requests (best-effort).
_, _ = pgTx.ExecContext(ctx, `
INSERT INTO mk_dim_token_map (dim_column, token, dim_id, updated_at)
VALUES ($1,$2,$3,now())
ON CONFLICT (dim_column, token)
DO UPDATE SET dim_id = EXCLUDED.dim_id, updated_at = EXCLUDED.updated_at
`, column, token, id)
dimTokenLocalCache[cacheKey] = id
return id, true
}
loadDimsFromMssqlStock := func(productCode string) ([]dimCombo, error) {
started := time.Now()
if db.MssqlDB == nil {
return nil, fmt.Errorf("mssql not ready")
}
rows, err := db.MssqlDB.QueryContext(ctx, queries.GetProductVariantDimsForPricing, productCode)
if err != nil {
return nil, err
}
defer rows.Close()
out := make([]dimCombo, 0, 32)
seen := make(map[string]struct{}, 64)
readRows := 0
resolvedDim1 := 0
resolvedDim3 := 0
for rows.Next() {
readRows++
var colorCode, dim1Code, dim3Code string
if err := rows.Scan(&colorCode, &dim1Code, &dim3Code); err != nil {
return nil, err
}
// Resolve to PG dim ids. For this installation we align with mmitem_dim key:
// - dim1 = color
// - dim3 = itemdim3 (optional)
// Size (ItemDim1Code) is not part of the key here.
d1 := int64(0)
if id, ok := resolveDimvalFromToken(pgTx, "dimval1", colorCode); ok {
d1 = id
resolvedDim1++
} else if id, ok := resolveDimvalFromToken(pgTx, "dimval1", dim3Code); ok {
d1 = id
resolvedDim1++
} else if id, ok := resolveDimvalFromToken(pgTx, "dimval1", dim1Code); ok {
d1 = id
resolvedDim1++
}
if d1 <= 0 {
continue
}
var d3 sql.NullInt64
// dim3 corresponds to mmitem_dim.val3 (ItemDim3Code).
if id, ok := resolveDimvalFromToken(pgTx, "dimval1", dim3Code); ok {
d3 = sql.NullInt64{Int64: id, Valid: true}
resolvedDim3++
}
key := fmt.Sprintf("%d|%d", d1, func() int64 {
if d3.Valid {
return d3.Int64
}
return 0
}())
if _, ok := seen[key]; ok {
continue
}
seen[key] = struct{}{}
out = append(out, dimCombo{Dim1: d1, Dim3: d3})
}
if err := rows.Err(); err != nil {
return nil, err
}
logger.Info("save:pg:dims:mssql:resolved",
"product_code", strings.TrimSpace(productCode),
"rows_read", readRows,
"dims", len(out),
"resolved_dim1", resolvedDim1,
"resolved_dim3", resolvedDim3,
"duration_ms", time.Since(started).Milliseconds(),
)
return out, nil
}
ensureMMItemDimRows := func(mmItemID int64, combos []dimCombo, extraVal3 map[string]int64) {
if mmItemID <= 0 || len(combos) == 0 {
return
}
// Best-effort: don't assume a specific unique constraint exists on mmitem_dim.
for _, c := range combos {
if c.Dim1 <= 0 {
continue
}
var v2any any = nil
// Active key: val1=color, val3=itemdim3. val2 is size and is not part of price/campaign key.
v3 := int64(0)
if c.Dim3.Valid && c.Dim3.Int64 > 0 {
v3 = c.Dim3.Int64
} else if extraVal3 != nil {
if vv, ok := extraVal3[fmt.Sprintf("%d|0", c.Dim1)]; ok && vv > 0 {
v3 = vv
}
}
mmdimID := int64(2)
var v3any any = nil
if v3 > 0 {
mmdimID = 3
v3any = v3
}
_, _ = pgTx.ExecContext(ctx, `
INSERT INTO mmitem_dim (mmitem_id, mmdim_id, val1, val2, val3, is_active, qty)
SELECT $1, $2, $3, $4, $5, TRUE, 0
WHERE NOT EXISTS (
SELECT 1
FROM mmitem_dim
WHERE mmitem_id = $1
AND mmdim_id = $2
AND val1 = $3
AND COALESCE(val2, 0) = COALESCE($4::bigint, 0)
AND COALESCE(val3, 0) = COALESCE($5::bigint, 0)
LIMIT 1
);
`, mmItemID, mmdimID, c.Dim1, v2any, v3any)
}
}
loadDimsFromPgMMItemDim := func(mmItemID int64, productCode string) ([]dimCombo, error) {
started := time.Now()
if mmItemID <= 0 {
return nil, fmt.Errorf("invalid mmitem_id")
}
rows, err := pgTx.QueryContext(ctx, `
SELECT mmdim_id, val1, val2, val3
FROM mmitem_dim
WHERE mmitem_id = $1
AND COALESCE(is_active, TRUE) = TRUE
`, mmItemID)
if err != nil {
return nil, err
}
defer rows.Close()
out := make([]dimCombo, 0, 64)
seen := make(map[string]struct{}, 128)
readRows := 0
for rows.Next() {
readRows++
var mmdimID sql.NullInt64
var v1 sql.NullInt64
var v2 sql.NullInt64
var v3 sql.NullInt64
if err := rows.Scan(&mmdimID, &v1, &v2, &v3); err != nil {
return nil, err
}
if !v1.Valid || v1.Int64 <= 0 {
continue
}
// Variant key in this installation: (val1=color, val3=itemdim3_if_any). Ignore val2 (size).
d1 := v1.Int64
_ = mmdimID
_ = v2
var d3 sql.NullInt64
if v3.Valid && v3.Int64 > 0 {
d3 = sql.NullInt64{Int64: v3.Int64, Valid: true}
}
key := fmt.Sprintf("%d|%d", d1, func() int64 {
if d3.Valid {
return d3.Int64
}
return 0
}())
if _, ok := seen[key]; ok {
continue
}
seen[key] = struct{}{}
out = append(out, dimCombo{Dim1: d1, Dim3: d3})
}
if err := rows.Err(); err != nil {
return nil, err
}
logger.Info("save:pg:dims:mmitem-dim:loaded",
"product_code", strings.TrimSpace(productCode),
"mmitem_id", mmItemID,
"rows_read", readRows,
"dims", len(out),
"duration_ms", time.Since(started).Milliseconds(),
)
return out, nil
}
upsertDimCombosCache := func(productCode string, dims []dimCombo) error {
productCode = strings.TrimSpace(productCode)
if productCode == "" || len(dims) == 0 {
return nil
}
for _, d := range dims {
_, err := pgTx.ExecContext(ctx, `
INSERT INTO mk_mmitem_dim_combo (product_code, dim1, dim3, updated_at)
VALUES ($1,$2,$3,now())
ON CONFLICT (product_code, dim1, dim3_key)
DO UPDATE SET updated_at = EXCLUDED.updated_at
`, productCode, d.Dim1, func() any {
if d.Dim3.Valid {
return d.Dim3.Int64
}
return nil
}())
if err != nil {
return err
}
}
return nil
}
bulkAppendOnlyInsertSdprc := func(mmItemID int64, productCode string, rows []sdprcWriteRow) (int, error) {
if mmItemID <= 0 {
return 0, fmt.Errorf("invalid mmitem_id")
}
if len(rows) == 0 {
return 0, nil
}
raw, err := json.Marshal(rows)
if err != nil {
return 0, err
}
q := `
WITH input AS (
SELECT *
FROM jsonb_to_recordset($1::jsonb) AS x(currency text, sdprcgrp_id int, dim1 bigint, dim3 bigint, price float8)
),
norm AS (
SELECT
UPPER(NULLIF(BTRIM(currency), '')) AS currency,
COALESCE(sdprcgrp_id, 0) AS sdprcgrp_id,
COALESCE(dim1, 0) AS dim1,
dim3 AS dim3,
COALESCE(price, 0) AS price
FROM input
),
filtered AS (
SELECT *
FROM norm
WHERE currency IN ('USD','EUR','TRY')
AND sdprcgrp_id BETWEEN 1 AND 6
AND dim1 > 0
AND price > 0
),
grouped AS (
-- Ensure one row per business key to avoid unique violations under strict constraints.
SELECT
sdprcgrp_id,
currency AS crn,
dim1,
dim3,
MAX(price) AS prc
FROM filtered
GROUP BY sdprcgrp_id, currency, dim1, dim3
),
updated AS (
UPDATE sdprc s
SET prc = g.prc,
zlins_dttm = now()
FROM grouped g
WHERE s.mmitem_id = $2::bigint
AND s.sdprcgrp_id = g.sdprcgrp_id
AND s.crn = g.crn
AND s.dim1 = g.dim1
AND COALESCE(s.dim3, 0) = COALESCE(g.dim3, 0)
AND s.prc IS DISTINCT FROM g.prc
RETURNING 1
),
inserted AS (
INSERT INTO sdprc (mmitem_id, sdprcgrp_id, crn, dim1, dim3, prc, zlins_dttm)
SELECT $2::bigint, g.sdprcgrp_id, g.crn, g.dim1, g.dim3, g.prc, now()
FROM grouped g
WHERE NOT EXISTS (
SELECT 1
FROM sdprc s
WHERE s.mmitem_id = $2::bigint
AND s.sdprcgrp_id = g.sdprcgrp_id
AND s.crn = g.crn
AND s.dim1 = g.dim1
AND COALESCE(s.dim3, 0) = COALESCE(g.dim3, 0)
)
RETURNING 1
)
SELECT ((SELECT COUNT(*) FROM updated) + (SELECT COUNT(*) FROM inserted))::int;
`
var inserted int
if err := pgTx.QueryRowContext(ctx, q, raw, mmItemID).Scan(&inserted); err != nil {
return 0, err
}
if inserted > 0 {
savedPG += inserted
changed[productCode] = struct{}{}
}
return inserted, nil
}
// MSSQL memoization: reduce chatter for large batches.
// header id cache key: "<CUR>|<PRICEGROUP>"
msHeaderIDCache := make(map[string]string, 64)
// next sort cache key: "<HEADERID>"
msHeaderNextSort := make(map[string]int64, 64)
type msLatestKey struct {
Cur string
PriceGroup string
}
loadLatestPricesForProduct := func(productCode string, pairs []msLatestKey) (map[string]float64, map[string]bool) {
out := make(map[string]float64, len(pairs))
ok := make(map[string]bool, len(pairs))
productCode = strings.TrimSpace(productCode)
if productCode == "" || len(pairs) == 0 {
return out, ok
}
conds := make([]string, 0, len(pairs))
args := []any{sql.Named("ItemCode", productCode)}
for i, p := range pairs {
pg := strings.TrimSpace(p.PriceGroup)
cur := strings.ToUpper(strings.TrimSpace(p.Cur))
if pg == "" || (cur != "USD" && cur != "EUR" && cur != "TRY") {
continue
}
args = append(args,
sql.Named(fmt.Sprintf("pg%d", i), pg),
sql.Named(fmt.Sprintf("cur%d", i), cur),
)
conds = append(conds,
fmt.Sprintf("(LTRIM(RTRIM(PriceGroupCode)) = @pg%d AND LTRIM(RTRIM(DocCurrencyCode)) = @cur%d)", i, i),
)
}
if len(conds) == 0 {
return out, ok
}
q := fmt.Sprintf(`
SELECT PriceGroupCode, DocCurrencyCode, Price
FROM (
SELECT
LTRIM(RTRIM(PriceGroupCode)) AS PriceGroupCode,
LTRIM(RTRIM(DocCurrencyCode)) AS DocCurrencyCode,
CAST(Price AS FLOAT) AS Price,
ROW_NUMBER() OVER (
PARTITION BY LTRIM(RTRIM(PriceGroupCode)), LTRIM(RTRIM(DocCurrencyCode))
ORDER BY ValidDate DESC, ValidTime DESC, LastUpdatedDate DESC
) AS rn
FROM dbo.trPriceListLine WITH(NOLOCK)
WHERE ItemTypeCode = 1
AND LTRIM(RTRIM(ItemCode)) = @ItemCode
AND ISNULL(IsDisabled, 0) = 0
AND (%s)
) x
WHERE rn = 1;
`, strings.Join(conds, " OR "))
rows, err := msTx.QueryContext(ctx, q, args...)
if err != nil {
logger.Warn("save:mssql:latest:prefetch:error", "product_code", productCode, "err", err)
return out, ok
}
defer rows.Close()
for rows.Next() {
var pg, cur string
var price float64
if err := rows.Scan(&pg, &cur, &price); err != nil {
logger.Warn("save:mssql:latest:prefetch:scan:error", "product_code", productCode, "err", err)
return out, ok
}
pg = strings.TrimSpace(pg)
cur = strings.ToUpper(strings.TrimSpace(cur))
k := cur + "|" + pg
out[k] = price
ok[k] = true
}
return out, ok
}
// Helper: append-only Nebim price list line (insert new row when price changes).
// Resolve PriceListHeaderID from trPriceListHeader (source of truth).
// If header does not exist for the given PriceGroupCode+Currency, create it, then insert lines under that header.
upsertPriceListLine := func(productCode string, currency string, priceGroup string, price float64, latest map[string]float64, latestOK map[string]bool) (bool, error) {
currency = strings.ToUpper(strings.TrimSpace(currency))
priceGroup = strings.TrimSpace(priceGroup)
if price <= 0 {
return false, nil
}
if currency != "USD" && currency != "EUR" && currency != "TRY" {
return false, fmt.Errorf("invalid currency")
}
if priceGroup == "" {
return false, fmt.Errorf("empty price group")
}
// Resolve or create header id for that group/currency (memoized).
headerKey := currency + "|" + priceGroup
headerID := strings.TrimSpace(msHeaderIDCache[headerKey])
if headerID == "" {
var err error
headerID, err = resolveOrCreatePriceListHeaderID(ctx, msTx, priceGroup, currency, claims.Username, logger)
if err != nil {
return false, err
}
msHeaderIDCache[headerKey] = headerID
}
// If latest line already has the same price, no-op (prefer prefetch map).
if latest != nil && latestOK != nil && latestOK[headerKey] {
if curLatest, ok := latest[headerKey]; ok && math.Abs(curLatest-price) < 1e-9 {
return false, nil
}
} else {
// Fallback: query latest for this key if not prefetched.
var latestPrice sql.NullFloat64
_ = msTx.QueryRowContext(ctx, `
SELECT TOP (1) CAST(Price AS FLOAT)
FROM dbo.trPriceListLine WITH(NOLOCK)
WHERE ItemTypeCode = 1
AND LTRIM(RTRIM(ItemCode)) = @p1
AND LTRIM(RTRIM(DocCurrencyCode)) = @p2
AND LTRIM(RTRIM(PriceGroupCode)) = @p3
AND ISNULL(IsDisabled, 0) = 0
ORDER BY ValidDate DESC, ValidTime DESC, LastUpdatedDate DESC;
`, sql.Named("p1", productCode), sql.Named("p2", currency), sql.Named("p3", priceGroup)).Scan(&latestPrice)
if latestPrice.Valid && math.Abs(latestPrice.Float64-price) < 1e-9 {
return false, nil
}
}
// SortOrder: append inside header.
nextSort := msHeaderNextSort[headerID]
if nextSort <= 0 {
_ = msTx.QueryRowContext(ctx, `
SELECT ISNULL(MAX(SortOrder), 0) + 1
FROM dbo.trPriceListLine WITH(NOLOCK)
WHERE PriceListHeaderID = CONVERT(UNIQUEIDENTIFIER, @p1);
`, sql.Named("p1", headerID)).Scan(&nextSort)
if nextSort <= 0 {
nextSort = 1
}
}
msHeaderNextSort[headerID] = nextSort + 1
// Insert minimal line.
_, err := msTx.ExecContext(ctx, `
INSERT INTO dbo.trPriceListLine (
PriceListLineID,
SortOrder,
ItemTypeCode,
ItemCode,
ColorCode,
ItemDim1Code,
ItemDim2Code,
ItemDim3Code,
UnitOfMeasureCode,
PaymentPlanCode,
LineDescription,
DocCurrencyCode,
Price,
IsDisabled,
DisableDate,
CompanyCode,
PriceGroupCode,
ValidDate,
ValidTime,
PriceListHeaderID,
CreatedUserName,
CreatedDate,
LastUpdatedUserName,
LastUpdatedDate
)
VALUES (
NEWID(),
@SortOrder,
1,
@ItemCode,
'',
'',
'',
'',
'AD',
'',
'',
@Currency,
@Price,
0,
'1900-01-01',
1,
@PriceGroupCode,
CONVERT(date, GETDATE()),
'00:00:00',
CONVERT(uniqueidentifier, @HeaderID),
@UserName,
GETDATE(),
@UserName,
GETDATE()
);
`, sql.Named("SortOrder", nextSort),
sql.Named("ItemCode", productCode),
sql.Named("Currency", currency),
sql.Named("Price", price),
sql.Named("PriceGroupCode", priceGroup),
sql.Named("HeaderID", headerID),
sql.Named("UserName", claims.Username),
)
if err != nil {
return false, err
}
return true, nil
}
for _, it := range payload.Items {
code := strings.TrimSpace(it.ProductCode)
if code == "" {
continue
}
var latestMap map[string]float64
var latestOK map[string]bool
var mmItemID int64
if err := pgTx.QueryRowContext(ctx, `SELECT id FROM mmitem WHERE code = $1`, code).Scan(&mmItemID); err != nil {
// If missing in PG, we can still save MSSQL tiers; PG write will be skipped.
mmItemID = 0
}
dims := []dimCombo{}
// Prefer PG's own authoritative dim combo table (mmitem_dim). Cache is only a fast-path fallback.
if mmItemID > 0 {
// 1) Authoritative source: mmitem_dim (PG).
if d, err := loadDimsFromPgMMItemDim(mmItemID, code); err == nil && len(d) > 0 {
dims = d
_ = upsertDimCombosCache(code, dims) // best-effort cache fill
}
// 2) Merge MSSQL variant master combos. PG may be partially populated; missing
// colors/dim3 combos still need to be seeded before sdprc/zbggcampaign writes.
if d, err := loadDimsFromMssqlStock(code); err != nil {
logger.Error("save:pg:dims:mssql:error", "product_code", code, "err", err)
} else if len(d) > 0 {
seenDims := make(map[string]struct{}, len(dims)+len(d))
merged := make([]dimCombo, 0, len(dims)+len(d))
dim3Value := func(v sql.NullInt64) int64 {
if v.Valid {
return v.Int64
}
return 0
}
for _, c := range dims {
k := fmt.Sprintf("%d|%d", c.Dim1, dim3Value(c.Dim3))
if _, ok := seenDims[k]; ok {
continue
}
seenDims[k] = struct{}{}
merged = append(merged, c)
}
for _, c := range d {
k := fmt.Sprintf("%d|%d", c.Dim1, dim3Value(c.Dim3))
if _, ok := seenDims[k]; ok {
continue
}
seenDims[k] = struct{}{}
merged = append(merged, c)
}
dims = merged
_ = upsertDimCombosCache(code, dims)
ensureMMItemDimRows(mmItemID, d, nil)
}
}
// Tier prices in PG sdprc + Nebim price list lines (mapped).
type tier struct {
Cur string
Level int
Price float64
}
tiers := []tier{
{"USD", 1, it.USD1}, {"USD", 2, it.USD2}, {"USD", 3, it.USD3}, {"USD", 4, it.USD4}, {"USD", 5, it.USD5}, {"USD", 6, it.USD6},
{"EUR", 1, it.EUR1}, {"EUR", 2, it.EUR2}, {"EUR", 3, it.EUR3}, {"EUR", 4, it.EUR4}, {"EUR", 5, it.EUR5}, {"EUR", 6, it.EUR6},
{"TRY", 1, it.TRY1}, {"TRY", 2, it.TRY2}, {"TRY", 3, it.TRY3}, {"TRY", 4, it.TRY4}, {"TRY", 5, it.TRY5}, {"TRY", 6, it.TRY6},
}
// Prefetch MSSQL latest prices for all relevant pairs for this product.
// This turns N tier "TOP 1" lookups into a single query per product.
{
msPairs := make([]msLatestKey, 0, 24)
seen := make(map[string]struct{}, 32)
addPair := func(cur, pg string) {
cur = strings.ToUpper(strings.TrimSpace(cur))
pg = strings.TrimSpace(pg)
if pg == "" {
return
}
k := cur + "|" + pg
if _, ok := seen[k]; ok {
return
}
seen[k] = struct{}{}
msPairs = append(msPairs, msLatestKey{Cur: cur, PriceGroup: pg})
}
if it.BasePriceUsd > 0 {
addPair("USD", "TM-USD")
}
if it.BasePriceTry > 0 {
addPair("TRY", "TM-TRY")
}
for _, t := range tiers {
if t.Price <= 0 {
continue
}
nebimGrp := ""
if nebimMap[t.Cur] != nil {
nebimGrp = nebimMap[t.Cur][t.Level]
}
if nebimGrp == "" {
continue
}
addPair(t.Cur, nebimGrp)
}
latestMap, latestOK = loadLatestPricesForProduct(code, msPairs)
}
// Base prices in Nebim price lists.
{
ch, err := upsertPriceListLine(code, "USD", "TM-USD", it.BasePriceUsd, latestMap, latestOK)
if err != nil {
logger.Error("save:mssql:base-usd:error", "product_code", code, "err", err)
http.Error(w, "mssql base price save error: "+err.Error(), http.StatusInternalServerError)
return
}
if ch {
changed[code] = struct{}{}
savedMSSQL++
}
ch, err = upsertPriceListLine(code, "TRY", "TM-TRY", it.BasePriceTry, latestMap, latestOK)
if err != nil {
logger.Error("save:mssql:base-try:error", "product_code", code, "err", err)
http.Error(w, "mssql base price save error: "+err.Error(), http.StatusInternalServerError)
return
}
if ch {
changed[code] = struct{}{}
savedMSSQL++
}
}
// PG write: bulk append-only insert across dims (fast).
if mmItemID > 0 && len(dims) > 0 {
writeRows := make([]sdprcWriteRow, 0, len(dims)*len(tiers))
for _, t := range tiers {
if t.Price <= 0 {
continue
}
pgGrp := 0
if pgMap[t.Cur] != nil {
pgGrp = pgMap[t.Cur][t.Level]
}
if pgGrp <= 0 {
pgGrp = t.Level
}
for _, dc := range dims {
var d3 *int64
if dc.Dim3.Valid {
v := dc.Dim3.Int64
d3 = &v
}
writeRows = append(writeRows, sdprcWriteRow{
Currency: t.Cur,
SdprcGrpID: pgGrp,
Dim1: dc.Dim1,
Dim3: d3,
Price: t.Price,
})
}
}
if len(writeRows) > 0 {
writeRows = dedupeSdprcWriteRows(code, writeRows)
startPG := time.Now()
inserted, err := bulkAppendOnlyInsertSdprc(mmItemID, code, writeRows)
if err != nil {
logger.Error("save:pg:sdprc:bulk:error", "product_code", code, "dims", len(dims), "rows", len(writeRows), "err", err)
http.Error(w, "postgres tier save error: "+err.Error(), http.StatusInternalServerError)
return
}
logger.Info("save:pg:sdprc:bulk:ok", "product_code", code, "dims", len(dims), "rows", len(writeRows), "inserted", inserted, "duration_ms", time.Since(startPG).Milliseconds())
}
} else {
for _, t := range tiers {
if t.Price > 0 {
missingPG++
logger.Warn("save:pg:sdprc:skip:no-dims", "product_code", code, "currency", t.Cur, "level", t.Level)
}
}
}
// MSSQL tier writes (mapped).
for _, t := range tiers {
nebimGrp := ""
if nebimMap[t.Cur] != nil {
nebimGrp = nebimMap[t.Cur][t.Level]
}
if nebimGrp == "" {
if t.Price > 0 {
missingMSSQL++
}
continue
}
msChanged, err := upsertPriceListLine(code, t.Cur, nebimGrp, t.Price, latestMap, latestOK)
if err != nil {
logger.Error("save:mssql:tier:error", "product_code", code, "currency", t.Cur, "level", t.Level, "price_group", nebimGrp, "err", err)
http.Error(w, "mssql tier save error: "+err.Error(), http.StatusInternalServerError)
return
}
if msChanged {
changed[code] = struct{}{}
savedMSSQL++
}
}
}
// Delta queue: only products with an explicit price change record should be processed by delta jobs.
{
codes := make([]string, 0, len(changed))
for c := range changed {
codes = append(codes, c)
}
if _, err := queries.EnqueuePriceRecalc(ctx, pgTx, codes, "manual_price_save"); err != nil {
logger.Error("save:enqueue:error", "err", err)
http.Error(w, "price recalc enqueue error: "+err.Error(), http.StatusInternalServerError)
return
}
}
if err := msTx.Commit(); err != nil {
logger.Error("save:mssql:commit:error", "err", err)
http.Error(w, "mssql commit error", http.StatusInternalServerError)
return
}
if err := pgTx.Commit(); err != nil {
logger.Error("save:pg:commit:error", "err", err)
http.Error(w, "postgres commit error", http.StatusInternalServerError)
return
}
// Post-commit pricing mail: only for actually changed products.
if ml != nil && len(changed) > 0 {
changedCodes := make([]string, 0, len(changed))
for c := range changed {
changedCodes = append(changedCodes, c)
}
actor := claims.Username
go sendPricingChangeMails(context.Background(), ml, changedCodes, actor)
}
// Immediate FX delta publish kick (best-effort): run right away for changed products.
// Queue entries are still created for reliability; on success we mark them done to avoid a second pass.
if len(changed) > 0 {
changedCodes := make([]string, 0, len(changed))
for c := range changed {
changedCodes = append(changedCodes, c)
}
go func(codes []string) {
ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel2()
written, fxDateYmd, err := queries.PublishDerivedPricesFromAnchor(ctx2, pg, codes, "", false)
if err != nil {
log.Printf("[PricingFxImmediate] publish_error codes=%d err=%v", len(codes), err)
return
}
tx2, err := pg.BeginTx(ctx2, nil)
if err == nil {
_, _ = queries.MarkPriceRecalcQueueDoneByProductCodes(ctx2, tx2, codes)
_ = tx2.Commit()
}
log.Printf("[PricingFxImmediate] ok codes=%d sdprc_written=%d fx_date_ymd=%d", len(codes), written, fxDateYmd)
}(changedCodes)
}
logger.Info("save:done",
"items", len(payload.Items),
"saved_pg", savedPG,
"saved_mssql", savedMSSQL,
"missing_pg", missingPG,
"missing_mssql", missingMSSQL,
"duration_ms", time.Since(started).Milliseconds(),
)
w.Header().Set("Content-Type", "application/json; charset=utf-8")
_ = json.NewEncoder(w).Encode(map[string]any{
"success": true,
"saved_pg": savedPG,
"saved_mssql": savedMSSQL,
"missing_pg": missingPG,
"missing_mssql": missingMSSQL,
})
}
}