Files
Bridge-and-Join-s/internal/lkgateway/nsdpoller.go
zuevav 586ffb3a31 fix(poller): постоянная память обработанных пакетов ИШ
При перезапуске bj-server поллер заново вычитывал старые ответы НРД из ИШ и
повторно применял их к заявкам. Для ответов с нулевым GUID (M2Mxx) это давало
ложные «отклонения» по FIFO — все заявки выглядели rejected.

Теперь множество применённых id входящих пакетов хранится в
/var/lib/bj/.bj/poller-processed.json и переживает перезапуск. На самом первом
старте все уже лежащие во входящих пакеты помечаются обработанными, чтобы не
применять исторический backlog к текущим заявкам.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 10:37:04 +03:00

186 lines
8.2 KiB
Go

package lkgateway
import (
"context"
"encoding/json"
"log"
"os"
"strings"
"time"
"git.zetit.ru/zuevav/Bridge-and-Join-s/internal/nsdadapter/igw"
)
// pollerStateFile — постоянная память id применённых входящих пакетов ИШ.
// Без неё перезапуск bj-server заново вычитывал бы старые ответы НРД и
// повторно применял их к заявкам (для ответов с нулевым GUID это приводило
// к ложным «отклонениям» по FIFO). Файл переживает перезапуски.
const pollerStateFile = "/var/lib/bj/.bj/poller-processed.json"
// loadProcessed читает множество обработанных id; второй результат false,
// если файла нет (самый первый запуск).
func loadProcessed() (map[int]bool, bool) {
m := make(map[int]bool)
b, err := os.ReadFile(pollerStateFile)
if err != nil {
return m, false
}
var ids []int
if json.Unmarshal(b, &ids) != nil {
return m, false
}
for _, id := range ids {
m[id] = true
}
return m, true
}
// saveProcessed атомарно сохраняет множество обработанных id.
func saveProcessed(m map[int]bool) {
ids := make([]int, 0, len(m))
for id := range m {
ids = append(ids, id)
}
b, err := json.Marshal(ids)
if err != nil {
return
}
tmp := pollerStateFile + ".tmp"
if os.WriteFile(tmp, b, 0o640) == nil {
_ = os.Rename(tmp, pollerStateFile)
}
}
// pollIncoming периодически опрашивает ИШ на входящие пакеты от НРД
// (M2MTransferDecision / Response) и применяет их через svc.ApplyDecision.
// Замыкает цикл: bj-server отправил заявку → ИШ → НРД → робот ответил →
// ИШ забрал ответ во входящие → этот поллер применяет Decision (статус
// заявки переходит в confirmed/rejected, срабатывает callback в ЛК).
//
// Дедупликация по id обработанных пакетов: ИШ возвращает их повторно,
// пока мы не подтвердим, поэтому держим множество уже обработанных.
func (s *Server) pollIncoming(ctx context.Context) {
const interval = 30 * time.Second
ticker := time.NewTicker(interval)
defer ticker.Stop()
processed, existed := loadProcessed()
if !existed {
// Первый запуск: помечаем все уже лежащие во входящих пакеты как
// обработанные, чтобы не применять исторический backlog (старые ответы
// НРД) к текущим заявкам. Реальные новые ответы придут позже.
s.seedProcessed(ctx, processed)
saveProcessed(processed)
}
log.Printf("lk-gateway: поллер входящих ИШ запущен (канал %s, интервал %s, обработанных в памяти %d)", s.igwChannel, interval, len(processed))
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.fetchAndApply(ctx, processed)
}
}
}
// fetchAndApply — один проход поллера: список входящих → для каждого нового
// забираем тело, распаковываем, парсим Decision, применяем.
func (s *Server) fetchAndApply(ctx context.Context, processed map[int]bool) {
cctx, cancel := context.WithTimeout(ctx, 25*time.Second)
defer cancel()
// Тип не указываем — ИШ вернёт оба (M2MTD + M2MER). Date=сегодня.
pkgs, err := s.igwClient.ListIncoming(cctx, igw.ListFilter{
Channel: s.igwChannel,
Date: time.Now(),
})
if err != nil {
log.Printf("lk-gateway: поллер ListIncoming: %v", err)
return
}
for _, p := range pkgs {
if processed[p.ID] {
continue
}
if err := s.applyIncoming(cctx, p); err != nil {
log.Printf("lk-gateway: поллер пакет id=%d (%s): %v", p.ID, p.Type, err)
continue // не помечаем обработанным — повторим в след. раз
}
processed[p.ID] = true
saveProcessed(processed) // переживает перезапуск
log.Printf("lk-gateway: поллер применил входящий пакет id=%d тип=%s", p.ID, p.Type)
}
}
// seedProcessed помечает все текущие входящие пакеты как обработанные
// (используется при самом первом запуске поллера).
func (s *Server) seedProcessed(ctx context.Context, processed map[int]bool) {
cctx, cancel := context.WithTimeout(ctx, 25*time.Second)
defer cancel()
pkgs, err := s.igwClient.ListIncoming(cctx, igw.ListFilter{Channel: s.igwChannel, Date: time.Now()})
if err != nil {
log.Printf("lk-gateway: поллер seed: ListIncoming: %v", err)
return
}
for _, p := range pkgs {
processed[p.ID] = true
}
log.Printf("lk-gateway: поллер первый запуск — засеяно %d существующих пакетов как обработанные", len(pkgs))
}
// applyIncoming забирает тело пакета и применяет M2M-ответ к сделке.
// Среди входящих от НРД много служебных пакетов (квитанции ЭДО типа C/Z,
// конверты) — они не M2M-документы и пропускаются. Реальные ответы —
// M2MTransferDecision (решение принимающей стороны) и M2MTransferResponse
// (ответ сервиса МОСТ, в т.ч. ошибки M2Mxx).
func (s *Server) applyIncoming(ctx context.Context, p igw.Package) error {
zipBytes, err := s.igwClient.GetPackage(ctx, p.ID)
if err != nil {
return err // сетевая ошибка — повторим в след. раз
}
unpacked, err := igw.UnpackPackage(zipBytes)
if err != nil {
// Нет основного .xml — служебный пакет (квитанция/конверт ЭДО).
// Не ошибка: помечаем обработанным, чтобы не повторять.
log.Printf("lk-gateway: поллер пакет id=%d (%s) — служебный (квитанция/конверт), пропуск", p.ID, p.Type)
return nil
}
doc := string(unpacked.DocXML)
switch {
case strings.Contains(doc, "M2MTransferDecision"):
decision, err := igw.ParseDecision(unpacked.DocXML)
if err != nil {
log.Printf("lk-gateway: поллер Decision id=%d: разбор: %v", p.ID, err)
return nil
}
return s.svc.ApplyDecision(ctx, decision)
case strings.Contains(doc, "M2MTransferResponse"):
resp, err := igw.ParseResponse(unpacked.DocXML)
if err != nil {
log.Printf("lk-gateway: поллер Response id=%d: разбор: %v", p.ID, err)
return nil
}
// Ответ сервиса МОСТ: статус + код (M2Mxx). Применяем к сделке:
// INFO — приём в обработку (статус не меняется), ERROR — отказ сервиса
// (напр. M2M14 — отправитель не в справочнике), сделка → Отклонена.
// Ответ сохраняется в сделке и виден в карточке заявки.
var codes string
for _, rr := range resp.Responses {
codes += string(rr.Code) + " "
}
log.Printf("lk-gateway: поллер M2MTransferResponse id=%d: статус=%s коды=[%s] GUID=%s",
p.ID, resp.StatusCode, strings.TrimSpace(codes), resp.GUID)
if err := s.svc.ApplyServiceResponse(ctx, resp, unpacked.DocXML); err != nil {
// Сделка может быть не найдена (ответ на чужой/старый GUID) —
// логируем, но помечаем обработанным, чтобы не зациклиться.
log.Printf("lk-gateway: поллер ApplyServiceResponse id=%d GUID=%s: %v", p.ID, resp.GUID, err)
}
return nil
default:
log.Printf("lk-gateway: поллер пакет id=%d (%s) — неизвестный M2M-документ, пропуск", p.ID, p.Type)
return nil
}
}