From e9da0920c65b2ae76a848916eb3ce475883fa6da Mon Sep 17 00:00:00 2001 From: Michal Maly Date: Sat, 14 Mar 2026 23:01:01 +0100 Subject: [PATCH] Write device aliases as startup metadata --- .../tasmota-device-summary-dashboard.json | 4 +- internal/pipeline/service.go | 49 ++++++++++++++++++- internal/pipeline/service_test.go | 36 +++++++++++++- 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/docs/grafana/tasmota-device-summary-dashboard.json b/docs/grafana/tasmota-device-summary-dashboard.json index 3cfc2c1..e24edf9 100644 --- a/docs/grafana/tasmota-device-summary-dashboard.json +++ b/docs/grafana/tasmota-device-summary-dashboard.json @@ -579,9 +579,9 @@ }, "editorMode": "code", "format": "table", - "query": "SELECT devices.device, coalesce(state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", + "query": "SELECT devices.device, coalesce(meta.device_alias, state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON devices.device = meta.device LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", "rawQuery": true, - "rawSql": "SELECT devices.device, coalesce(state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", + "rawSql": "SELECT devices.device, coalesce(meta.device_alias, state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON devices.device = meta.device LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", "refId": "A" } ], diff --git a/internal/pipeline/service.go b/internal/pipeline/service.go index 5fc4b76..1cde42d 100644 --- a/internal/pipeline/service.go +++ b/internal/pipeline/service.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "regexp" + "sort" "strings" "sync/atomic" "time" @@ -22,6 +23,7 @@ type writer interface { type Service struct { config config.Config deviceAliases map[string]string + aliasRecords []model.Record influxClient writer input chan model.RawMessage received atomic.Uint64 @@ -40,9 +42,11 @@ type Snapshot struct { } func NewService(cfg config.Config, influxClient writer) *Service { + normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases) return &Service{ config: cfg, - deviceAliases: normalizeDeviceAliases(cfg.DeviceAliases), + deviceAliases: normalizedAliases, + aliasRecords: buildAliasRecords(normalizedAliases), influxClient: influxClient, input: make(chan model.RawMessage, cfg.App.BufferSize), } @@ -63,7 +67,18 @@ func (service *Service) Run(ctx context.Context) error { ticker := time.NewTicker(service.config.App.FlushInterval.Duration) defer ticker.Stop() - batch := make([]model.Record, 0, service.config.App.BatchSize) + batch := append(make([]model.Record, 0, service.config.App.BatchSize+len(service.aliasRecords)), service.aliasRecords...) + if len(batch) > 0 { + flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration) + err := service.flush(flushCtx, batch) + cancel() + if err != nil { + slog.Error("failed to flush alias metadata to influx; will retry on next interval", "count", len(batch), "error", err) + } else { + batch = batch[:0] + } + } + var input <-chan model.RawMessage = service.input for { @@ -204,3 +219,33 @@ func normalizeDeviceKey(value string) string { normalized = strings.Trim(normalized, "_") return normalized } + +func buildAliasRecords(aliases map[string]string) []model.Record { + if len(aliases) == 0 { + return nil + } + + devices := make([]string, 0, len(aliases)) + for device := range aliases { + devices = append(devices, device) + } + sort.Strings(devices) + + timestamp := time.Now().UTC() + records := make([]model.Record, 0, len(devices)) + for _, device := range devices { + records = append(records, model.Record{ + Measurement: "tasmota_device_meta", + Tags: map[string]string{ + "device": device, + "source": "config", + }, + Fields: map[string]any{ + "device_alias": aliases[device], + }, + Timestamp: timestamp, + }) + } + + return records +} diff --git a/internal/pipeline/service_test.go b/internal/pipeline/service_test.go index 20c9a2b..3d2bfa0 100644 --- a/internal/pipeline/service_test.go +++ b/internal/pipeline/service_test.go @@ -43,6 +43,18 @@ func (writer *fakeWriter) firstBatch() []model.Record { return writer.batches[0] } +func (writer *fakeWriter) allBatches() [][]model.Record { + writer.mu.Lock() + defer writer.mu.Unlock() + + copyBatches := make([][]model.Record, 0, len(writer.batches)) + for _, batch := range writer.batches { + copyBatches = append(copyBatches, append([]model.Record(nil), batch...)) + } + + return copyBatches +} + func TestServiceFlushesParsedRecords(t *testing.T) { fake := newFakeWriter() service := NewService(config.Config{ @@ -66,6 +78,12 @@ func TestServiceFlushesParsedRecords(t *testing.T) { errCh <- service.Run(ctx) }() + select { + case <-fake.flushed: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for alias metadata flush") + } + service.Enqueue(model.RawMessage{ Topic: "tele/tasmota_896001/LWT", Payload: []byte("Online"), @@ -83,7 +101,23 @@ func TestServiceFlushesParsedRecords(t *testing.T) { t.Fatal("timed out waiting for pipeline flush") } - batch := fake.firstBatch() + batches := fake.allBatches() + if len(batches) != 2 { + t.Fatalf("expected 2 flushed batches, got %d", len(batches)) + } + + metaBatch := batches[0] + if len(metaBatch) != 2 { + t.Fatalf("expected 2 alias metadata records, got %d", len(metaBatch)) + } + if metaBatch[0].Measurement != "tasmota_device_meta" { + t.Fatalf("unexpected metadata measurement: %s", metaBatch[0].Measurement) + } + if metaBatch[0].Fields["device_alias"] == "" { + t.Fatalf("expected metadata batch to include device_alias, got %#v", metaBatch[0].Fields["device_alias"]) + } + + batch := batches[1] if len(batch) != 2 { t.Fatalf("expected 2 records in flushed batch, got %d", len(batch)) }