From 110388c363e5572d6b44fa8e65aef0096a7fa505 Mon Sep 17 00:00:00 2001 From: Michal Maly Date: Sat, 14 Mar 2026 22:42:27 +0100 Subject: [PATCH 1/2] Add configurable device aliases --- README.md | 5 + config.example.json | 4 + docs/ARCHITECTURE.md | 1 + .../tasmota-device-summary-dashboard.json | 18 ++- internal/config/config.go | 55 ++++++++- internal/config/config_test.go | 107 ++++++++++++++++++ internal/pipeline/service.go | 76 +++++++++++-- internal/pipeline/service_test.go | 10 ++ 8 files changed, 259 insertions(+), 17 deletions(-) create mode 100644 internal/config/config_test.go diff --git a/README.md b/README.md index 79e730c..f5c18da 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ Supported environment variables: - `MQTT_SCRUBBER_INFLUX_DATABASE` - `MQTT_SCRUBBER_INFLUX_TOKEN` - `MQTT_SCRUBBER_INFLUX_PRECISION` +- `MQTT_SCRUBBER_DEVICE_ALIASES` as a JSON object such as `{"kitchen_plug":"Kitchen Plug"}` - `MQTT_SCRUBBER_APP_BATCH_SIZE` - `MQTT_SCRUBBER_APP_BUFFER_SIZE` - `MQTT_SCRUBBER_APP_FLUSH_INTERVAL` @@ -61,6 +62,8 @@ Supported environment variables: `MQTT_SCRUBBER_MQTT_TOPICS` expects a comma-separated list. +You can also define optional per-device aliases in config with a top-level `device_aliases` object. Keys are normalized like device tags, so `kitchen-plug`, `Kitchen Plug`, and `kitchen_plug` all resolve to `kitchen_plug`. + ## Run ```bash @@ -135,6 +138,8 @@ Relay panels use the latest `power`, `power1`, `power2`, `power3`, and `power4` The dashboard is split into a fleet summary section and a selected-device section. The selected-device section also includes `Last Seen`, `Seconds Since Last Message`, and `Messages In Range` panels derived from both `tasmota_state` and `tasmota_sensor` timestamps. +If `device_aliases` is configured, the summary table will expose an `alias` column populated from the latest state or sensor record for each device. + ## Notes - The repo name is kept as `mqqt-scrubber` to match the existing folder. diff --git a/config.example.json b/config.example.json index 749419e..f76280f 100644 --- a/config.example.json +++ b/config.example.json @@ -16,6 +16,10 @@ "token": "", "precision": "ns" }, + "device_aliases": { + "tasmota_c88994": "Office Plug", + "kitchen-plug": "Kitchen Counter Plug" + }, "app": { "batch_size": 200, "buffer_size": 1000, diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 99468bf..bc75756 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -125,6 +125,7 @@ timestamp: payload Time or message receive time - `LWT`: - `state` as string - `online` as boolean +- all measurements may also include `device_alias` as an optional string when configured externally - `STATE`: - base fields like `uptime`, `uptime_sec`, `heap`, `sleep`, `sleep_mode`, `load_avg`, `mqtt_count` - relay state fields like `power`, `power1` through `power4` diff --git a/docs/grafana/tasmota-device-summary-dashboard.json b/docs/grafana/tasmota-device-summary-dashboard.json index 0e3d61f..3cfc2c1 100644 --- a/docs/grafana/tasmota-device-summary-dashboard.json +++ b/docs/grafana/tasmota-device-summary-dashboard.json @@ -419,6 +419,18 @@ } ] }, + { + "matcher": { + "id": "byName", + "options": "alias" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + }, { "matcher": { "id": "byName", @@ -567,9 +579,9 @@ }, "editorMode": "code", "format": "table", - "query": "SELECT devices.device, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", + "query": "SELECT devices.device, coalesce(state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", "rawQuery": true, - "rawSql": "SELECT devices.device, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", + "rawSql": "SELECT devices.device, coalesce(state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", "refId": "A" } ], @@ -625,6 +637,6 @@ "timezone": "browser", "title": "Tasmota Device Summary", "uid": "tasmota-device-summary", - "version": 6, + "version": 7, "weekStart": "" } \ No newline at end of file diff --git a/internal/config/config.go b/internal/config/config.go index d54d211..da1f741 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "regexp" "strconv" "strings" "time" @@ -12,10 +13,13 @@ import ( const envPrefix = "MQTT_SCRUBBER_" +var invalidDeviceCharacters = regexp.MustCompile(`[^a-z0-9_]+`) + type Config struct { - MQTT MQTTConfig `json:"mqtt"` - Influx InfluxConfig `json:"influx"` - App AppConfig `json:"app"` + MQTT MQTTConfig `json:"mqtt"` + Influx InfluxConfig `json:"influx"` + App AppConfig `json:"app"` + DeviceAliases map[string]string `json:"device_aliases"` } type MQTTConfig struct { @@ -80,6 +84,8 @@ func Load(path string) (Config, error) { return Config{}, err } + cfg.DeviceAliases = normalizeDeviceAliases(cfg.DeviceAliases) + if err := cfg.Validate(); err != nil { return Config{}, err } @@ -161,6 +167,18 @@ func applyEnvOverrides(cfg *Config) error { setString(&cfg.App.LogLevel, envPrefix+"APP_LOG_LEVEL") setString(&cfg.App.HealthAddress, envPrefix+"APP_HEALTH_ADDRESS") + if raw, ok := os.LookupEnv(envPrefix + "DEVICE_ALIASES"); ok { + if strings.TrimSpace(raw) == "" { + cfg.DeviceAliases = nil + } else { + aliases := make(map[string]string) + if err := json.Unmarshal([]byte(raw), &aliases); err != nil { + return fmt.Errorf("parse %sDEVICE_ALIASES: %w", envPrefix, err) + } + cfg.DeviceAliases = aliases + } + } + if raw, ok := os.LookupEnv(envPrefix + "MQTT_TOPICS"); ok { cfg.MQTT.Topics = splitAndTrim(raw) } @@ -227,3 +245,34 @@ func splitAndTrim(value string) []string { return result } + +func normalizeDeviceAliases(aliases map[string]string) map[string]string { + if len(aliases) == 0 { + return nil + } + + normalized := make(map[string]string, len(aliases)) + for device, alias := range aliases { + cleanDevice := normalizeDeviceKey(device) + cleanAlias := strings.TrimSpace(alias) + if cleanDevice == "" || cleanAlias == "" { + continue + } + normalized[cleanDevice] = cleanAlias + } + + if len(normalized) == 0 { + return nil + } + + return normalized +} + +func normalizeDeviceKey(value string) string { + normalized := strings.ToLower(strings.TrimSpace(value)) + normalized = strings.ReplaceAll(normalized, "-", "_") + normalized = strings.ReplaceAll(normalized, " ", "_") + normalized = invalidDeviceCharacters.ReplaceAllString(normalized, "_") + normalized = strings.Trim(normalized, "_") + return normalized +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..ff2a0f4 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,107 @@ +package config + +import ( + "os" + "path/filepath" + "testing" +) + +func TestLoadNormalizesDeviceAliases(t *testing.T) { + t.Setenv("MQTT_SCRUBBER_DEVICE_ALIASES", "") + + configPath := filepath.Join(t.TempDir(), "config.json") + contents := `{ + "mqtt": { + "broker": "tcp://127.0.0.1:1883", + "client_id": "mqqt-scrubber", + "topics": ["tele/+/STATE"], + "qos": 0 + }, + "influx": { + "url": "http://127.0.0.1:8181", + "database": "home", + "precision": "ns" + }, + "app": { + "batch_size": 200, + "buffer_size": 1000, + "flush_interval": "10s", + "flush_timeout": "10s", + "log_level": "info", + "health_address": ":8080" + }, + "device_aliases": { + "Kitchen-Plug": "Kitchen Plug", + " Patio Sensor ": "Patio Sensor", + "unused": " " + } + }` + + if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil { + t.Fatalf("write config file: %v", err) + } + + cfg, err := Load(configPath) + if err != nil { + t.Fatalf("Load returned error: %v", err) + } + + if got := cfg.DeviceAliases["kitchen_plug"]; got != "Kitchen Plug" { + t.Fatalf("unexpected kitchen alias: got %q", got) + } + + if got := cfg.DeviceAliases["patio_sensor"]; got != "Patio Sensor" { + t.Fatalf("unexpected patio alias: got %q", got) + } + + if _, exists := cfg.DeviceAliases["unused"]; exists { + t.Fatal("expected blank aliases to be discarded") + } +} + +func TestLoadOverridesDeviceAliasesFromEnv(t *testing.T) { + t.Setenv("MQTT_SCRUBBER_DEVICE_ALIASES", `{"Desk-Plug":"Desk Plug"}`) + + configPath := filepath.Join(t.TempDir(), "config.json") + contents := `{ + "mqtt": { + "broker": "tcp://127.0.0.1:1883", + "client_id": "mqqt-scrubber", + "topics": ["tele/+/STATE"], + "qos": 0 + }, + "influx": { + "url": "http://127.0.0.1:8181", + "database": "home", + "precision": "ns" + }, + "app": { + "batch_size": 200, + "buffer_size": 1000, + "flush_interval": "10s", + "flush_timeout": "10s", + "log_level": "info", + "health_address": ":8080" + }, + "device_aliases": { + "Kitchen-Plug": "Kitchen Plug" + } + }` + + if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil { + t.Fatalf("write config file: %v", err) + } + + cfg, err := Load(configPath) + if err != nil { + t.Fatalf("Load returned error: %v", err) + } + + if len(cfg.DeviceAliases) != 1 { + t.Fatalf("expected env aliases to replace file aliases, got %d entries", len(cfg.DeviceAliases)) + } + + if got := cfg.DeviceAliases["desk_plug"]; got != "Desk Plug" { + t.Fatalf("unexpected desk alias: got %q", got) + } +} diff --git a/internal/pipeline/service.go b/internal/pipeline/service.go index c1d975e..5fc4b76 100644 --- a/internal/pipeline/service.go +++ b/internal/pipeline/service.go @@ -3,6 +3,8 @@ package pipeline import ( "context" "log/slog" + "regexp" + "strings" "sync/atomic" "time" @@ -11,19 +13,22 @@ import ( "mqqt-scrubber/internal/parser" ) +var invalidDeviceAliasCharacters = regexp.MustCompile(`[^a-z0-9_]+`) + type writer interface { Write(ctx context.Context, records []model.Record) error } type Service struct { - config config.Config - influxClient writer - input chan model.RawMessage - received atomic.Uint64 - parsed atomic.Uint64 - written atomic.Uint64 - dropped atomic.Uint64 - failed atomic.Uint64 + config config.Config + deviceAliases map[string]string + influxClient writer + input chan model.RawMessage + received atomic.Uint64 + parsed atomic.Uint64 + written atomic.Uint64 + dropped atomic.Uint64 + failed atomic.Uint64 } type Snapshot struct { @@ -36,9 +41,10 @@ type Snapshot struct { func NewService(cfg config.Config, influxClient writer) *Service { return &Service{ - config: cfg, - influxClient: influxClient, - input: make(chan model.RawMessage, cfg.App.BufferSize), + config: cfg, + deviceAliases: normalizeDeviceAliases(cfg.DeviceAliases), + influxClient: influxClient, + input: make(chan model.RawMessage, cfg.App.BufferSize), } } @@ -86,6 +92,8 @@ func (service *Service) Run(ctx context.Context) error { continue } + service.applyDeviceAliases(records) + service.parsed.Add(uint64(len(records))) batch = append(batch, records...) @@ -150,3 +158,49 @@ func (service *Service) Snapshot() Snapshot { Failed: service.failed.Load(), } } + +func (service *Service) applyDeviceAliases(records []model.Record) { + if len(service.deviceAliases) == 0 { + return + } + + for index := range records { + device := normalizeDeviceKey(records[index].Tags["device"]) + alias, ok := service.deviceAliases[device] + if !ok || alias == "" { + continue + } + records[index].Fields["device_alias"] = alias + } +} + +func normalizeDeviceAliases(aliases map[string]string) map[string]string { + if len(aliases) == 0 { + return nil + } + + normalized := make(map[string]string, len(aliases)) + for device, alias := range aliases { + cleanDevice := normalizeDeviceKey(device) + cleanAlias := strings.TrimSpace(alias) + if cleanDevice == "" || cleanAlias == "" { + continue + } + normalized[cleanDevice] = cleanAlias + } + + if len(normalized) == 0 { + return nil + } + + return normalized +} + +func normalizeDeviceKey(value string) string { + normalized := strings.ToLower(strings.TrimSpace(value)) + normalized = strings.ReplaceAll(normalized, "-", "_") + normalized = strings.ReplaceAll(normalized, " ", "_") + normalized = invalidDeviceAliasCharacters.ReplaceAllString(normalized, "_") + normalized = strings.Trim(normalized, "_") + return normalized +} diff --git a/internal/pipeline/service_test.go b/internal/pipeline/service_test.go index 7d4a6ab..20c9a2b 100644 --- a/internal/pipeline/service_test.go +++ b/internal/pipeline/service_test.go @@ -46,6 +46,10 @@ func (writer *fakeWriter) firstBatch() []model.Record { func TestServiceFlushesParsedRecords(t *testing.T) { fake := newFakeWriter() service := NewService(config.Config{ + DeviceAliases: map[string]string{ + "tasmota-896001": "Garage Plug", + "Tasmota C88994": "Office Plug", + }, App: config.AppConfig{ BatchSize: 2, BufferSize: 8, @@ -90,6 +94,9 @@ func TestServiceFlushesParsedRecords(t *testing.T) { if batch[0].Fields["online"] != true { t.Fatalf("unexpected lwt online field: %#v", batch[0].Fields["online"]) } + if batch[0].Fields["device_alias"] != "Garage Plug" { + t.Fatalf("unexpected lwt device_alias field: %#v", batch[0].Fields["device_alias"]) + } if batch[1].Measurement != "tasmota_sensor" { t.Fatalf("unexpected second measurement: %s", batch[1].Measurement) @@ -97,6 +104,9 @@ func TestServiceFlushesParsedRecords(t *testing.T) { if batch[1].Fields["energy_total"] != float64(41.385) { t.Fatalf("unexpected sensor energy_total field: %#v", batch[1].Fields["energy_total"]) } + if batch[1].Fields["device_alias"] != "Office Plug" { + t.Fatalf("unexpected sensor device_alias field: %#v", batch[1].Fields["device_alias"]) + } cancel() select { -- 2.52.0 From e9da0920c65b2ae76a848916eb3ce475883fa6da Mon Sep 17 00:00:00 2001 From: Michal Maly Date: Sat, 14 Mar 2026 23:01:01 +0100 Subject: [PATCH 2/2] Write device aliases as startup metadata --- .../tasmota-device-summary-dashboard.json | 4 +- internal/pipeline/service.go | 49 ++++++++++++++++++- internal/pipeline/service_test.go | 36 +++++++++++++- 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/docs/grafana/tasmota-device-summary-dashboard.json b/docs/grafana/tasmota-device-summary-dashboard.json index 3cfc2c1..e24edf9 100644 --- a/docs/grafana/tasmota-device-summary-dashboard.json +++ b/docs/grafana/tasmota-device-summary-dashboard.json @@ -579,9 +579,9 @@ }, "editorMode": "code", "format": "table", - "query": "SELECT devices.device, coalesce(state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", + "query": "SELECT devices.device, coalesce(meta.device_alias, state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON devices.device = meta.device LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", "rawQuery": true, - "rawSql": "SELECT devices.device, coalesce(state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", + "rawSql": "SELECT devices.device, coalesce(meta.device_alias, state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON devices.device = meta.device LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", "refId": "A" } ], diff --git a/internal/pipeline/service.go b/internal/pipeline/service.go index 5fc4b76..1cde42d 100644 --- a/internal/pipeline/service.go +++ b/internal/pipeline/service.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "regexp" + "sort" "strings" "sync/atomic" "time" @@ -22,6 +23,7 @@ type writer interface { type Service struct { config config.Config deviceAliases map[string]string + aliasRecords []model.Record influxClient writer input chan model.RawMessage received atomic.Uint64 @@ -40,9 +42,11 @@ type Snapshot struct { } func NewService(cfg config.Config, influxClient writer) *Service { + normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases) return &Service{ config: cfg, - deviceAliases: normalizeDeviceAliases(cfg.DeviceAliases), + deviceAliases: normalizedAliases, + aliasRecords: buildAliasRecords(normalizedAliases), influxClient: influxClient, input: make(chan model.RawMessage, cfg.App.BufferSize), } @@ -63,7 +67,18 @@ func (service *Service) Run(ctx context.Context) error { ticker := time.NewTicker(service.config.App.FlushInterval.Duration) defer ticker.Stop() - batch := make([]model.Record, 0, service.config.App.BatchSize) + batch := append(make([]model.Record, 0, service.config.App.BatchSize+len(service.aliasRecords)), service.aliasRecords...) + if len(batch) > 0 { + flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration) + err := service.flush(flushCtx, batch) + cancel() + if err != nil { + slog.Error("failed to flush alias metadata to influx; will retry on next interval", "count", len(batch), "error", err) + } else { + batch = batch[:0] + } + } + var input <-chan model.RawMessage = service.input for { @@ -204,3 +219,33 @@ func normalizeDeviceKey(value string) string { normalized = strings.Trim(normalized, "_") return normalized } + +func buildAliasRecords(aliases map[string]string) []model.Record { + if len(aliases) == 0 { + return nil + } + + devices := make([]string, 0, len(aliases)) + for device := range aliases { + devices = append(devices, device) + } + sort.Strings(devices) + + timestamp := time.Now().UTC() + records := make([]model.Record, 0, len(devices)) + for _, device := range devices { + records = append(records, model.Record{ + Measurement: "tasmota_device_meta", + Tags: map[string]string{ + "device": device, + "source": "config", + }, + Fields: map[string]any{ + "device_alias": aliases[device], + }, + Timestamp: timestamp, + }) + } + + return records +} diff --git a/internal/pipeline/service_test.go b/internal/pipeline/service_test.go index 20c9a2b..3d2bfa0 100644 --- a/internal/pipeline/service_test.go +++ b/internal/pipeline/service_test.go @@ -43,6 +43,18 @@ func (writer *fakeWriter) firstBatch() []model.Record { return writer.batches[0] } +func (writer *fakeWriter) allBatches() [][]model.Record { + writer.mu.Lock() + defer writer.mu.Unlock() + + copyBatches := make([][]model.Record, 0, len(writer.batches)) + for _, batch := range writer.batches { + copyBatches = append(copyBatches, append([]model.Record(nil), batch...)) + } + + return copyBatches +} + func TestServiceFlushesParsedRecords(t *testing.T) { fake := newFakeWriter() service := NewService(config.Config{ @@ -66,6 +78,12 @@ func TestServiceFlushesParsedRecords(t *testing.T) { errCh <- service.Run(ctx) }() + select { + case <-fake.flushed: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for alias metadata flush") + } + service.Enqueue(model.RawMessage{ Topic: "tele/tasmota_896001/LWT", Payload: []byte("Online"), @@ -83,7 +101,23 @@ func TestServiceFlushesParsedRecords(t *testing.T) { t.Fatal("timed out waiting for pipeline flush") } - batch := fake.firstBatch() + batches := fake.allBatches() + if len(batches) != 2 { + t.Fatalf("expected 2 flushed batches, got %d", len(batches)) + } + + metaBatch := batches[0] + if len(metaBatch) != 2 { + t.Fatalf("expected 2 alias metadata records, got %d", len(metaBatch)) + } + if metaBatch[0].Measurement != "tasmota_device_meta" { + t.Fatalf("unexpected metadata measurement: %s", metaBatch[0].Measurement) + } + if metaBatch[0].Fields["device_alias"] == "" { + t.Fatalf("expected metadata batch to include device_alias, got %#v", metaBatch[0].Fields["device_alias"]) + } + + batch := batches[1] if len(batch) != 2 { t.Fatalf("expected 2 records in flushed batch, got %d", len(batch)) } -- 2.52.0