Merge remote-tracking branch 'origin/master'
This commit is contained in:
@@ -13,25 +13,29 @@ import (
|
|||||||
|
|
||||||
var PgDB *sql.DB
|
var PgDB *sql.DB
|
||||||
|
|
||||||
// ConnectPostgres PostgreSQL veritabanına bağlanır.
|
// ConnectPostgres PostgreSQL veritabanına bağlanır.
|
||||||
func ConnectPostgres() (*sql.DB, error) {
|
func ConnectPostgres() (*sql.DB, error) {
|
||||||
connStr := strings.TrimSpace(os.Getenv("POSTGRES_CONN"))
|
connStr := strings.TrimSpace(os.Getenv("POSTGRES_CONN"))
|
||||||
if connStr == "" {
|
if connStr == "" {
|
||||||
return nil, fmt.Errorf("POSTGRES_CONN tanımlı değil")
|
return nil, fmt.Errorf("POSTGRES_CONN tanımlı değil")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make DB writes attributable in Postgres audit triggers via current_setting('application_name').
|
||||||
|
// Prefer setting it in POSTGRES_CONN, but default it here if missing.
|
||||||
|
connStr = ensurePGApplicationName(connStr, strings.TrimSpace(os.Getenv("POSTGRES_APPLICATION_NAME")))
|
||||||
|
|
||||||
db, err := sql.Open("postgres", connStr)
|
db, err := sql.Open("postgres", connStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("PostgreSQL bağlantı hatası: %w", err)
|
return nil, fmt.Errorf("PostgreSQL bağlantı hatası: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bağlantı havuzu ayarları (audit log uyumlu).
|
// Bağlantı havuzu ayarları (audit log uyumlu).
|
||||||
db.SetMaxOpenConns(30)
|
db.SetMaxOpenConns(30)
|
||||||
db.SetMaxIdleConns(10)
|
db.SetMaxIdleConns(10)
|
||||||
db.SetConnMaxLifetime(30 * time.Minute)
|
db.SetConnMaxLifetime(30 * time.Minute)
|
||||||
db.SetConnMaxIdleTime(5 * time.Minute)
|
db.SetConnMaxIdleTime(5 * time.Minute)
|
||||||
|
|
||||||
// Bağlantıyı test et.
|
// Bağlantıyı test et.
|
||||||
if err = db.Ping(); err != nil {
|
if err = db.Ping(); err != nil {
|
||||||
// Some managed PostgreSQL servers require TLS. If the current DSN uses
|
// Some managed PostgreSQL servers require TLS. If the current DSN uses
|
||||||
// sslmode=disable and server rejects with "no encryption", retry once
|
// sslmode=disable and server rejects with "no encryption", retry once
|
||||||
@@ -40,6 +44,7 @@ func ConnectPostgres() (*sql.DB, error) {
|
|||||||
strings.Contains(err.Error(), "no encryption") &&
|
strings.Contains(err.Error(), "no encryption") &&
|
||||||
strings.Contains(strings.ToLower(connStr), "sslmode=disable") {
|
strings.Contains(strings.ToLower(connStr), "sslmode=disable") {
|
||||||
secureConnStr := strings.Replace(connStr, "sslmode=disable", "sslmode=require", 1)
|
secureConnStr := strings.Replace(connStr, "sslmode=disable", "sslmode=require", 1)
|
||||||
|
secureConnStr = ensurePGApplicationName(secureConnStr, strings.TrimSpace(os.Getenv("POSTGRES_APPLICATION_NAME")))
|
||||||
log.Println("PostgreSQL TLS gerektiriyor, sslmode=require ile tekrar deneniyor")
|
log.Println("PostgreSQL TLS gerektiriyor, sslmode=require ile tekrar deneniyor")
|
||||||
|
|
||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
@@ -54,28 +59,50 @@ func ConnectPostgres() (*sql.DB, error) {
|
|||||||
db.SetConnMaxIdleTime(5 * time.Minute)
|
db.SetConnMaxIdleTime(5 * time.Minute)
|
||||||
|
|
||||||
if err = db.Ping(); err != nil {
|
if err = db.Ping(); err != nil {
|
||||||
return nil, fmt.Errorf("PostgreSQL erişilemiyor (TLS retry): %w", err)
|
return nil, fmt.Errorf("PostgreSQL eriÅŸilemiyor (TLS retry): %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return nil, fmt.Errorf("PostgreSQL erişilemiyor: %w", err)
|
return nil, fmt.Errorf("PostgreSQL eriÅŸilemiyor: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("PostgreSQL bağlantısı başarılı")
|
log.Println("PostgreSQL bağlantısı başarılı")
|
||||||
PgDB = db
|
PgDB = db
|
||||||
return db, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPostgresUsers test amaçlı ilk 5 kullanıcıyı listeler.
|
func ensurePGApplicationName(connStr string, appName string) string {
|
||||||
|
if strings.TrimSpace(appName) == "" {
|
||||||
|
appName = "bssapp-backend"
|
||||||
|
}
|
||||||
|
low := strings.ToLower(connStr)
|
||||||
|
if strings.Contains(low, "application_name=") {
|
||||||
|
return connStr
|
||||||
|
}
|
||||||
|
|
||||||
|
// URL style DSN: postgres://...?... (lib/pq supports it)
|
||||||
|
if strings.HasPrefix(low, "postgres://") || strings.HasPrefix(low, "postgresql://") {
|
||||||
|
sep := "?"
|
||||||
|
if strings.Contains(connStr, "?") {
|
||||||
|
sep = "&"
|
||||||
|
}
|
||||||
|
return connStr + sep + "application_name=" + appName
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keyword DSN: "host=... user=... dbname=... sslmode=..."
|
||||||
|
return strings.TrimSpace(connStr) + " application_name=" + appName
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPostgresUsers test amaçlı ilk 5 kullanıcıyı listeler.
|
||||||
func GetPostgresUsers(db *sql.DB) error {
|
func GetPostgresUsers(db *sql.DB) error {
|
||||||
query := `SELECT id, code, email FROM mk_dfusr ORDER BY id LIMIT 5`
|
query := `SELECT id, code, email FROM mk_dfusr ORDER BY id LIMIT 5`
|
||||||
rows, err := db.Query(query)
|
rows, err := db.Query(query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("PostgreSQL sorgu hatası: %w", err)
|
return fmt.Errorf("PostgreSQL sorgu hatası: %w", err)
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
fmt.Println("İlk 5 PostgreSQL kullanıcısı:")
|
fmt.Println("İlk 5 PostgreSQL kullanıcısı:")
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var id int
|
var id int
|
||||||
var code, email string
|
var code, email string
|
||||||
|
|||||||
@@ -533,6 +533,7 @@ func productSeriesApplyVariant(ctx context.Context, pg *sql.DB, v productSeriesA
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 1, err
|
return 0, 1, err
|
||||||
}
|
}
|
||||||
|
appliedFallback := false
|
||||||
if !ready {
|
if !ready {
|
||||||
if productSeriesFallbackLogEnabled() {
|
if productSeriesFallbackLogEnabled() {
|
||||||
log.Printf("[ProductSeriesFallback] skip_not_ready product=%s color=%s dim3=%s", strings.TrimSpace(v.ProductCode), strings.TrimSpace(v.ColorCode), strings.TrimSpace(v.Dim3Code))
|
log.Printf("[ProductSeriesFallback] skip_not_ready product=%s color=%s dim3=%s", strings.TrimSpace(v.ProductCode), strings.TrimSpace(v.ColorCode), strings.TrimSpace(v.Dim3Code))
|
||||||
@@ -607,6 +608,30 @@ func productSeriesApplyVariant(ctx context.Context, pg *sql.DB, v productSeriesA
|
|||||||
return 0, 1, err
|
return 0, 1, err
|
||||||
}
|
}
|
||||||
if len(existingIDs) > 0 {
|
if len(existingIDs) > 0 {
|
||||||
|
// Optional grace window: if we recently applied non-fallback rules for this variant,
|
||||||
|
// don't immediately flap back to fallback due to a transient/partial stock snapshot.
|
||||||
|
graceMin := envIntRange("PRODUCT_SERIES_FALLBACK_GRACE_MINUTES", 2, 0, 60)
|
||||||
|
if graceMin > 0 {
|
||||||
|
rowKey := productSeriesAutoKey(v.ProductCode, v.ColorCode, v.Dim3Code)
|
||||||
|
kind, at, err := productSeriesLoadLastApplyState(ctx, pg, rowKey)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 1, err
|
||||||
|
}
|
||||||
|
if kind == "rules" && time.Since(at) < time.Duration(graceMin)*time.Minute {
|
||||||
|
// keep existing mapping as-is for now
|
||||||
|
if productSeriesFallbackLogEnabled() {
|
||||||
|
log.Printf("[ProductSeriesFallback] grace_skip product=%s color=%s dim3=%s grace_min=%d last_rules_at=%s",
|
||||||
|
strings.TrimSpace(v.ProductCode),
|
||||||
|
strings.TrimSpace(v.ColorCode),
|
||||||
|
strings.TrimSpace(v.Dim3Code),
|
||||||
|
graceMin,
|
||||||
|
at.Format(time.RFC3339),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return 0, 0, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
anyStillValid := false
|
anyStillValid := false
|
||||||
for _, sid := range existingIDs {
|
for _, sid := range existingIDs {
|
||||||
// If we can't find the rule definition in memory, assume it's a manual/unknown mapping and keep it.
|
// If we can't find the rule definition in memory, assume it's a manual/unknown mapping and keep it.
|
||||||
@@ -641,6 +666,7 @@ func productSeriesApplyVariant(ctx context.Context, pg *sql.DB, v productSeriesA
|
|||||||
}
|
}
|
||||||
// Use the fallback series as the single selected rule.
|
// Use the fallback series as the single selected rule.
|
||||||
selected = []productSeriesAutoRule{{SeriesID: fallbackID}}
|
selected = []productSeriesAutoRule{{SeriesID: fallbackID}}
|
||||||
|
appliedFallback = true
|
||||||
} else {
|
} else {
|
||||||
if productSeriesFallbackLogEnabled() {
|
if productSeriesFallbackLogEnabled() {
|
||||||
log.Printf("[ProductSeriesFallback] missing_fallback product=%s color=%s dim3=%s fallback_code=%s", strings.TrimSpace(v.ProductCode), strings.TrimSpace(v.ColorCode), strings.TrimSpace(v.Dim3Code), strings.TrimSpace(fallbackCode))
|
log.Printf("[ProductSeriesFallback] missing_fallback product=%s color=%s dim3=%s fallback_code=%s", strings.TrimSpace(v.ProductCode), strings.TrimSpace(v.ColorCode), strings.TrimSpace(v.Dim3Code), strings.TrimSpace(fallbackCode))
|
||||||
@@ -676,11 +702,27 @@ VALUES ($1, $2, $3, $4)
|
|||||||
if err := tx.Commit(); err != nil {
|
if err := tx.Commit(); err != nil {
|
||||||
return 0, 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
|
// Track what we last applied for this variant to reduce flapping and help ops debugging.
|
||||||
|
rowKey := productSeriesAutoKey(v.ProductCode, v.ColorCode, v.Dim3Code)
|
||||||
|
kind := "rules"
|
||||||
|
if appliedFallback {
|
||||||
|
kind = "fallback"
|
||||||
|
}
|
||||||
|
ids := make([]string, 0, len(selected))
|
||||||
|
for _, r := range selected {
|
||||||
|
ids = append(ids, fmt.Sprintf("%d", r.SeriesID))
|
||||||
|
}
|
||||||
|
_, _ = pg.ExecContext(ctx, `
|
||||||
|
INSERT INTO mk_product_series_apply_state (row_key, last_apply_at, last_apply_kind, last_series_ids, updated_at)
|
||||||
|
VALUES ($1, now(), $2, $3, now())
|
||||||
|
ON CONFLICT (row_key) DO UPDATE
|
||||||
|
SET last_apply_at=EXCLUDED.last_apply_at,
|
||||||
|
last_apply_kind=EXCLUDED.last_apply_kind,
|
||||||
|
last_series_ids=EXCLUDED.last_series_ids,
|
||||||
|
updated_at=now()
|
||||||
|
`, rowKey, kind, strings.Join(ids, ","))
|
||||||
|
|
||||||
if productSeriesDebugMatch(v) {
|
if productSeriesDebugMatch(v) {
|
||||||
ids := make([]string, 0, len(selected))
|
|
||||||
for _, r := range selected {
|
|
||||||
ids = append(ids, fmt.Sprintf("%d", r.SeriesID))
|
|
||||||
}
|
|
||||||
log.Printf("[ProductSeriesDebug] wrote=%d product=%s color=%s dim3=%s series_ids=%s",
|
log.Printf("[ProductSeriesDebug] wrote=%d product=%s color=%s dim3=%s series_ids=%s",
|
||||||
len(selected),
|
len(selected),
|
||||||
strings.TrimSpace(v.ProductCode),
|
strings.TrimSpace(v.ProductCode),
|
||||||
@@ -692,6 +734,23 @@ VALUES ($1, $2, $3, $4)
|
|||||||
return len(selected), 0, nil
|
return len(selected), 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func productSeriesLoadLastApplyState(ctx context.Context, pg *sql.DB, rowKey string) (kind string, at time.Time, err error) {
|
||||||
|
var k string
|
||||||
|
var t time.Time
|
||||||
|
err = pg.QueryRowContext(ctx, `
|
||||||
|
SELECT COALESCE(last_apply_kind,''), COALESCE(last_apply_at, now())
|
||||||
|
FROM mk_product_series_apply_state
|
||||||
|
WHERE row_key=$1
|
||||||
|
`, rowKey).Scan(&k, &t)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return "", time.Time{}, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return "", time.Time{}, err
|
||||||
|
}
|
||||||
|
return k, t, nil
|
||||||
|
}
|
||||||
|
|
||||||
func productSeriesFindRuleByID(rules []productSeriesAutoRule, seriesID int64) (productSeriesAutoRule, bool) {
|
func productSeriesFindRuleByID(rules []productSeriesAutoRule, seriesID int64) (productSeriesAutoRule, bool) {
|
||||||
for _, r := range rules {
|
for _, r := range rules {
|
||||||
if r.SeriesID == seriesID {
|
if r.SeriesID == seriesID {
|
||||||
|
|||||||
@@ -70,6 +70,16 @@ CREATE TABLE IF NOT EXISTS mk_product_series_job_log (
|
|||||||
error_text TEXT NOT NULL DEFAULT ''
|
error_text TEXT NOT NULL DEFAULT ''
|
||||||
)`,
|
)`,
|
||||||
`CREATE INDEX IF NOT EXISTS ix_mk_product_series_job_log_started ON mk_product_series_job_log (started_at DESC)`,
|
`CREATE INDEX IF NOT EXISTS ix_mk_product_series_job_log_started ON mk_product_series_job_log (started_at DESC)`,
|
||||||
|
`
|
||||||
|
CREATE TABLE IF NOT EXISTS mk_product_series_apply_state (
|
||||||
|
row_key TEXT PRIMARY KEY,
|
||||||
|
last_apply_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||||
|
last_apply_kind TEXT NOT NULL DEFAULT '',
|
||||||
|
last_series_ids TEXT NOT NULL DEFAULT '',
|
||||||
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||||
|
CONSTRAINT ck_mk_product_series_apply_state_kind CHECK (last_apply_kind IN ('', 'rules', 'fallback'))
|
||||||
|
)`,
|
||||||
|
`CREATE INDEX IF NOT EXISTS ix_mk_product_series_apply_state_updated ON mk_product_series_apply_state (updated_at DESC)`,
|
||||||
}
|
}
|
||||||
for _, stmt := range stmts {
|
for _, stmt := range stmts {
|
||||||
if _, err := pg.Exec(stmt); err != nil {
|
if _, err := pg.Exec(stmt); err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user