package pipeline import ( "context" "log/slog" "regexp" "sort" "strings" "sync/atomic" "time" "mqqt-scrubber/internal/config" "mqqt-scrubber/internal/model" "mqqt-scrubber/internal/parser" ) var invalidDeviceAliasCharacters = regexp.MustCompile(`[^a-z0-9_]+`) type writer interface { Write(ctx context.Context, records []model.Record) error } type Service struct { config config.Config deviceAliases map[string]string aliasRecords []model.Record snapshots map[string]*deviceSnapshot dirtyDevices map[string]struct{} influxClient writer input chan model.RawMessage received atomic.Uint64 parsed atomic.Uint64 written atomic.Uint64 dropped atomic.Uint64 failed atomic.Uint64 } type Snapshot struct { Received uint64 `json:"received"` Parsed uint64 `json:"parsed"` Written uint64 `json:"written"` Dropped uint64 `json:"dropped"` Failed uint64 `json:"failed"` } type deviceSnapshot struct { Device string Alias string LastSeen time.Time StateSeen time.Time SensorSeen time.Time Fields map[string]any } func NewService(cfg config.Config, influxClient writer) *Service { normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases) return &Service{ config: cfg, deviceAliases: normalizedAliases, aliasRecords: buildAliasRecords(normalizedAliases), snapshots: make(map[string]*deviceSnapshot), dirtyDevices: make(map[string]struct{}), influxClient: influxClient, input: make(chan model.RawMessage, cfg.App.BufferSize), } } func (service *Service) Enqueue(message model.RawMessage) { service.received.Add(1) select { case service.input <- message: default: service.dropped.Add(1) slog.Warn("dropping message because buffer is full", "topic", message.Topic) } } func (service *Service) Run(ctx context.Context) error { ticker := time.NewTicker(service.config.App.FlushInterval.Duration) defer ticker.Stop() batch := append(make([]model.Record, 0, service.config.App.BatchSize+len(service.aliasRecords)), service.aliasRecords...) if len(batch) > 0 { flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration) err := service.flush(flushCtx, batch) cancel() if err != nil { slog.Error("failed to flush alias metadata to influx; will retry on next interval", "count", len(batch), "error", err) } else { batch = batch[:0] } } var input <-chan model.RawMessage = service.input for { if len(batch) >= service.config.App.BatchSize { input = nil } else { input = service.input } select { case <-ctx.Done(): flushCtx, cancel := context.WithTimeout(context.Background(), service.config.App.FlushTimeout.Duration) err := service.flushPending(flushCtx, batch) cancel() if err != nil { return err } service.logCounters() return nil case message := <-input: records, err := parser.ParseTasmota(message) if err != nil { service.failed.Add(1) slog.Warn("failed to parse message", "topic", message.Topic, "error", err) continue } service.applyDeviceAliases(records) service.updateDeviceSnapshots(records) service.parsed.Add(uint64(len(records))) batch = append(batch, records...) if len(batch) >= service.config.App.BatchSize { flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration) err := service.flushPending(flushCtx, batch) cancel() if err != nil { slog.Error("failed to flush full batch to influx; keeping batch in memory", "count", len(batch), "error", err) continue } batch = batch[:0] } case <-ticker.C: if len(batch) == 0 && len(service.dirtyDevices) == 0 { continue } flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration) err := service.flushPending(flushCtx, batch) cancel() if err != nil { slog.Error("failed to flush batch to influx; will retry on next interval", "count", len(batch), "error", err) continue } batch = batch[:0] } } } func (service *Service) flushPending(ctx context.Context, batch []model.Record) error { flushBatch := append([]model.Record(nil), batch...) flushBatch = append(flushBatch, service.buildSnapshotRecords()...) if err := service.flush(ctx, flushBatch); err != nil { return err } service.clearDirtyDevices() return nil } func (service *Service) flush(ctx context.Context, batch []model.Record) error { if len(batch) == 0 { return nil } if err := service.influxClient.Write(ctx, batch); err != nil { service.failed.Add(uint64(len(batch))) return err } service.written.Add(uint64(len(batch))) slog.Info("flushed records to influx", "count", len(batch)) return nil } func (service *Service) logCounters() { slog.Info("service counters", "received", service.received.Load(), "parsed", service.parsed.Load(), "written", service.written.Load(), "dropped", service.dropped.Load(), "failed", service.failed.Load(), ) } func (service *Service) Snapshot() Snapshot { return Snapshot{ Received: service.received.Load(), Parsed: service.parsed.Load(), Written: service.written.Load(), Dropped: service.dropped.Load(), Failed: service.failed.Load(), } } func (service *Service) applyDeviceAliases(records []model.Record) { if len(service.deviceAliases) == 0 { return } for index := range records { device := normalizeDeviceKey(records[index].Tags["device"]) alias, ok := service.deviceAliases[device] if !ok || alias == "" { continue } records[index].Fields["device_alias"] = alias } } func (service *Service) updateDeviceSnapshots(records []model.Record) { for _, record := range records { device := normalizeDeviceKey(record.Tags["device"]) if device == "" { continue } snapshot, ok := service.snapshots[device] if !ok { snapshot = &deviceSnapshot{ Device: device, Fields: make(map[string]any), } service.snapshots[device] = snapshot } if record.Timestamp.After(snapshot.LastSeen) { snapshot.LastSeen = record.Timestamp } if alias, ok := record.Fields["device_alias"].(string); ok && strings.TrimSpace(alias) != "" { snapshot.Alias = alias snapshot.Fields["device_alias"] = alias } switch record.Tags["message_type"] { case "state": if record.Timestamp.After(snapshot.StateSeen) { snapshot.StateSeen = record.Timestamp copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "wifi_signal", "uptime_sec") } case "sensor": if record.Timestamp.After(snapshot.SensorSeen) { snapshot.SensorSeen = record.Timestamp copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "energy_power", "energy_today", "energy_total") } } service.dirtyDevices[device] = struct{}{} } } func copySnapshotFields(target map[string]any, source map[string]any, keys ...string) { for _, key := range keys { value, ok := source[key] if !ok { continue } target[key] = value } } func (service *Service) buildSnapshotRecords() []model.Record { if len(service.dirtyDevices) == 0 { return nil } devices := make([]string, 0, len(service.dirtyDevices)) for device := range service.dirtyDevices { devices = append(devices, device) } sort.Strings(devices) records := make([]model.Record, 0, len(devices)) for _, device := range devices { snapshot := service.snapshots[device] if snapshot == nil || snapshot.LastSeen.IsZero() { continue } fields := make(map[string]any, len(snapshot.Fields)+2) for key, value := range snapshot.Fields { fields[key] = value } if !snapshot.StateSeen.IsZero() { fields["state_last_seen_unix"] = snapshot.StateSeen.Unix() } if !snapshot.SensorSeen.IsZero() { fields["sensor_last_seen_unix"] = snapshot.SensorSeen.Unix() } records = append(records, model.Record{ Measurement: "tasmota_device_snapshot", Tags: map[string]string{ "device": device, "source": "derived", }, Fields: fields, Timestamp: snapshot.LastSeen, }) } return records } func (service *Service) clearDirtyDevices() { for device := range service.dirtyDevices { delete(service.dirtyDevices, device) } } func normalizeDeviceAliases(aliases map[string]string) map[string]string { if len(aliases) == 0 { return nil } normalized := make(map[string]string, len(aliases)) for device, alias := range aliases { cleanDevice := normalizeDeviceKey(device) cleanAlias := strings.TrimSpace(alias) if cleanDevice == "" || cleanAlias == "" { continue } normalized[cleanDevice] = cleanAlias } if len(normalized) == 0 { return nil } return normalized } func normalizeDeviceKey(value string) string { normalized := strings.ToLower(strings.TrimSpace(value)) normalized = strings.ReplaceAll(normalized, "-", "_") normalized = strings.ReplaceAll(normalized, " ", "_") normalized = invalidDeviceAliasCharacters.ReplaceAllString(normalized, "_") normalized = strings.Trim(normalized, "_") return normalized } func buildAliasRecords(aliases map[string]string) []model.Record { if len(aliases) == 0 { return nil } devices := make([]string, 0, len(aliases)) for device := range aliases { devices = append(devices, device) } sort.Strings(devices) timestamp := time.Now().UTC() records := make([]model.Record, 0, len(devices)) for _, device := range devices { records = append(records, model.Record{ Measurement: "tasmota_device_meta", Tags: map[string]string{ "device": device, "source": "config", }, Fields: map[string]any{ "device_alias": aliases[device], }, Timestamp: timestamp, }) } return records }