package pipeline

import (
	"context"
	"log/slog"
	"regexp"
	"sort"
	"strings"
	"sync/atomic"
	"time"

	"mqqt-scrubber/internal/config"
	"mqqt-scrubber/internal/model"
	"mqqt-scrubber/internal/parser"
)

// invalidDeviceAliasCharacters matches every run of characters that is not a
// lowercase ASCII letter, a digit or an underscore.
var invalidDeviceAliasCharacters = regexp.MustCompile(`[^a-z0-9_]+`)

// writer is the sink the pipeline flushes record batches to.
type writer interface {
	Write(ctx context.Context, records []model.Record) error
}

// Service buffers raw messages, parses them into records, attaches configured
// device aliases and writes the records to the sink in batches.
//
// Enqueue and Snapshot only touch the input channel and the atomic counters.
// All batching state lives inside Run.
type Service struct {
	config        config.Config
	deviceAliases map[string]string
	aliasRecords  []model.Record
	influxClient  writer
	input         chan model.RawMessage

	received atomic.Uint64
	parsed   atomic.Uint64
	written  atomic.Uint64
	dropped  atomic.Uint64
	failed   atomic.Uint64
}

// Snapshot is a point-in-time copy of the service counters.
type Snapshot struct {
	Received uint64 `json:"received"`
	Parsed   uint64 `json:"parsed"`
	Written  uint64 `json:"written"`
	Dropped  uint64 `json:"dropped"`
	Failed   uint64 `json:"failed"`
}

// NewService builds a Service from cfg that writes to influxClient.
//
// The device aliases in cfg are normalized once here, and one metadata record
// per alias is prepared for Run to write on start-up. The input channel is
// buffered with cfg.App.BufferSize slots.
func NewService(cfg config.Config, influxClient writer) *Service {
	normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases)

	return &Service{
		config:        cfg,
		deviceAliases: normalizedAliases,
		aliasRecords:  buildAliasRecords(normalizedAliases),
		influxClient:  influxClient,
		input:         make(chan model.RawMessage, cfg.App.BufferSize),
	}
}

// Enqueue hands message to the pipeline without blocking.
//
// Every call increments the received counter. When the input buffer is full
// the message is discarded, the dropped counter is incremented and a warning
// is logged.
func (service *Service) Enqueue(message model.RawMessage) {
	service.received.Add(1)

	select {
	case service.input <- message:
	default:
		service.dropped.Add(1)
		slog.Warn("dropping message because buffer is full", "topic", message.Topic)
	}
}

// Run consumes enqueued messages until ctx is cancelled.
//
// On start it tries to write the alias metadata records. Afterwards it parses
// each incoming message, appends the resulting records to the current batch
// and flushes the batch when it reaches the configured batch size or when the
// flush ticker fires. A batch that fails to flush is kept in memory and
// retried on the next tick; while the batch is at or above the batch size no
// further messages are read from the input channel.
//
// When ctx is cancelled the pending batch is flushed one last time. Run
// returns the error of that final flush, or nil after logging the counters.
// Messages that are still waiting in the input buffer at that point are not
// drained.
func (service *Service) Run(ctx context.Context) error {
	ticker := time.NewTicker(service.config.App.FlushInterval.Duration)
	defer ticker.Stop()

	batchSize := service.config.App.BatchSize
	batch := make([]model.Record, 0, batchSize+len(service.aliasRecords))
	batch = append(batch, service.aliasRecords...)

	if len(batch) > 0 {
		if err := service.flushWithTimeout(ctx, batch); err != nil {
			slog.Error("failed to flush alias metadata to influx; will retry on next interval", "count", len(batch), "error", err)
		} else {
			batch = batch[:0]
		}
	}

	for {
		// Receiving from a nil channel blocks forever, so leaving input nil
		// disables the message case while the batch is full.
		var input <-chan model.RawMessage
		if len(batch) < batchSize {
			input = service.input
		}

		select {
		case <-ctx.Done():
			// ctx is already cancelled, so the final flush needs a fresh parent.
			if err := service.flushWithTimeout(context.Background(), batch); err != nil {
				return err
			}
			service.logCounters()
			return nil

		case message := <-input:
			records, err := parser.ParseTasmota(message)
			if err != nil {
				service.failed.Add(1)
				slog.Warn("failed to parse message", "topic", message.Topic, "error", err)
				continue
			}

			service.applyDeviceAliases(records)
			service.parsed.Add(uint64(len(records)))
			batch = append(batch, records...)

			if len(batch) < batchSize {
				continue
			}
			if err := service.flushWithTimeout(ctx, batch); err != nil {
				slog.Error("failed to flush full batch to influx; keeping batch in memory", "count", len(batch), "error", err)
				continue
			}
			batch = batch[:0]

		case <-ticker.C:
			if len(batch) == 0 {
				continue
			}
			if err := service.flushWithTimeout(ctx, batch); err != nil {
				slog.Error("failed to flush batch to influx; will retry on next interval", "count", len(batch), "error", err)
				continue
			}
			batch = batch[:0]
		}
	}
}

