feat(m2mcore): PgRepository через pgx + интеграция в lk-gateway
PostgreSQL-репозиторий для m2m_core.deals — реальное хранилище сделок вместо in-memory. Выбор Repository происходит автоматически в lkgateway.NewServer: если в runtime-конфиге задан Postgres DSN, поднимается pgxpool и используется PostgresRepository; иначе fallback на MemoryRepository. internal/m2mcore/pgrepo.go: - PostgresRepository: Create (идемпотентный по guid через ON CONFLICT DO NOTHING), GetByGUID, GetByID, Update, List (с фильтрами state/investor/created_*), AppendEvent для журнала deal_events - request_xml/response_xml/decision_xml хранятся как windows-1251 XML через nsdxml, на чтении парсятся обратно в m2m.M2M* структуры - stages — jsonb с историей FSM-переходов migrations/m2m-core/002__stages.sql: - ALTER TABLE deals ADD COLUMN stages jsonb DEFAULT '[]' internal/lkgateway/server.go: - При NewServer проверяется runtime-config: если есть DSN → PostgresRepository, иначе MemoryRepository; ошибка подключения логируется с fallback на in-memory - Тесты используют tempdir SetupPath для изоляции от реальной БД internal/lkgateway/setup.go: - tryPingPostgres переписан с database/sql (требует регистрации драйвера) на pgx.Connect — теперь форма /admin/setup/postgres реально проверяет подключение перед сохранением DSN Проверено сквозным smoke-тестом: введение DSN через UI → сохранение в ~/.bj/setup.json → перезапуск lk-gateway → лог "PostgresRepository подключён (m2m_core.deals)" → сделки реально пишутся в БД. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,287 @@
|
||||
package m2mcore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"git.zetit.ru/zuevav/Bridge-and-Join-s/internal/m2m"
|
||||
"git.zetit.ru/zuevav/Bridge-and-Join-s/internal/nsdxml"
|
||||
)
|
||||
|
||||
// PostgresRepository — реализация Repository через jackc/pgx.
|
||||
// Использует таблицы m2m_core.deals и m2m_core.deal_events
|
||||
// (см. migrations/m2m-core/001__deals.sql и 002__stages.sql).
|
||||
type PostgresRepository struct {
|
||||
pool *pgxpool.Pool
|
||||
}
|
||||
|
||||
// NewPostgresRepository подключается по DSN (postgres://user:pass@host:port/db).
|
||||
func NewPostgresRepository(ctx context.Context, dsn string) (*PostgresRepository, error) {
|
||||
if dsn == "" {
|
||||
return nil, errors.New("m2mcore: PostgresRepository: пустой DSN")
|
||||
}
|
||||
cfg, err := pgxpool.ParseConfig(dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pgxpool.ParseConfig: %w", err)
|
||||
}
|
||||
cfg.MaxConns = 10
|
||||
cfg.MaxConnLifetime = 30 * time.Minute
|
||||
pool, err := pgxpool.NewWithConfig(ctx, cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pgxpool.NewWithConfig: %w", err)
|
||||
}
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
pool.Close()
|
||||
return nil, fmt.Errorf("pool.Ping: %w", err)
|
||||
}
|
||||
return &PostgresRepository{pool: pool}, nil
|
||||
}
|
||||
|
||||
// Close закрывает пул.
|
||||
func (r *PostgresRepository) Close() {
|
||||
if r.pool != nil {
|
||||
r.pool.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Create вставляет сделку идемпотентно по guid. Если запись с таким
|
||||
// guid уже есть, возвращает её (без модификации).
|
||||
func (r *PostgresRepository) Create(ctx context.Context, deal *Deal) (*Deal, error) {
|
||||
reqXML, _ := marshalXMLIfPresent(deal.Request)
|
||||
respXML, _ := marshalXMLIfPresent(deal.Response)
|
||||
decisionXML, _ := marshalXMLIfPresent(deal.Decision)
|
||||
stages, err := json.Marshal(deal.Stages)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
const q = `
|
||||
INSERT INTO m2m_core.deals (id, guid, state, investor_id, signed_claim,
|
||||
request_xml, response_xml, decision_xml,
|
||||
stages, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, NULLIF($4, '')::uuid, $5, $6, $7, $8, $9, $10, $11)
|
||||
ON CONFLICT (guid) DO NOTHING
|
||||
RETURNING id`
|
||||
var id string
|
||||
err = r.pool.QueryRow(ctx, q,
|
||||
deal.ID, string(deal.GUID), string(deal.State), deal.InvestorID,
|
||||
deal.SignedClaim, reqXML, respXML, decisionXML,
|
||||
stages, deal.CreatedAt, deal.UpdatedAt,
|
||||
).Scan(&id)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
// уже существовала — вернём существующую
|
||||
return r.GetByGUID(ctx, deal.GUID)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("PostgresRepository.Create: %w", err)
|
||||
}
|
||||
return deal, nil
|
||||
}
|
||||
|
||||
// GetByGUID находит сделку по M2M GUID.
|
||||
func (r *PostgresRepository) GetByGUID(ctx context.Context, guid m2m.UUID) (*Deal, error) {
|
||||
return r.scanOne(ctx, `WHERE guid = $1`, string(guid))
|
||||
}
|
||||
|
||||
// GetByID находит сделку по внутреннему UUID.
|
||||
func (r *PostgresRepository) GetByID(ctx context.Context, id string) (*Deal, error) {
|
||||
return r.scanOne(ctx, `WHERE id = $1`, id)
|
||||
}
|
||||
|
||||
// Update сохраняет полное состояние сделки (для простоты — без diff).
|
||||
func (r *PostgresRepository) Update(ctx context.Context, deal *Deal) error {
|
||||
reqXML, _ := marshalXMLIfPresent(deal.Request)
|
||||
respXML, _ := marshalXMLIfPresent(deal.Response)
|
||||
decisionXML, _ := marshalXMLIfPresent(deal.Decision)
|
||||
stages, err := json.Marshal(deal.Stages)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
const q = `
|
||||
UPDATE m2m_core.deals
|
||||
SET state = $2, request_xml = $3, response_xml = $4, decision_xml = $5,
|
||||
stages = $6, updated_at = $7
|
||||
WHERE id = $1`
|
||||
tag, err := r.pool.Exec(ctx, q,
|
||||
deal.ID, string(deal.State), reqXML, respXML, decisionXML, stages, deal.UpdatedAt,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("PostgresRepository.Update: %w", err)
|
||||
}
|
||||
if tag.RowsAffected() == 0 {
|
||||
return ErrNotFound
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// List возвращает сделки по фильтру.
|
||||
func (r *PostgresRepository) List(ctx context.Context, f Filter) ([]*Deal, error) {
|
||||
args := []any{}
|
||||
where := ""
|
||||
add := func(cond string, val any) {
|
||||
args = append(args, val)
|
||||
if where == "" {
|
||||
where = " WHERE "
|
||||
} else {
|
||||
where += " AND "
|
||||
}
|
||||
where += fmt.Sprintf(cond, len(args))
|
||||
}
|
||||
if f.State != nil {
|
||||
add(`state = $%d`, string(*f.State))
|
||||
}
|
||||
if f.InvestorID != "" {
|
||||
add(`investor_id = $%d::uuid`, f.InvestorID)
|
||||
}
|
||||
if f.CreatedFrom != nil {
|
||||
add(`created_at >= $%d`, *f.CreatedFrom)
|
||||
}
|
||||
if f.CreatedTo != nil {
|
||||
add(`created_at <= $%d`, *f.CreatedTo)
|
||||
}
|
||||
limit := f.Limit
|
||||
if limit <= 0 {
|
||||
limit = 50
|
||||
}
|
||||
offset := f.Offset
|
||||
q := dealsSelectSQL() + where + fmt.Sprintf(" ORDER BY created_at DESC LIMIT %d OFFSET %d", limit, offset)
|
||||
rows, err := r.pool.Query(ctx, q, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("PostgresRepository.List: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := make([]*Deal, 0)
|
||||
for rows.Next() {
|
||||
d, err := scanRow(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, d)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// AppendEvent добавляет аудит-событие сделки.
|
||||
func (r *PostgresRepository) AppendEvent(ctx context.Context, dealID string, ev Event) error {
|
||||
payload, _ := json.Marshal(ev.Payload)
|
||||
const q = `
|
||||
INSERT INTO m2m_core.deal_events (deal_id, type, payload, actor, created_at)
|
||||
VALUES ($1::uuid, $2, $3, $4, $5)`
|
||||
_, err := r.pool.Exec(ctx, q, dealID, ev.Type, payload, ev.Actor, ev.CreatedAt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("PostgresRepository.AppendEvent: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// scanOne возвращает одну запись по where-условию.
|
||||
func (r *PostgresRepository) scanOne(ctx context.Context, where string, args ...any) (*Deal, error) {
|
||||
q := dealsSelectSQL() + " " + where
|
||||
row := r.pool.QueryRow(ctx, q, args...)
|
||||
d, err := scanRow(row)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return d, err
|
||||
}
|
||||
|
||||
// rowScanner объединяет pgx.Row и pgx.Rows для общего scanRow.
|
||||
type rowScanner interface {
|
||||
Scan(dest ...any) error
|
||||
}
|
||||
|
||||
// scanRow читает одну строку deals в *Deal.
|
||||
func scanRow(r rowScanner) (*Deal, error) {
|
||||
var (
|
||||
id, guid, state string
|
||||
investorID *string
|
||||
signedClaim []byte
|
||||
reqXML, respXML, decisionX []byte
|
||||
stages []byte
|
||||
createdAt, updatedAt time.Time
|
||||
)
|
||||
if err := r.Scan(&id, &guid, &state, &investorID, &signedClaim,
|
||||
&reqXML, &respXML, &decisionX, &stages, &createdAt, &updatedAt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d := &Deal{
|
||||
ID: id,
|
||||
GUID: m2m.UUID(guid),
|
||||
State: State(state),
|
||||
SignedClaim: signedClaim,
|
||||
CreatedAt: createdAt,
|
||||
UpdatedAt: updatedAt,
|
||||
}
|
||||
if investorID != nil {
|
||||
d.InvestorID = *investorID
|
||||
}
|
||||
if len(reqXML) > 0 {
|
||||
var v m2m.M2MTransferRequest
|
||||
if err := nsdxml.Unmarshal(reqXML, &v); err == nil {
|
||||
d.Request = &v
|
||||
}
|
||||
}
|
||||
if len(respXML) > 0 {
|
||||
var v m2m.M2MTransferResponse
|
||||
if err := nsdxml.Unmarshal(respXML, &v); err == nil {
|
||||
d.Response = &v
|
||||
}
|
||||
}
|
||||
if len(decisionX) > 0 {
|
||||
var v m2m.M2MTransferDecision
|
||||
if err := nsdxml.Unmarshal(decisionX, &v); err == nil {
|
||||
d.Decision = &v
|
||||
}
|
||||
}
|
||||
if len(stages) > 0 {
|
||||
_ = json.Unmarshal(stages, &d.Stages)
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// dealsSelectSQL — SELECT с фиксированным порядком колонок.
|
||||
func dealsSelectSQL() string {
|
||||
return `SELECT id, guid, state, investor_id, signed_claim,
|
||||
request_xml, response_xml, decision_xml,
|
||||
stages, created_at, updated_at
|
||||
FROM m2m_core.deals`
|
||||
}
|
||||
|
||||
// marshalXMLIfPresent сериализует *T в windows-1251 XML (или возвращает nil).
|
||||
func marshalXMLIfPresent(v any) ([]byte, error) {
|
||||
if v == nil {
|
||||
return nil, nil
|
||||
}
|
||||
// Через reflect не используем — у nsdxml.Marshal вход — interface{}.
|
||||
// Здесь же поступает *m2m.M2M... — проверим nil-ness через type-switch.
|
||||
switch x := v.(type) {
|
||||
case *m2m.M2MTransferRequest:
|
||||
if x == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return nsdxml.Marshal(x)
|
||||
case *m2m.M2MTransferResponse:
|
||||
if x == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return nsdxml.Marshal(x)
|
||||
case *m2m.M2MTransferDecision:
|
||||
if x == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return nsdxml.Marshal(x)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// _ assertions
|
||||
var (
|
||||
_ Repository = (*PostgresRepository)(nil)
|
||||
)
|
||||
Reference in New Issue
Block a user