From 586ffb3a31b5d937f195a36c0c648caf262f7981 Mon Sep 17 00:00:00 2001 From: zuevav Date: Fri, 19 Jun 2026 10:37:04 +0300 Subject: [PATCH] =?UTF-8?q?fix(poller):=20=D0=BF=D0=BE=D1=81=D1=82=D0=BE?= =?UTF-8?q?=D1=8F=D0=BD=D0=BD=D0=B0=D1=8F=20=D0=BF=D0=B0=D0=BC=D1=8F=D1=82?= =?UTF-8?q?=D1=8C=20=D0=BE=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D0=B0=D0=BD?= =?UTF-8?q?=D0=BD=D1=8B=D1=85=20=D0=BF=D0=B0=D0=BA=D0=B5=D1=82=D0=BE=D0=B2?= =?UTF-8?q?=20=D0=98=D0=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit При перезапуске bj-server поллер заново вычитывал старые ответы НРД из ИШ и повторно применял их к заявкам. Для ответов с нулевым GUID (M2Mxx) это давало ложные «отклонения» по FIFO — все заявки выглядели rejected. Теперь множество применённых id входящих пакетов хранится в /var/lib/bj/.bj/poller-processed.json и переживает перезапуск. На самом первом старте все уже лежащие во входящих пакеты помечаются обработанными, чтобы не применять исторический backlog к текущим заявкам. Co-Authored-By: Claude Opus 4.8 --- internal/lkgateway/nsdpoller.go | 70 ++++++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/internal/lkgateway/nsdpoller.go b/internal/lkgateway/nsdpoller.go index 28f5300..8e38431 100644 --- a/internal/lkgateway/nsdpoller.go +++ b/internal/lkgateway/nsdpoller.go @@ -2,13 +2,55 @@ package lkgateway import ( "context" + "encoding/json" "log" + "os" "strings" "time" "git.zetit.ru/zuevav/Bridge-and-Join-s/internal/nsdadapter/igw" ) +// pollerStateFile — постоянная память id применённых входящих пакетов ИШ. +// Без неё перезапуск bj-server заново вычитывал бы старые ответы НРД и +// повторно применял их к заявкам (для ответов с нулевым GUID это приводило +// к ложным «отклонениям» по FIFO). Файл переживает перезапуски. +const pollerStateFile = "/var/lib/bj/.bj/poller-processed.json" + +// loadProcessed читает множество обработанных id; второй результат false, +// если файла нет (самый первый запуск). +func loadProcessed() (map[int]bool, bool) { + m := make(map[int]bool) + b, err := os.ReadFile(pollerStateFile) + if err != nil { + return m, false + } + var ids []int + if json.Unmarshal(b, &ids) != nil { + return m, false + } + for _, id := range ids { + m[id] = true + } + return m, true +} + +// saveProcessed атомарно сохраняет множество обработанных id. +func saveProcessed(m map[int]bool) { + ids := make([]int, 0, len(m)) + for id := range m { + ids = append(ids, id) + } + b, err := json.Marshal(ids) + if err != nil { + return + } + tmp := pollerStateFile + ".tmp" + if os.WriteFile(tmp, b, 0o640) == nil { + _ = os.Rename(tmp, pollerStateFile) + } +} + // pollIncoming периодически опрашивает ИШ на входящие пакеты от НРД // (M2MTransferDecision / Response) и применяет их через svc.ApplyDecision. // Замыкает цикл: bj-server отправил заявку → ИШ → НРД → робот ответил → @@ -22,8 +64,15 @@ func (s *Server) pollIncoming(ctx context.Context) { ticker := time.NewTicker(interval) defer ticker.Stop() - processed := make(map[int]bool) - log.Printf("lk-gateway: поллер входящих ИШ запущен (канал %s, интервал %s)", s.igwChannel, interval) + processed, existed := loadProcessed() + if !existed { + // Первый запуск: помечаем все уже лежащие во входящих пакеты как + // обработанные, чтобы не применять исторический backlog (старые ответы + // НРД) к текущим заявкам. Реальные новые ответы придут позже. + s.seedProcessed(ctx, processed) + saveProcessed(processed) + } + log.Printf("lk-gateway: поллер входящих ИШ запущен (канал %s, интервал %s, обработанных в памяти %d)", s.igwChannel, interval, len(processed)) for { select { @@ -60,10 +109,27 @@ func (s *Server) fetchAndApply(ctx context.Context, processed map[int]bool) { continue // не помечаем обработанным — повторим в след. раз } processed[p.ID] = true + saveProcessed(processed) // переживает перезапуск log.Printf("lk-gateway: поллер применил входящий пакет id=%d тип=%s", p.ID, p.Type) } } +// seedProcessed помечает все текущие входящие пакеты как обработанные +// (используется при самом первом запуске поллера). +func (s *Server) seedProcessed(ctx context.Context, processed map[int]bool) { + cctx, cancel := context.WithTimeout(ctx, 25*time.Second) + defer cancel() + pkgs, err := s.igwClient.ListIncoming(cctx, igw.ListFilter{Channel: s.igwChannel, Date: time.Now()}) + if err != nil { + log.Printf("lk-gateway: поллер seed: ListIncoming: %v", err) + return + } + for _, p := range pkgs { + processed[p.ID] = true + } + log.Printf("lk-gateway: поллер первый запуск — засеяно %d существующих пакетов как обработанные", len(pkgs)) +} + // applyIncoming забирает тело пакета и применяет M2M-ответ к сделке. // Среди входящих от НРД много служебных пакетов (квитанции ЭДО типа C/Z, // конверты) — они не M2M-документы и пропускаются. Реальные ответы —