Files
Bridge-and-Join-s/internal/m2mcore/pgrepo.go
T
zuevav 9737c787f9 feat: живой цикл M2M с НРД + мастер установки ключа на флешку
Инфраструктура M2M (живой обмен с НРД через ИШ):
- обработка M2MTransferResponse: ERROR(M2Mxx) → заявка Отклонена, сохранение
  ответа; INFO → ждём Decision; идемпотентность поллера
- fallback-корреляция ответов с нулевым GUID (M2M14/M2M17) по FIFO
- сырой XML ответа НРД в карточке заявки (для пересылки в ТП)
- тестовый пакет роботу приведён к эталону m2m_robot_samples (CostInfo=Yes,
  4 бумаги, IsolationStatus, DocumentSeries=сценарий); override паспорта
- редирект из теста сразу в карточку заявки

Мастер установки ключа Валидаты на флешку (admin/setup/keywizard):
- пошаговый: загрузка .7z+пароль → выбор флешки → запись → справочник
  сертификатов (CRL) → перезапуск+проверка ИШ → готово
- привилегированный воркер (bj-keymedia) в host-namespace через файл-обмен,
  bj-server остаётся в песочнице
- сохранение структуры профиля архива (spr<N>), перечисление съёмных USB

Прочее:
- пакет-доказательство для ТП НРД + форма регистрации участника M2M
- эталонные образцы робота (DOC/m2m_robot_samples)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 00:03:21 +03:00

302 lines
9.0 KiB
Go

