fix(poller): постоянная память обработанных пакетов ИШ
При перезапуске bj-server поллер заново вычитывал старые ответы НРД из ИШ и повторно применял их к заявкам. Для ответов с нулевым GUID (M2Mxx) это давало ложные «отклонения» по FIFO — все заявки выглядели rejected. Теперь множество применённых id входящих пакетов хранится в /var/lib/bj/.bj/poller-processed.json и переживает перезапуск. На самом первом старте все уже лежащие во входящих пакеты помечаются обработанными, чтобы не применять исторический backlog к текущим заявкам. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -2,13 +2,55 @@ package lkgateway
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"log"
|
"log"
|
||||||
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.zetit.ru/zuevav/Bridge-and-Join-s/internal/nsdadapter/igw"
|
"git.zetit.ru/zuevav/Bridge-and-Join-s/internal/nsdadapter/igw"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// pollerStateFile — постоянная память id применённых входящих пакетов ИШ.
|
||||||
|
// Без неё перезапуск bj-server заново вычитывал бы старые ответы НРД и
|
||||||
|
// повторно применял их к заявкам (для ответов с нулевым GUID это приводило
|
||||||
|
// к ложным «отклонениям» по FIFO). Файл переживает перезапуски.
|
||||||
|
const pollerStateFile = "/var/lib/bj/.bj/poller-processed.json"
|
||||||
|
|
||||||
|
// loadProcessed читает множество обработанных id; второй результат false,
|
||||||
|
// если файла нет (самый первый запуск).
|
||||||
|
func loadProcessed() (map[int]bool, bool) {
|
||||||
|
m := make(map[int]bool)
|
||||||
|
b, err := os.ReadFile(pollerStateFile)
|
||||||
|
if err != nil {
|
||||||
|
return m, false
|
||||||
|
}
|
||||||
|
var ids []int
|
||||||
|
if json.Unmarshal(b, &ids) != nil {
|
||||||
|
return m, false
|
||||||
|
}
|
||||||
|
for _, id := range ids {
|
||||||
|
m[id] = true
|
||||||
|
}
|
||||||
|
return m, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// saveProcessed атомарно сохраняет множество обработанных id.
|
||||||
|
func saveProcessed(m map[int]bool) {
|
||||||
|
ids := make([]int, 0, len(m))
|
||||||
|
for id := range m {
|
||||||
|
ids = append(ids, id)
|
||||||
|
}
|
||||||
|
b, err := json.Marshal(ids)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tmp := pollerStateFile + ".tmp"
|
||||||
|
if os.WriteFile(tmp, b, 0o640) == nil {
|
||||||
|
_ = os.Rename(tmp, pollerStateFile)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// pollIncoming периодически опрашивает ИШ на входящие пакеты от НРД
|
// pollIncoming периодически опрашивает ИШ на входящие пакеты от НРД
|
||||||
// (M2MTransferDecision / Response) и применяет их через svc.ApplyDecision.
|
// (M2MTransferDecision / Response) и применяет их через svc.ApplyDecision.
|
||||||
// Замыкает цикл: bj-server отправил заявку → ИШ → НРД → робот ответил →
|
// Замыкает цикл: bj-server отправил заявку → ИШ → НРД → робот ответил →
|
||||||
@@ -22,8 +64,15 @@ func (s *Server) pollIncoming(ctx context.Context) {
|
|||||||
ticker := time.NewTicker(interval)
|
ticker := time.NewTicker(interval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
processed := make(map[int]bool)
|
processed, existed := loadProcessed()
|
||||||
log.Printf("lk-gateway: поллер входящих ИШ запущен (канал %s, интервал %s)", s.igwChannel, interval)
|
if !existed {
|
||||||
|
// Первый запуск: помечаем все уже лежащие во входящих пакеты как
|
||||||
|
// обработанные, чтобы не применять исторический backlog (старые ответы
|
||||||
|
// НРД) к текущим заявкам. Реальные новые ответы придут позже.
|
||||||
|
s.seedProcessed(ctx, processed)
|
||||||
|
saveProcessed(processed)
|
||||||
|
}
|
||||||
|
log.Printf("lk-gateway: поллер входящих ИШ запущен (канал %s, интервал %s, обработанных в памяти %d)", s.igwChannel, interval, len(processed))
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@@ -60,10 +109,27 @@ func (s *Server) fetchAndApply(ctx context.Context, processed map[int]bool) {
|
|||||||
continue // не помечаем обработанным — повторим в след. раз
|
continue // не помечаем обработанным — повторим в след. раз
|
||||||
}
|
}
|
||||||
processed[p.ID] = true
|
processed[p.ID] = true
|
||||||
|
saveProcessed(processed) // переживает перезапуск
|
||||||
log.Printf("lk-gateway: поллер применил входящий пакет id=%d тип=%s", p.ID, p.Type)
|
log.Printf("lk-gateway: поллер применил входящий пакет id=%d тип=%s", p.ID, p.Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// seedProcessed помечает все текущие входящие пакеты как обработанные
|
||||||
|
// (используется при самом первом запуске поллера).
|
||||||
|
func (s *Server) seedProcessed(ctx context.Context, processed map[int]bool) {
|
||||||
|
cctx, cancel := context.WithTimeout(ctx, 25*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
pkgs, err := s.igwClient.ListIncoming(cctx, igw.ListFilter{Channel: s.igwChannel, Date: time.Now()})
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("lk-gateway: поллер seed: ListIncoming: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, p := range pkgs {
|
||||||
|
processed[p.ID] = true
|
||||||
|
}
|
||||||
|
log.Printf("lk-gateway: поллер первый запуск — засеяно %d существующих пакетов как обработанные", len(pkgs))
|
||||||
|
}
|
||||||
|
|
||||||
// applyIncoming забирает тело пакета и применяет M2M-ответ к сделке.
|
// applyIncoming забирает тело пакета и применяет M2M-ответ к сделке.
|
||||||
// Среди входящих от НРД много служебных пакетов (квитанции ЭДО типа C/Z,
|
// Среди входящих от НРД много служебных пакетов (квитанции ЭДО типа C/Z,
|
||||||
// конверты) — они не M2M-документы и пропускаются. Реальные ответы —
|
// конверты) — они не M2M-документы и пропускаются. Реальные ответы —
|
||||||
|
|||||||
Reference in New Issue
Block a user