Files
Bridge-and-Join-s/internal/lkgateway/server.go
T
fontvielle ee642e5eaa feat(m2mcore): PgRepository через pgx + интеграция в lk-gateway
PostgreSQL-репозиторий для m2m_core.deals — реальное хранилище сделок
вместо in-memory. Выбор Repository происходит автоматически в
lkgateway.NewServer: если в runtime-конфиге задан Postgres DSN, поднимается
pgxpool и используется PostgresRepository; иначе fallback на MemoryRepository.

internal/m2mcore/pgrepo.go:
- PostgresRepository: Create (идемпотентный по guid через ON CONFLICT DO NOTHING),
  GetByGUID, GetByID, Update, List (с фильтрами state/investor/created_*),
  AppendEvent для журнала deal_events
- request_xml/response_xml/decision_xml хранятся как windows-1251 XML через nsdxml,
  на чтении парсятся обратно в m2m.M2M* структуры
- stages — jsonb с историей FSM-переходов

migrations/m2m-core/002__stages.sql:
- ALTER TABLE deals ADD COLUMN stages jsonb DEFAULT '[]'

internal/lkgateway/server.go:
- При NewServer проверяется runtime-config: если есть DSN → PostgresRepository,
  иначе MemoryRepository; ошибка подключения логируется с fallback на in-memory
- Тесты используют tempdir SetupPath для изоляции от реальной БД

internal/lkgateway/setup.go:
- tryPingPostgres переписан с database/sql (требует регистрации драйвера)
  на pgx.Connect — теперь форма /admin/setup/postgres реально проверяет
  подключение перед сохранением DSN

Проверено сквозным smoke-тестом: введение DSN через UI →
сохранение в ~/.bj/setup.json → перезапуск lk-gateway → лог
"PostgresRepository подключён (m2m_core.deals)" → сделки реально пишутся
в БД.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 13:43:49 +03:00

248 lines
7.5 KiB
Go