// flushWithTimeout flushes batch using a context derived from ctx that is
// limited to the configured flush timeout.
func (service *Service) flushWithTimeout(ctx context.Context, batch []model.Record) error {
	flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
	defer cancel()

	return service.flush(flushCtx, batch)
}

// flush writes batch to the sink and updates the counters.
//
// An empty batch is a no-op. On a write error the failed counter grows by the
// batch length and the error is returned; because callers retry the same
// batch, those records are counted again on every failed attempt. On success
// the written counter grows by the batch length.
func (service *Service) flush(ctx context.Context, batch []model.Record) error {
	if len(batch) == 0 {
		return nil
	}

	if err := service.influxClient.Write(ctx, batch); err != nil {
		service.failed.Add(uint64(len(batch)))
		return err
	}

	service.written.Add(uint64(len(batch)))
	slog.Info("flushed records to influx", "count", len(batch))
	return nil
}

// logCounters logs the current value of every service counter.
func (service *Service) logCounters() {
	slog.Info("service counters",
		"received", service.received.Load(),
		"parsed", service.parsed.Load(),
		"written", service.written.Load(),
		"dropped", service.dropped.Load(),
		"failed", service.failed.Load(),
	)
}

// Snapshot returns the current counter values. Each counter is loaded
// individually, so the values are not read as one atomic unit.
func (service *Service) Snapshot() Snapshot {
	return Snapshot{
		Received: service.received.Load(),
		Parsed:   service.parsed.Load(),
		Written:  service.written.Load(),
		Dropped:  service.dropped.Load(),
		Failed:   service.failed.Load(),
	}
}

// applyDeviceAliases sets the "device_alias" field on every record whose
// normalized "device" tag has a configured alias. Records without a matching
// alias are left untouched.
func (service *Service) applyDeviceAliases(records []model.Record) {
	if len(service.deviceAliases) == 0 {
		return
	}

	for index := range records {
		record := &records[index]

		alias, ok := service.deviceAliases[normalizeDeviceKey(record.Tags["device"])]
		if !ok || alias == "" {
			continue
		}

		// Assigning into a nil map panics, so create the map for records that
		// arrive without any fields.
		if record.Fields == nil {
			record.Fields = make(map[string]any, 1)
		}
		record.Fields["device_alias"] = alias
	}
}

// normalizeDeviceAliases returns a copy of aliases with normalized device keys
// and whitespace-trimmed alias values. Entries whose key or value is empty
// after cleaning are skipped. It returns nil when nothing usable remains.
func normalizeDeviceAliases(aliases map[string]string) map[string]string {
	if len(aliases) == 0 {
		return nil
	}

	normalized := make(map[string]string, len(aliases))
	for device, alias := range aliases {
		cleanDevice := normalizeDeviceKey(device)
		cleanAlias := strings.TrimSpace(alias)
		if cleanDevice == "" || cleanAlias == "" {
			continue
		}
		normalized[cleanDevice] = cleanAlias
	}

	if len(normalized) == 0 {
		return nil
	}
	return normalized
}

// normalizeDeviceKey converts value into the canonical form used as alias map
// key: trimmed, lowercased, with hyphens and spaces turned into underscores,
// every remaining run of invalid characters replaced by a single underscore,
// and leading and trailing underscores removed. For example
// "Living-Room Plug" becomes "living_room_plug".
func normalizeDeviceKey(value string) string {
	normalized := strings.ToLower(strings.TrimSpace(value))
	normalized = strings.ReplaceAll(normalized, "-", "_")
	normalized = strings.ReplaceAll(normalized, " ", "_")
	normalized = invalidDeviceAliasCharacters.ReplaceAllString(normalized, "_")
	return strings.Trim(normalized, "_")
}

// buildAliasRecords creates one "tasmota_device_meta" record per alias,
// ordered by device key so the output is deterministic. All records share one
// UTC timestamp taken when the function runs. It returns nil for an empty map.
func buildAliasRecords(aliases map[string]string) []model.Record {
	if len(aliases) == 0 {
		return nil
	}

	devices := make([]string, 0, len(aliases))
	for device := range aliases {
		devices = append(devices, device)
	}
	sort.Strings(devices)

	timestamp := time.Now().UTC()
	records := make([]model.Record, 0, len(devices))
	for _, device := range devices {
		records = append(records, model.Record{
			Measurement: "tasmota_device_meta",
			Tags: map[string]string{
				"device": device,
				"source": "config",
			},
			Fields: map[string]any{
				"device_alias": aliases[device],
			},
			Timestamp: timestamp,
		})
	}
	return records
}