package pipeline import ( "context" "log/slog" "sync/atomic" "time" "mqqt-scrubber/internal/config" "mqqt-scrubber/internal/model" "mqqt-scrubber/internal/parser" ) type writer interface { Write(ctx context.Context, records []model.Record) error } type Service struct { config config.Config influxClient writer input chan model.RawMessage received atomic.Uint64 parsed atomic.Uint64 written atomic.Uint64 dropped atomic.Uint64 failed atomic.Uint64 } type Snapshot struct { Received uint64 `json:"received"` Parsed uint64 `json:"parsed"` Written uint64 `json:"written"` Dropped uint64 `json:"dropped"` Failed uint64 `json:"failed"` } func NewService(cfg config.Config, influxClient writer) *Service { return &Service{ config: cfg, influxClient: influxClient, input: make(chan model.RawMessage, cfg.App.BufferSize), } } func (service *Service) Enqueue(message model.RawMessage) { service.received.Add(1) select { case service.input <- message: default: service.dropped.Add(1) slog.Warn("dropping message because buffer is full", "topic", message.Topic) } } func (service *Service) Run(ctx context.Context) error { ticker := time.NewTicker(service.config.App.FlushInterval.Duration) defer ticker.Stop() batch := make([]model.Record, 0, service.config.App.BatchSize) var input <-chan model.RawMessage = service.input for { if len(batch) >= service.config.App.BatchSize { input = nil } else { input = service.input } select { case <-ctx.Done(): flushCtx, cancel := context.WithTimeout(context.Background(), service.config.App.FlushTimeout.Duration) err := service.flush(flushCtx, batch) cancel() if err != nil { return err } service.logCounters() return nil case message := <-input: records, err := parser.ParseTasmota(message) if err != nil { service.failed.Add(1) slog.Warn("failed to parse message", "topic", message.Topic, "error", err) continue } service.parsed.Add(uint64(len(records))) batch = append(batch, records...) if len(batch) >= service.config.App.BatchSize { flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration) err := service.flush(flushCtx, batch) cancel() if err != nil { slog.Error("failed to flush full batch to influx; keeping batch in memory", "count", len(batch), "error", err) continue } batch = batch[:0] } case <-ticker.C: if len(batch) == 0 { continue } flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration) err := service.flush(flushCtx, batch) cancel() if err != nil { slog.Error("failed to flush batch to influx; will retry on next interval", "count", len(batch), "error", err) continue } batch = batch[:0] } } } func (service *Service) flush(ctx context.Context, batch []model.Record) error { if len(batch) == 0 { return nil } if err := service.influxClient.Write(ctx, batch); err != nil { service.failed.Add(uint64(len(batch))) return err } service.written.Add(uint64(len(batch))) slog.Info("flushed records to influx", "count", len(batch)) return nil } func (service *Service) logCounters() { slog.Info("service counters", "received", service.received.Load(), "parsed", service.parsed.Load(), "written", service.written.Load(), "dropped", service.dropped.Load(), "failed", service.failed.Load(), ) } func (service *Service) Snapshot() Snapshot { return Snapshot{ Received: service.received.Load(), Parsed: service.parsed.Load(), Written: service.written.Load(), Dropped: service.dropped.Load(), Failed: service.failed.Load(), } }