diff --git a/README.md b/README.md index 79e730c..f5c18da 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ Supported environment variables: - `MQTT_SCRUBBER_INFLUX_DATABASE` - `MQTT_SCRUBBER_INFLUX_TOKEN` - `MQTT_SCRUBBER_INFLUX_PRECISION` +- `MQTT_SCRUBBER_DEVICE_ALIASES` as a JSON object such as `{"kitchen_plug":"Kitchen Plug"}` - `MQTT_SCRUBBER_APP_BATCH_SIZE` - `MQTT_SCRUBBER_APP_BUFFER_SIZE` - `MQTT_SCRUBBER_APP_FLUSH_INTERVAL` @@ -61,6 +62,8 @@ Supported environment variables: `MQTT_SCRUBBER_MQTT_TOPICS` expects a comma-separated list. +You can also define optional per-device aliases in config with a top-level `device_aliases` object. Keys are normalized like device tags, so `kitchen-plug`, `Kitchen Plug`, and `kitchen_plug` all resolve to `kitchen_plug`. + ## Run ```bash @@ -135,6 +138,8 @@ Relay panels use the latest `power`, `power1`, `power2`, `power3`, and `power4` The dashboard is split into a fleet summary section and a selected-device section. The selected-device section also includes `Last Seen`, `Seconds Since Last Message`, and `Messages In Range` panels derived from both `tasmota_state` and `tasmota_sensor` timestamps. +If `device_aliases` is configured, the summary table will expose an `alias` column populated from the latest state or sensor record for each device. + ## Notes - The repo name is kept as `mqqt-scrubber` to match the existing folder. diff --git a/config.example.json b/config.example.json index 749419e..f76280f 100644 --- a/config.example.json +++ b/config.example.json @@ -16,6 +16,10 @@ "token": "", "precision": "ns" }, + "device_aliases": { + "tasmota_c88994": "Office Plug", + "kitchen-plug": "Kitchen Counter Plug" + }, "app": { "batch_size": 200, "buffer_size": 1000, diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 99468bf..bc75756 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -125,6 +125,7 @@ timestamp: payload Time or message receive time - `LWT`: - `state` as string - `online` as boolean +- all measurements may also include `device_alias` as an optional string when configured externally - `STATE`: - base fields like `uptime`, `uptime_sec`, `heap`, `sleep`, `sleep_mode`, `load_avg`, `mqtt_count` - relay state fields like `power`, `power1` through `power4` diff --git a/docs/grafana/tasmota-device-summary-dashboard.json b/docs/grafana/tasmota-device-summary-dashboard.json index 0e3d61f..3cfc2c1 100644 --- a/docs/grafana/tasmota-device-summary-dashboard.json +++ b/docs/grafana/tasmota-device-summary-dashboard.json @@ -419,6 +419,18 @@ } ] }, + { + "matcher": { + "id": "byName", + "options": "alias" + }, + "properties": [ + { + "id": "custom.width", + "value": 180 + } + ] + }, { "matcher": { "id": "byName", @@ -567,9 +579,9 @@ }, "editorMode": "code", "format": "table", - "query": "SELECT devices.device, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", + "query": "SELECT devices.device, coalesce(state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", "rawQuery": true, - "rawSql": "SELECT devices.device, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", + "rawSql": "SELECT devices.device, coalesce(state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", "refId": "A" } ], @@ -625,6 +637,6 @@ "timezone": "browser", "title": "Tasmota Device Summary", "uid": "tasmota-device-summary", - "version": 6, + "version": 7, "weekStart": "" } \ No newline at end of file diff --git a/internal/config/config.go b/internal/config/config.go index d54d211..da1f741 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "regexp" "strconv" "strings" "time" @@ -12,10 +13,13 @@ import ( const envPrefix = "MQTT_SCRUBBER_" +var invalidDeviceCharacters = regexp.MustCompile(`[^a-z0-9_]+`) + type Config struct { - MQTT MQTTConfig `json:"mqtt"` - Influx InfluxConfig `json:"influx"` - App AppConfig `json:"app"` + MQTT MQTTConfig `json:"mqtt"` + Influx InfluxConfig `json:"influx"` + App AppConfig `json:"app"` + DeviceAliases map[string]string `json:"device_aliases"` } type MQTTConfig struct { @@ -80,6 +84,8 @@ func Load(path string) (Config, error) { return Config{}, err } + cfg.DeviceAliases = normalizeDeviceAliases(cfg.DeviceAliases) + if err := cfg.Validate(); err != nil { return Config{}, err } @@ -161,6 +167,18 @@ func applyEnvOverrides(cfg *Config) error { setString(&cfg.App.LogLevel, envPrefix+"APP_LOG_LEVEL") setString(&cfg.App.HealthAddress, envPrefix+"APP_HEALTH_ADDRESS") + if raw, ok := os.LookupEnv(envPrefix + "DEVICE_ALIASES"); ok { + if strings.TrimSpace(raw) == "" { + cfg.DeviceAliases = nil + } else { + aliases := make(map[string]string) + if err := json.Unmarshal([]byte(raw), &aliases); err != nil { + return fmt.Errorf("parse %sDEVICE_ALIASES: %w", envPrefix, err) + } + cfg.DeviceAliases = aliases + } + } + if raw, ok := os.LookupEnv(envPrefix + "MQTT_TOPICS"); ok { cfg.MQTT.Topics = splitAndTrim(raw) } @@ -227,3 +245,34 @@ func splitAndTrim(value string) []string { return result } + +func normalizeDeviceAliases(aliases map[string]string) map[string]string { + if len(aliases) == 0 { + return nil + } + + normalized := make(map[string]string, len(aliases)) + for device, alias := range aliases { + cleanDevice := normalizeDeviceKey(device) + cleanAlias := strings.TrimSpace(alias) + if cleanDevice == "" || cleanAlias == "" { + continue + } + normalized[cleanDevice] = cleanAlias + } + + if len(normalized) == 0 { + return nil + } + + return normalized +} + +func normalizeDeviceKey(value string) string { + normalized := strings.ToLower(strings.TrimSpace(value)) + normalized = strings.ReplaceAll(normalized, "-", "_") + normalized = strings.ReplaceAll(normalized, " ", "_") + normalized = invalidDeviceCharacters.ReplaceAllString(normalized, "_") + normalized = strings.Trim(normalized, "_") + return normalized +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..ff2a0f4 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,107 @@ +package config + +import ( + "os" + "path/filepath" + "testing" +) + +func TestLoadNormalizesDeviceAliases(t *testing.T) { + t.Setenv("MQTT_SCRUBBER_DEVICE_ALIASES", "") + + configPath := filepath.Join(t.TempDir(), "config.json") + contents := `{ + "mqtt": { + "broker": "tcp://127.0.0.1:1883", + "client_id": "mqqt-scrubber", + "topics": ["tele/+/STATE"], + "qos": 0 + }, + "influx": { + "url": "http://127.0.0.1:8181", + "database": "home", + "precision": "ns" + }, + "app": { + "batch_size": 200, + "buffer_size": 1000, + "flush_interval": "10s", + "flush_timeout": "10s", + "log_level": "info", + "health_address": ":8080" + }, + "device_aliases": { + "Kitchen-Plug": "Kitchen Plug", + " Patio Sensor ": "Patio Sensor", + "unused": " " + } + }` + + if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil { + t.Fatalf("write config file: %v", err) + } + + cfg, err := Load(configPath) + if err != nil { + t.Fatalf("Load returned error: %v", err) + } + + if got := cfg.DeviceAliases["kitchen_plug"]; got != "Kitchen Plug" { + t.Fatalf("unexpected kitchen alias: got %q", got) + } + + if got := cfg.DeviceAliases["patio_sensor"]; got != "Patio Sensor" { + t.Fatalf("unexpected patio alias: got %q", got) + } + + if _, exists := cfg.DeviceAliases["unused"]; exists { + t.Fatal("expected blank aliases to be discarded") + } +} + +func TestLoadOverridesDeviceAliasesFromEnv(t *testing.T) { + t.Setenv("MQTT_SCRUBBER_DEVICE_ALIASES", `{"Desk-Plug":"Desk Plug"}`) + + configPath := filepath.Join(t.TempDir(), "config.json") + contents := `{ + "mqtt": { + "broker": "tcp://127.0.0.1:1883", + "client_id": "mqqt-scrubber", + "topics": ["tele/+/STATE"], + "qos": 0 + }, + "influx": { + "url": "http://127.0.0.1:8181", + "database": "home", + "precision": "ns" + }, + "app": { + "batch_size": 200, + "buffer_size": 1000, + "flush_interval": "10s", + "flush_timeout": "10s", + "log_level": "info", + "health_address": ":8080" + }, + "device_aliases": { + "Kitchen-Plug": "Kitchen Plug" + } + }` + + if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil { + t.Fatalf("write config file: %v", err) + } + + cfg, err := Load(configPath) + if err != nil { + t.Fatalf("Load returned error: %v", err) + } + + if len(cfg.DeviceAliases) != 1 { + t.Fatalf("expected env aliases to replace file aliases, got %d entries", len(cfg.DeviceAliases)) + } + + if got := cfg.DeviceAliases["desk_plug"]; got != "Desk Plug" { + t.Fatalf("unexpected desk alias: got %q", got) + } +} diff --git a/internal/pipeline/service.go b/internal/pipeline/service.go index c1d975e..5fc4b76 100644 --- a/internal/pipeline/service.go +++ b/internal/pipeline/service.go @@ -3,6 +3,8 @@ package pipeline import ( "context" "log/slog" + "regexp" + "strings" "sync/atomic" "time" @@ -11,19 +13,22 @@ import ( "mqqt-scrubber/internal/parser" ) +var invalidDeviceAliasCharacters = regexp.MustCompile(`[^a-z0-9_]+`) + type writer interface { Write(ctx context.Context, records []model.Record) error } type Service struct { - config config.Config - influxClient writer - input chan model.RawMessage - received atomic.Uint64 - parsed atomic.Uint64 - written atomic.Uint64 - dropped atomic.Uint64 - failed atomic.Uint64 + config config.Config + deviceAliases map[string]string + influxClient writer + input chan model.RawMessage + received atomic.Uint64 + parsed atomic.Uint64 + written atomic.Uint64 + dropped atomic.Uint64 + failed atomic.Uint64 } type Snapshot struct { @@ -36,9 +41,10 @@ type Snapshot struct { func NewService(cfg config.Config, influxClient writer) *Service { return &Service{ - config: cfg, - influxClient: influxClient, - input: make(chan model.RawMessage, cfg.App.BufferSize), + config: cfg, + deviceAliases: normalizeDeviceAliases(cfg.DeviceAliases), + influxClient: influxClient, + input: make(chan model.RawMessage, cfg.App.BufferSize), } } @@ -86,6 +92,8 @@ func (service *Service) Run(ctx context.Context) error { continue } + service.applyDeviceAliases(records) + service.parsed.Add(uint64(len(records))) batch = append(batch, records...) @@ -150,3 +158,49 @@ func (service *Service) Snapshot() Snapshot { Failed: service.failed.Load(), } } + +func (service *Service) applyDeviceAliases(records []model.Record) { + if len(service.deviceAliases) == 0 { + return + } + + for index := range records { + device := normalizeDeviceKey(records[index].Tags["device"]) + alias, ok := service.deviceAliases[device] + if !ok || alias == "" { + continue + } + records[index].Fields["device_alias"] = alias + } +} + +func normalizeDeviceAliases(aliases map[string]string) map[string]string { + if len(aliases) == 0 { + return nil + } + + normalized := make(map[string]string, len(aliases)) + for device, alias := range aliases { + cleanDevice := normalizeDeviceKey(device) + cleanAlias := strings.TrimSpace(alias) + if cleanDevice == "" || cleanAlias == "" { + continue + } + normalized[cleanDevice] = cleanAlias + } + + if len(normalized) == 0 { + return nil + } + + return normalized +} + +func normalizeDeviceKey(value string) string { + normalized := strings.ToLower(strings.TrimSpace(value)) + normalized = strings.ReplaceAll(normalized, "-", "_") + normalized = strings.ReplaceAll(normalized, " ", "_") + normalized = invalidDeviceAliasCharacters.ReplaceAllString(normalized, "_") + normalized = strings.Trim(normalized, "_") + return normalized +} diff --git a/internal/pipeline/service_test.go b/internal/pipeline/service_test.go index 7d4a6ab..20c9a2b 100644 --- a/internal/pipeline/service_test.go +++ b/internal/pipeline/service_test.go @@ -46,6 +46,10 @@ func (writer *fakeWriter) firstBatch() []model.Record { func TestServiceFlushesParsedRecords(t *testing.T) { fake := newFakeWriter() service := NewService(config.Config{ + DeviceAliases: map[string]string{ + "tasmota-896001": "Garage Plug", + "Tasmota C88994": "Office Plug", + }, App: config.AppConfig{ BatchSize: 2, BufferSize: 8, @@ -90,6 +94,9 @@ func TestServiceFlushesParsedRecords(t *testing.T) { if batch[0].Fields["online"] != true { t.Fatalf("unexpected lwt online field: %#v", batch[0].Fields["online"]) } + if batch[0].Fields["device_alias"] != "Garage Plug" { + t.Fatalf("unexpected lwt device_alias field: %#v", batch[0].Fields["device_alias"]) + } if batch[1].Measurement != "tasmota_sensor" { t.Fatalf("unexpected second measurement: %s", batch[1].Measurement) @@ -97,6 +104,9 @@ func TestServiceFlushesParsedRecords(t *testing.T) { if batch[1].Fields["energy_total"] != float64(41.385) { t.Fatalf("unexpected sensor energy_total field: %#v", batch[1].Fields["energy_total"]) } + if batch[1].Fields["device_alias"] != "Office Plug" { + t.Fatalf("unexpected sensor device_alias field: %#v", batch[1].Fields["device_alias"]) + } cancel() select {