package lkgateway
import (
"context"
"errors"
"log"
"net/http"
"time"
"git.zetit.ru/zuevav/Bridge-and-Join-s/internal/m2m"
"git.zetit.ru/zuevav/Bridge-and-Join-s/internal/m2mcore"
"git.zetit.ru/zuevav/Bridge-and-Join-s/internal/nsdadapter/mock"
)
// ServerConfig — конфигурация HTTP-сервера lk-gateway.
type ServerConfig struct {
Addr string
DefaultSender m2m.DeponentCode
DefaultReceiver m2m.DeponentCode
CheckOptions func() CheckOptions
MockDecisionDelay time.Duration // 0 = дефолт 3 секунды
SetupPath string // путь к JSON-файлу runtime-конфига (пусто = ~/.bj/setup.json)
}
// Server — обвязка HTTP + сервис + workers.
type Server struct {
cfg ServerConfig
svc *Service
mock *mock.Sender
store *SeedStore
rc *RuntimeConfig
mux *http.ServeMux
server *http.Server
}
// NewServer собирает Server с репозиторием, mock NSDSender, SeedStore
// и REST + Admin маршрутами. Выбор Repository:
// - если в runtime-конфиге (или ENV-fallback в cfg) задан PostgresDSN
// и pgx-Pool успешно создаётся — используется PostgresRepository;
// - иначе fallback на MemoryRepository (M2-демо).
func NewServer(cfg ServerConfig) (*Server, error) {
store := NewSeedStore()
mockCfg := mock.DefaultConfig()
mockCfg.NSDSenderCode = "MC0010300000"
if cfg.MockDecisionDelay > 0 {
mockCfg.DecisionDelay = cfg.MockDecisionDelay
}
sender := mock.NewSender(mockCfg)
rc, err := NewRuntimeConfig(cfg.SetupPath)
if err != nil {
return nil, err
}
// Repository: pgx если DSN указан, иначе in-memory.
var repo m2mcore.Repository = m2mcore.NewMemoryRepository()
if dsn := rc.Snapshot().Postgres.DSN; dsn != "" {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
pgRepo, pgErr := m2mcore.NewPostgresRepository(ctx, dsn)
cancel()
if pgErr != nil {
log.Printf("lk-gateway: PostgresRepository отказал, fallback MemoryRepository: %v", pgErr)
} else {
repo = pgRepo
log.Printf("lk-gateway: PostgresRepository подключён (m2m_core.deals)")
}
}
svc := NewService(Config{
Repository: repo,
Sender: sender,
Store: store,
Recorder: m2mcore.NewMemoryRecorder(),
DefaultSender: cfg.DefaultSender,
DefaultReceiver: cfg.DefaultReceiver,
})
// Если runtime-конфиг уже содержит callback URL — применяем его.
if s := rc.Snapshot(); s.LK.CallbackURL != "" {
svc.callbackURL = s.LK.CallbackURL
}
mux := http.NewServeMux()
RegisterAPI(mux, svc)
// CheckOptions берётся из runtime-конфига при каждом запросе на дашборд.
checkOpts := func() CheckOptions {
s := rc.Snapshot()
profile := "demo (mock NSD)"
if s.NSD.Profile != "" {
profile = s.NSD.Profile
}
return CheckOptions{
PostgresDSN: s.Postgres.DSN,
CryptoSocket: s.Crypto.SocketPath,
NSDAdapterURL: s.NSD.IGWBaseURL,
LKCallbackURL: s.LK.CallbackURL,
Profile: profile,
CryptoProvider: nonEmpty(s.Crypto.Provider, "stub"),
Timeout: 2 * time.Second,
}
}
if cfg.CheckOptions != nil {
// Опциональный override (например, из cmd/lk-gateway для override ENV-перетягивания).
checkOpts = cfg.CheckOptions
}
adminTpl, err := RegisterAdmin(mux, svc, checkOpts)
if err != nil {
return nil, err
}
registerSetup(mux, adminTpl, rc, svc)
registerHealth(mux)
registerSetCallback(mux, svc, rc)
registerSeedListing(mux, store)
return &Server{
cfg: cfg,
svc: svc,
mock: sender,
store: store,
rc: rc,
mux: mux,
server: &http.Server{
Addr: cfg.Addr,
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
},
}, nil
}
// RuntimeConfig возвращает текущий runtime-конфиг (для тестов).
func (s *Server) RuntimeConfig() *RuntimeConfig { return s.rc }
func nonEmpty(s, def string) string {
if s == "" {
return def
}
return s
}
// SetCallbackURL обновляет адрес, куда отправлять PATCH callback'и в ЛК.
func (s *Server) SetCallbackURL(url string) { s.svc.callbackURL = url }
// Service возвращает Service для тестов.
func (s *Server) Service() *Service { return s.svc }
// Mock возвращает mock-сендер.
func (s *Server) Mock() *mock.Sender { return s.mock }
// Store возвращает SeedStore.
func (s *Server) Store() *SeedStore { return s.store }
// Mux возвращает обработчик (для httptest).
func (s *Server) Mux() http.Handler { return s.mux }
// Run поднимает HTTP-сервер и фоновый Decisions-consumer.
// Блокируется до ctx.Done().
func (s *Server) Run(ctx context.Context) error {
go s.consumeDecisions(ctx)
errCh := make(chan error, 1)
go func() {
log.Printf("lk-gateway: listen %s", s.cfg.Addr)
errCh <- s.server.ListenAndServe()
}()
select {
case <-ctx.Done():
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.server.Shutdown(shutdownCtx)
return nil
case err := <-errCh:
if errors.Is(err, http.ErrServerClosed) {
return nil
}
return err
}
}
// consumeDecisions слушает Decisions от mock и обновляет соответствующие сделки.
func (s *Server) consumeDecisions(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case d := <-s.mock.Decisions():
if d == nil {
continue
}
if err := s.svc.ApplyDecision(ctx, d); err != nil {
log.Printf("lk-gateway: ApplyDecision GUID=%s: %v", d.Header.GUID, err)
} else {
log.Printf("lk-gateway: Decision применён GUID=%s, callback в %s", d.Header.GUID, s.svc.callbackURL)
}
}
}
}
func registerHealth(mux *http.ServeMux) {
mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok\n"))
})
}
// registerSetCallback — служебный POST /admin/api/callback-url для
// эмулятора ЛК, чтобы сообщить gateway свой URL. Если URL уже сохранён
// в runtime-конфиге (пользователь явно настроил его через UI), запрос
// эмулятора игнорируется — приоритет у явно настроенного.
func registerSetCallback(mux *http.ServeMux, svc *Service, rc *RuntimeConfig) {
mux.HandleFunc("/admin/api/callback-url", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method", http.StatusMethodNotAllowed)
return
}
url := r.URL.Query().Get("url")
if url == "" {
http.Error(w, "url required", http.StatusBadRequest)
return
}
if rc != nil {
if s := rc.Snapshot(); s.LK.CallbackURL != "" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("kept-user-configured"))
return
}
}
svc.callbackURL = url
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
})
}
func registerSeedListing(mux *http.ServeMux, store *SeedStore) {
mux.HandleFunc("/admin/api/clients", func(w http.ResponseWriter, _ *http.Request) {
type c struct {
ID, LastName, FirstName, MiddleName string
}
out := make([]c, 0)
for _, cl := range store.Clients() {
out = append(out, c{ID: cl.ID, LastName: cl.LastName, FirstName: cl.FirstName, MiddleName: cl.MiddleName})
}
writeJSON(w, http.StatusOK, out)
})
}