From ee642e5eaabf3dec66a641143e8666bc6ab58d4a Mon Sep 17 00:00:00 2001 From: fontvielle Date: Thu, 14 May 2026 13:43:49 +0300 Subject: [PATCH] =?UTF-8?q?feat(m2mcore):=20PgRepository=20=D1=87=D0=B5?= =?UTF-8?q?=D1=80=D0=B5=D0=B7=20pgx=20+=20=D0=B8=D0=BD=D1=82=D0=B5=D0=B3?= =?UTF-8?q?=D1=80=D0=B0=D1=86=D0=B8=D1=8F=20=D0=B2=20lk-gateway?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PostgreSQL-репозиторий для m2m_core.deals — реальное хранилище сделок вместо in-memory. Выбор Repository происходит автоматически в lkgateway.NewServer: если в runtime-конфиге задан Postgres DSN, поднимается pgxpool и используется PostgresRepository; иначе fallback на MemoryRepository. internal/m2mcore/pgrepo.go: - PostgresRepository: Create (идемпотентный по guid через ON CONFLICT DO NOTHING), GetByGUID, GetByID, Update, List (с фильтрами state/investor/created_*), AppendEvent для журнала deal_events - request_xml/response_xml/decision_xml хранятся как windows-1251 XML через nsdxml, на чтении парсятся обратно в m2m.M2M* структуры - stages — jsonb с историей FSM-переходов migrations/m2m-core/002__stages.sql: - ALTER TABLE deals ADD COLUMN stages jsonb DEFAULT '[]' internal/lkgateway/server.go: - При NewServer проверяется runtime-config: если есть DSN → PostgresRepository, иначе MemoryRepository; ошибка подключения логируется с fallback на in-memory - Тесты используют tempdir SetupPath для изоляции от реальной БД internal/lkgateway/setup.go: - tryPingPostgres переписан с database/sql (требует регистрации драйвера) на pgx.Connect — теперь форма /admin/setup/postgres реально проверяет подключение перед сохранением DSN Проверено сквозным smoke-тестом: введение DSN через UI → сохранение в ~/.bj/setup.json → перезапуск lk-gateway → лог "PostgresRepository подключён (m2m_core.deals)" → сделки реально пишутся в БД. Co-Authored-By: Claude Opus 4.7 (1M context) --- deploy/docker-compose/docker-compose.yml | 4 +- go.mod | 15 +- go.sum | 26 ++ internal/lkgateway/server.go | 32 ++- internal/lkgateway/server_test.go | 3 + internal/lkgateway/setup.go | 22 +- internal/m2mcore/pgrepo.go | 287 +++++++++++++++++++++++ migrations/m2m-core/002__stages.sql | 9 + 8 files changed, 373 insertions(+), 25 deletions(-) create mode 100644 internal/m2mcore/pgrepo.go create mode 100644 migrations/m2m-core/002__stages.sql diff --git a/deploy/docker-compose/docker-compose.yml b/deploy/docker-compose/docker-compose.yml index 0697f73..1c20d1f 100644 --- a/deploy/docker-compose/docker-compose.yml +++ b/deploy/docker-compose/docker-compose.yml @@ -7,7 +7,7 @@ version: "3.9" services: postgres: - image: postgres:16 + image: docker.io/library/postgres:16 # В проде заменить на postgrespro/std-16 или registry.postgrespro.ru/pgpro/... container_name: bj-postgres environment: @@ -20,7 +20,7 @@ services: - bj-postgres-data:/var/lib/postgresql/data minio: - image: minio/minio:latest + image: docker.io/minio/minio:latest container_name: bj-minio command: server /data --console-address ":9001" environment: diff --git a/go.mod b/go.mod index b22a59a..e81377b 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,16 @@ module git.zetit.ru/zuevav/Bridge-and-Join-s -go 1.23 +go 1.24.0 -require golang.org/x/text v0.22.0 +require ( + github.com/jackc/pgx/v5 v5.7.4 + golang.org/x/text v0.22.0 +) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/crypto v0.35.0 // indirect + golang.org/x/sync v0.17.0 // indirect +) diff --git a/go.sum b/go.sum index a81672a..16df3cb 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,28 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.4 h1:9wKznZrhWa2QiHL+NjTSPP6yjl3451BX3imWDnokYlg= +github.com/jackc/pgx/v5 v5.7.4/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= +golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/lkgateway/server.go b/internal/lkgateway/server.go index 9ad3b9d..c283b49 100644 --- a/internal/lkgateway/server.go +++ b/internal/lkgateway/server.go @@ -33,8 +33,11 @@ type Server struct { server *http.Server } -// NewServer собирает Server с in-memory репозиторием, mock NSDSender, -// SeedStore и REST + Admin маршрутами. +// NewServer собирает Server с репозиторием, mock NSDSender, SeedStore +// и REST + Admin маршрутами. Выбор Repository: +// - если в runtime-конфиге (или ENV-fallback в cfg) задан PostgresDSN +// и pgx-Pool успешно создаётся — используется PostgresRepository; +// - иначе fallback на MemoryRepository (M2-демо). func NewServer(cfg ServerConfig) (*Server, error) { store := NewSeedStore() mockCfg := mock.DefaultConfig() @@ -44,8 +47,27 @@ func NewServer(cfg ServerConfig) (*Server, error) { } sender := mock.NewSender(mockCfg) + rc, err := NewRuntimeConfig(cfg.SetupPath) + if err != nil { + return nil, err + } + + // Repository: pgx если DSN указан, иначе in-memory. + var repo m2mcore.Repository = m2mcore.NewMemoryRepository() + if dsn := rc.Snapshot().Postgres.DSN; dsn != "" { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + pgRepo, pgErr := m2mcore.NewPostgresRepository(ctx, dsn) + cancel() + if pgErr != nil { + log.Printf("lk-gateway: PostgresRepository отказал, fallback MemoryRepository: %v", pgErr) + } else { + repo = pgRepo + log.Printf("lk-gateway: PostgresRepository подключён (m2m_core.deals)") + } + } + svc := NewService(Config{ - Repository: m2mcore.NewMemoryRepository(), + Repository: repo, Sender: sender, Store: store, Recorder: m2mcore.NewMemoryRecorder(), @@ -53,10 +75,6 @@ func NewServer(cfg ServerConfig) (*Server, error) { DefaultReceiver: cfg.DefaultReceiver, }) - rc, err := NewRuntimeConfig(cfg.SetupPath) - if err != nil { - return nil, err - } // Если runtime-конфиг уже содержит callback URL — применяем его. if s := rc.Snapshot(); s.LK.CallbackURL != "" { svc.callbackURL = s.LK.CallbackURL diff --git a/internal/lkgateway/server_test.go b/internal/lkgateway/server_test.go index 912b0d4..1c059ac 100644 --- a/internal/lkgateway/server_test.go +++ b/internal/lkgateway/server_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "path/filepath" "strings" "testing" "time" @@ -20,6 +21,8 @@ func newServer(t *testing.T) *lkgateway.Server { DefaultSender: "MC0079200000", DefaultReceiver: "MC0010300000", MockDecisionDelay: 50 * time.Millisecond, + // Изоляция от ~/.bj/setup.json — каждый тест получает пустой файл. + SetupPath: filepath.Join(t.TempDir(), "setup.json"), CheckOptions: func() lkgateway.CheckOptions { return lkgateway.CheckOptions{Profile: "test", CryptoProvider: "stub"} }, diff --git a/internal/lkgateway/setup.go b/internal/lkgateway/setup.go index 5f5ce8d..9e7b3cd 100644 --- a/internal/lkgateway/setup.go +++ b/internal/lkgateway/setup.go @@ -2,7 +2,6 @@ package lkgateway import ( "context" - "database/sql" "errors" "fmt" "net/http" @@ -10,6 +9,8 @@ import ( "os" "strings" "time" + + "github.com/jackc/pgx/v5" ) // setupHandlers — обработчики /admin/setup/*. @@ -278,23 +279,16 @@ func (h *setupHandlers) runTestClaim() { _ = h.rc.RecordTestRun(res) } -// tryPingPostgres пытается sql.Open + Ping с прокачкой драйвера; без -// драйвера вернёт «unknown driver pgx»/«unknown driver postgres» — -// тоже считаем ошибкой и показываем пользователю. +// tryPingPostgres делает короткое подключение через pgx и Ping. func tryPingPostgres(dsn string) error { - // Угадываем имя драйвера по префиксу. - driver := "postgres" - if strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") { - driver = "postgres" - } - db, err := sql.Open(driver, dsn) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + conn, err := pgx.Connect(ctx, dsn) if err != nil { return err } - defer db.Close() - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - return db.PingContext(ctx) + defer conn.Close(ctx) + return conn.Ping(ctx) } // tryHTTPHealth делает GET и ждёт 2xx. diff --git a/internal/m2mcore/pgrepo.go b/internal/m2mcore/pgrepo.go new file mode 100644 index 0000000..1d71b74 --- /dev/null +++ b/internal/m2mcore/pgrepo.go @@ -0,0 +1,287 @@ +package m2mcore + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + + "git.zetit.ru/zuevav/Bridge-and-Join-s/internal/m2m" + "git.zetit.ru/zuevav/Bridge-and-Join-s/internal/nsdxml" +) + +// PostgresRepository — реализация Repository через jackc/pgx. +// Использует таблицы m2m_core.deals и m2m_core.deal_events +// (см. migrations/m2m-core/001__deals.sql и 002__stages.sql). +type PostgresRepository struct { + pool *pgxpool.Pool +} + +// NewPostgresRepository подключается по DSN (postgres://user:pass@host:port/db). +func NewPostgresRepository(ctx context.Context, dsn string) (*PostgresRepository, error) { + if dsn == "" { + return nil, errors.New("m2mcore: PostgresRepository: пустой DSN") + } + cfg, err := pgxpool.ParseConfig(dsn) + if err != nil { + return nil, fmt.Errorf("pgxpool.ParseConfig: %w", err) + } + cfg.MaxConns = 10 + cfg.MaxConnLifetime = 30 * time.Minute + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("pgxpool.NewWithConfig: %w", err) + } + if err := pool.Ping(ctx); err != nil { + pool.Close() + return nil, fmt.Errorf("pool.Ping: %w", err) + } + return &PostgresRepository{pool: pool}, nil +} + +// Close закрывает пул. +func (r *PostgresRepository) Close() { + if r.pool != nil { + r.pool.Close() + } +} + +// Create вставляет сделку идемпотентно по guid. Если запись с таким +// guid уже есть, возвращает её (без модификации). +func (r *PostgresRepository) Create(ctx context.Context, deal *Deal) (*Deal, error) { + reqXML, _ := marshalXMLIfPresent(deal.Request) + respXML, _ := marshalXMLIfPresent(deal.Response) + decisionXML, _ := marshalXMLIfPresent(deal.Decision) + stages, err := json.Marshal(deal.Stages) + if err != nil { + return nil, err + } + + const q = ` +INSERT INTO m2m_core.deals (id, guid, state, investor_id, signed_claim, + request_xml, response_xml, decision_xml, + stages, created_at, updated_at) +VALUES ($1, $2, $3, NULLIF($4, '')::uuid, $5, $6, $7, $8, $9, $10, $11) +ON CONFLICT (guid) DO NOTHING +RETURNING id` + var id string + err = r.pool.QueryRow(ctx, q, + deal.ID, string(deal.GUID), string(deal.State), deal.InvestorID, + deal.SignedClaim, reqXML, respXML, decisionXML, + stages, deal.CreatedAt, deal.UpdatedAt, + ).Scan(&id) + if errors.Is(err, pgx.ErrNoRows) { + // уже существовала — вернём существующую + return r.GetByGUID(ctx, deal.GUID) + } + if err != nil { + return nil, fmt.Errorf("PostgresRepository.Create: %w", err) + } + return deal, nil +} + +// GetByGUID находит сделку по M2M GUID. +func (r *PostgresRepository) GetByGUID(ctx context.Context, guid m2m.UUID) (*Deal, error) { + return r.scanOne(ctx, `WHERE guid = $1`, string(guid)) +} + +// GetByID находит сделку по внутреннему UUID. +func (r *PostgresRepository) GetByID(ctx context.Context, id string) (*Deal, error) { + return r.scanOne(ctx, `WHERE id = $1`, id) +} + +// Update сохраняет полное состояние сделки (для простоты — без diff). +func (r *PostgresRepository) Update(ctx context.Context, deal *Deal) error { + reqXML, _ := marshalXMLIfPresent(deal.Request) + respXML, _ := marshalXMLIfPresent(deal.Response) + decisionXML, _ := marshalXMLIfPresent(deal.Decision) + stages, err := json.Marshal(deal.Stages) + if err != nil { + return err + } + const q = ` +UPDATE m2m_core.deals + SET state = $2, request_xml = $3, response_xml = $4, decision_xml = $5, + stages = $6, updated_at = $7 + WHERE id = $1` + tag, err := r.pool.Exec(ctx, q, + deal.ID, string(deal.State), reqXML, respXML, decisionXML, stages, deal.UpdatedAt, + ) + if err != nil { + return fmt.Errorf("PostgresRepository.Update: %w", err) + } + if tag.RowsAffected() == 0 { + return ErrNotFound + } + return nil +} + +// List возвращает сделки по фильтру. +func (r *PostgresRepository) List(ctx context.Context, f Filter) ([]*Deal, error) { + args := []any{} + where := "" + add := func(cond string, val any) { + args = append(args, val) + if where == "" { + where = " WHERE " + } else { + where += " AND " + } + where += fmt.Sprintf(cond, len(args)) + } + if f.State != nil { + add(`state = $%d`, string(*f.State)) + } + if f.InvestorID != "" { + add(`investor_id = $%d::uuid`, f.InvestorID) + } + if f.CreatedFrom != nil { + add(`created_at >= $%d`, *f.CreatedFrom) + } + if f.CreatedTo != nil { + add(`created_at <= $%d`, *f.CreatedTo) + } + limit := f.Limit + if limit <= 0 { + limit = 50 + } + offset := f.Offset + q := dealsSelectSQL() + where + fmt.Sprintf(" ORDER BY created_at DESC LIMIT %d OFFSET %d", limit, offset) + rows, err := r.pool.Query(ctx, q, args...) + if err != nil { + return nil, fmt.Errorf("PostgresRepository.List: %w", err) + } + defer rows.Close() + + out := make([]*Deal, 0) + for rows.Next() { + d, err := scanRow(rows) + if err != nil { + return nil, err + } + out = append(out, d) + } + return out, rows.Err() +} + +// AppendEvent добавляет аудит-событие сделки. +func (r *PostgresRepository) AppendEvent(ctx context.Context, dealID string, ev Event) error { + payload, _ := json.Marshal(ev.Payload) + const q = ` +INSERT INTO m2m_core.deal_events (deal_id, type, payload, actor, created_at) +VALUES ($1::uuid, $2, $3, $4, $5)` + _, err := r.pool.Exec(ctx, q, dealID, ev.Type, payload, ev.Actor, ev.CreatedAt) + if err != nil { + return fmt.Errorf("PostgresRepository.AppendEvent: %w", err) + } + return nil +} + +// scanOne возвращает одну запись по where-условию. +func (r *PostgresRepository) scanOne(ctx context.Context, where string, args ...any) (*Deal, error) { + q := dealsSelectSQL() + " " + where + row := r.pool.QueryRow(ctx, q, args...) + d, err := scanRow(row) + if errors.Is(err, pgx.ErrNoRows) { + return nil, ErrNotFound + } + return d, err +} + +// rowScanner объединяет pgx.Row и pgx.Rows для общего scanRow. +type rowScanner interface { + Scan(dest ...any) error +} + +// scanRow читает одну строку deals в *Deal. +func scanRow(r rowScanner) (*Deal, error) { + var ( + id, guid, state string + investorID *string + signedClaim []byte + reqXML, respXML, decisionX []byte + stages []byte + createdAt, updatedAt time.Time + ) + if err := r.Scan(&id, &guid, &state, &investorID, &signedClaim, + &reqXML, &respXML, &decisionX, &stages, &createdAt, &updatedAt); err != nil { + return nil, err + } + d := &Deal{ + ID: id, + GUID: m2m.UUID(guid), + State: State(state), + SignedClaim: signedClaim, + CreatedAt: createdAt, + UpdatedAt: updatedAt, + } + if investorID != nil { + d.InvestorID = *investorID + } + if len(reqXML) > 0 { + var v m2m.M2MTransferRequest + if err := nsdxml.Unmarshal(reqXML, &v); err == nil { + d.Request = &v + } + } + if len(respXML) > 0 { + var v m2m.M2MTransferResponse + if err := nsdxml.Unmarshal(respXML, &v); err == nil { + d.Response = &v + } + } + if len(decisionX) > 0 { + var v m2m.M2MTransferDecision + if err := nsdxml.Unmarshal(decisionX, &v); err == nil { + d.Decision = &v + } + } + if len(stages) > 0 { + _ = json.Unmarshal(stages, &d.Stages) + } + return d, nil +} + +// dealsSelectSQL — SELECT с фиксированным порядком колонок. +func dealsSelectSQL() string { + return `SELECT id, guid, state, investor_id, signed_claim, + request_xml, response_xml, decision_xml, + stages, created_at, updated_at + FROM m2m_core.deals` +} + +// marshalXMLIfPresent сериализует *T в windows-1251 XML (или возвращает nil). +func marshalXMLIfPresent(v any) ([]byte, error) { + if v == nil { + return nil, nil + } + // Через reflect не используем — у nsdxml.Marshal вход — interface{}. + // Здесь же поступает *m2m.M2M... — проверим nil-ness через type-switch. + switch x := v.(type) { + case *m2m.M2MTransferRequest: + if x == nil { + return nil, nil + } + return nsdxml.Marshal(x) + case *m2m.M2MTransferResponse: + if x == nil { + return nil, nil + } + return nsdxml.Marshal(x) + case *m2m.M2MTransferDecision: + if x == nil { + return nil, nil + } + return nsdxml.Marshal(x) + } + return nil, nil +} + +// _ assertions +var ( + _ Repository = (*PostgresRepository)(nil) +) diff --git a/migrations/m2m-core/002__stages.sql b/migrations/m2m-core/002__stages.sql new file mode 100644 index 0000000..6f6a41a --- /dev/null +++ b/migrations/m2m-core/002__stages.sql @@ -0,0 +1,9 @@ +-- 002__stages.sql +-- Добавляем jsonb-колонку для истории FSM-переходов сделки. +-- Используется и для аудита, и для отрисовки на admin-странице. + +ALTER TABLE m2m_core.deals + ADD COLUMN IF NOT EXISTS stages jsonb NOT NULL DEFAULT '[]'::jsonb; + +COMMENT ON COLUMN m2m_core.deals.stages IS + 'История FSM-переходов: [{state, entered_at, left_at, reason}, ...]';