package m2mcore
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"git.zetit.ru/zuevav/Bridge-and-Join-s/internal/m2m"
"git.zetit.ru/zuevav/Bridge-and-Join-s/internal/nsdxml"
)
// PostgresRepository — реализация Repository через jackc/pgx.
// Использует таблицы m2m_core.deals и m2m_core.deal_events
// (см. migrations/m2m-core/001__deals.sql и 002__stages.sql).
type PostgresRepository struct {
pool *pgxpool.Pool
}
// NewPostgresRepository подключается по DSN (postgres://user:pass@host:port/db).
func NewPostgresRepository(ctx context.Context, dsn string) (*PostgresRepository, error) {
if dsn == "" {
return nil, errors.New("m2mcore: PostgresRepository: пустой DSN")
}
cfg, err := pgxpool.ParseConfig(dsn)
if err != nil {
return nil, fmt.Errorf("pgxpool.ParseConfig: %w", err)
}
cfg.MaxConns = 10
cfg.MaxConnLifetime = 30 * time.Minute
pool, err := pgxpool.NewWithConfig(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("pgxpool.NewWithConfig: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("pool.Ping: %w", err)
}
return &PostgresRepository{pool: pool}, nil
}
// Close закрывает пул.
func (r *PostgresRepository) Close() {
if r.pool != nil {
r.pool.Close()
}
}
// Create вставляет сделку идемпотентно по guid. Если запись с таким
// guid уже есть, возвращает её (без модификации).
func (r *PostgresRepository) Create(ctx context.Context, deal *Deal) (*Deal, error) {
reqXML, _ := marshalXMLIfPresent(deal.Request)
respXML := responseBytes(deal)
decisionXML, _ := marshalXMLIfPresent(deal.Decision)
stages, err := json.Marshal(deal.Stages)
if err != nil {
return nil, err
}
const q = `
INSERT INTO m2m_core.deals (id, guid, state, investor_id, signed_claim,
request_xml, response_xml, decision_xml,
stages, created_at, updated_at)
VALUES ($1, $2, $3, NULLIF($4, '')::uuid, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (guid) DO NOTHING
RETURNING id`
var id string
err = r.pool.QueryRow(ctx, q,
deal.ID, string(deal.GUID), string(deal.State), deal.InvestorID,
deal.SignedClaim, reqXML, respXML, decisionXML,
stages, deal.CreatedAt, deal.UpdatedAt,
).Scan(&id)
if errors.Is(err, pgx.ErrNoRows) {
// уже существовала — вернём существующую
return r.GetByGUID(ctx, deal.GUID)
}
if err != nil {
return nil, fmt.Errorf("PostgresRepository.Create: %w", err)
}
return deal, nil
}
// GetByGUID находит сделку по M2M GUID.
func (r *PostgresRepository) GetByGUID(ctx context.Context, guid m2m.UUID) (*Deal, error) {
return r.scanOne(ctx, `WHERE guid = $1`, string(guid))
}
// GetByID находит сделку по внутреннему UUID.
func (r *PostgresRepository) GetByID(ctx context.Context, id string) (*Deal, error) {
return r.scanOne(ctx, `WHERE id = $1`, id)
}
// Update сохраняет полное состояние сделки (для простоты — без diff).
func (r *PostgresRepository) Update(ctx context.Context, deal *Deal) error {
reqXML, _ := marshalXMLIfPresent(deal.Request)
respXML := responseBytes(deal)
decisionXML, _ := marshalXMLIfPresent(deal.Decision)
stages, err := json.Marshal(deal.Stages)
if err != nil {
return err
}
const q = `
UPDATE m2m_core.deals
SET state = $2, request_xml = $3, response_xml = $4, decision_xml = $5,
stages = $6, updated_at = $7
WHERE id = $1`
tag, err := r.pool.Exec(ctx, q,
deal.ID, string(deal.State), reqXML, respXML, decisionXML, stages, deal.UpdatedAt,
)
if err != nil {
return fmt.Errorf("PostgresRepository.Update: %w", err)
}
if tag.RowsAffected() == 0 {
return ErrNotFound
}
return nil
}
// List возвращает сделки по фильтру.
func (r *PostgresRepository) List(ctx context.Context, f Filter) ([]*Deal, error) {
args := []any{}
where := ""
add := func(cond string, val any) {
args = append(args, val)
if where == "" {
where = " WHERE "
} else {
where += " AND "
}
where += fmt.Sprintf(cond, len(args))
}
if f.State != nil {
add(`state = $%d`, string(*f.State))
}
if f.InvestorID != "" {
add(`investor_id = $%d::uuid`, f.InvestorID)
}
if f.CreatedFrom != nil {
add(`created_at >= $%d`, *f.CreatedFrom)
}
if f.CreatedTo != nil {
add(`created_at <= $%d`, *f.CreatedTo)
}
limit := f.Limit
if limit <= 0 {
limit = 50
}
offset := f.Offset
q := dealsSelectSQL() + where + fmt.Sprintf(" ORDER BY created_at DESC LIMIT %d OFFSET %d", limit, offset)
rows, err := r.pool.Query(ctx, q, args...)
if err != nil {
return nil, fmt.Errorf("PostgresRepository.List: %w", err)
}
defer rows.Close()
out := make([]*Deal, 0)
for rows.Next() {
d, err := scanRow(rows)
if err != nil {
return nil, err
}
out = append(out, d)
}
return out, rows.Err()
}
// AppendEvent добавляет аудит-событие сделки.
func (r *PostgresRepository) AppendEvent(ctx context.Context, dealID string, ev Event) error {
payload, _ := json.Marshal(ev.Payload)
const q = `
INSERT INTO m2m_core.deal_events (deal_id, type, payload, actor, created_at)
VALUES ($1::uuid, $2, $3, $4, $5)`
_, err := r.pool.Exec(ctx, q, dealID, ev.Type, payload, ev.Actor, ev.CreatedAt)
if err != nil {
return fmt.Errorf("PostgresRepository.AppendEvent: %w", err)
}
return nil
}
// scanOne возвращает одну запись по where-условию.
func (r *PostgresRepository) scanOne(ctx context.Context, where string, args ...any) (*Deal, error) {
q := dealsSelectSQL() + " " + where
row := r.pool.QueryRow(ctx, q, args...)
d, err := scanRow(row)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return d, err
}
// rowScanner объединяет pgx.Row и pgx.Rows для общего scanRow.
type rowScanner interface {
Scan(dest ...any) error
}
// scanRow читает одну строку deals в *Deal.
func scanRow(r rowScanner) (*Deal, error) {
var (
id, guid, state string
investorID *string
signedClaim []byte
reqXML, respXML, decisionX []byte
stages []byte
createdAt, updatedAt time.Time
)
if err := r.Scan(&id, &guid, &state, &investorID, &signedClaim,
&reqXML, &respXML, &decisionX, &stages, &createdAt, &updatedAt); err != nil {
return nil, err
}
d := &Deal{
ID: id,
GUID: m2m.UUID(guid),
State: State(state),
SignedClaim: signedClaim,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
}
if investorID != nil {
d.InvestorID = *investorID
}
if len(reqXML) > 0 {
var v m2m.M2MTransferRequest
if err := nsdxml.Unmarshal(reqXML, &v); err == nil {
d.Request = &v
}
}
if len(respXML) > 0 {
// Сохраняем точные байты для дословного показа/пересылки в ТП НРД.
d.RawResponse = respXML
var v m2m.M2MTransferResponse
if err := nsdxml.Unmarshal(respXML, &v); err == nil {
d.Response = &v
}
}
if len(decisionX) > 0 {
var v m2m.M2MTransferDecision
if err := nsdxml.Unmarshal(decisionX, &v); err == nil {
d.Decision = &v
}
}
if len(stages) > 0 {
_ = json.Unmarshal(stages, &d.Stages)
}
return d, nil
}
// dealsSelectSQL — SELECT с фиксированным порядком колонок.
func dealsSelectSQL() string {
return `SELECT id, guid, state, investor_id, signed_claim,
request_xml, response_xml, decision_xml,
stages, created_at, updated_at
FROM m2m_core.deals`
}
// responseBytes возвращает байты ответа МОСТ для записи в response_xml:
// точные байты от НРД (RawResponse), если они есть, иначе пере-сериализация
// разобранной структуры. Точные байты предпочтительны — их можно дословно
// переслать в техподдержку НРД.
func responseBytes(deal *Deal) []byte {
if len(deal.RawResponse) > 0 {
return deal.RawResponse
}
b, _ := marshalXMLIfPresent(deal.Response)
return b
}
// marshalXMLIfPresent сериализует *T в windows-1251 XML (или возвращает nil).
func marshalXMLIfPresent(v any) ([]byte, error) {
if v == nil {
return nil, nil
}
// Через reflect не используем — у nsdxml.Marshal вход — interface{}.
// Здесь же поступает *m2m.M2M... — проверим nil-ness через type-switch.
switch x := v.(type) {
case *m2m.M2MTransferRequest:
if x == nil {
return nil, nil
}
return nsdxml.Marshal(x)
case *m2m.M2MTransferResponse:
if x == nil {
return nil, nil
}
return nsdxml.Marshal(x)
case *m2m.M2MTransferDecision:
if x == nil {
return nil, nil
}
return nsdxml.Marshal(x)
}
return nil, nil
}
// _ assertions
var (
_ Repository = (*PostgresRepository)(nil)
)