Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b9789430a1 | |||
| 7a8a1cc855 | |||
| 18ba65f3af | |||
| b0784faa43 | |||
| e9da0920c6 | |||
| 110388c363 |
@@ -52,6 +52,7 @@ Supported environment variables:
|
||||
- `MQTT_SCRUBBER_INFLUX_DATABASE`
|
||||
- `MQTT_SCRUBBER_INFLUX_TOKEN`
|
||||
- `MQTT_SCRUBBER_INFLUX_PRECISION`
|
||||
- `MQTT_SCRUBBER_DEVICE_ALIASES` as a JSON object such as `{"kitchen_plug":"Kitchen Plug"}`
|
||||
- `MQTT_SCRUBBER_APP_BATCH_SIZE`
|
||||
- `MQTT_SCRUBBER_APP_BUFFER_SIZE`
|
||||
- `MQTT_SCRUBBER_APP_FLUSH_INTERVAL`
|
||||
@@ -61,6 +62,8 @@ Supported environment variables:
|
||||
|
||||
`MQTT_SCRUBBER_MQTT_TOPICS` expects a comma-separated list.
|
||||
|
||||
You can also define optional per-device aliases in config with a top-level `device_aliases` object. Keys are normalized like device tags, so `kitchen-plug`, `Kitchen Plug`, and `kitchen_plug` all resolve to `kitchen_plug`.
|
||||
|
||||
## Run
|
||||
|
||||
```bash
|
||||
@@ -135,6 +138,8 @@ Relay panels use the latest `power`, `power1`, `power2`, `power3`, and `power4`
|
||||
|
||||
The dashboard is split into a fleet summary section and a selected-device section. The selected-device section also includes `Last Seen`, `Seconds Since Last Message`, and `Messages In Range` panels derived from both `tasmota_state` and `tasmota_sensor` timestamps.
|
||||
|
||||
If `device_aliases` is configured, the summary table will expose an `alias` column populated from the latest state or sensor record for each device.
|
||||
|
||||
## Notes
|
||||
|
||||
- The repo name is kept as `mqqt-scrubber` to match the existing folder.
|
||||
|
||||
@@ -16,6 +16,10 @@
|
||||
"token": "",
|
||||
"precision": "ns"
|
||||
},
|
||||
"device_aliases": {
|
||||
"tasmota_c88994": "Office Plug",
|
||||
"kitchen-plug": "Kitchen Counter Plug"
|
||||
},
|
||||
"app": {
|
||||
"batch_size": 200,
|
||||
"buffer_size": 1000,
|
||||
|
||||
@@ -125,6 +125,7 @@ timestamp: payload Time or message receive time
|
||||
- `LWT`:
|
||||
- `state` as string
|
||||
- `online` as boolean
|
||||
- all measurements may also include `device_alias` as an optional string when configured externally
|
||||
- `STATE`:
|
||||
- base fields like `uptime`, `uptime_sec`, `heap`, `sleep`, `sleep_mode`, `load_avg`, `mqtt_count`
|
||||
- relay state fields like `power`, `power1` through `power4`
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"description": "Fleet overview of known Tasmota devices with drilldown links into a dedicated per-device dashboard.",
|
||||
"description": "Fleet overview of Tasmota devices using the latest derived per-device snapshot, with drilldown links into a dedicated per-device dashboard.",
|
||||
"editable": true,
|
||||
"fiscalYearStartMonth": 0,
|
||||
"graphTooltip": 0,
|
||||
@@ -89,13 +89,13 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"query": "SELECT count(DISTINCT device) AS total_devices FROM (SELECT device FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device FROM tasmota_sensor WHERE $__timeFilter(time))",
|
||||
"query": "SELECT count(*) AS total_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time)",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT count(DISTINCT device) AS total_devices FROM (SELECT device FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device FROM tasmota_sensor WHERE $__timeFilter(time))",
|
||||
"rawSql": "SELECT count(*) AS total_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time)",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Total Devices",
|
||||
"title": "Tracked Devices",
|
||||
"type": "stat"
|
||||
},
|
||||
{
|
||||
@@ -152,13 +152,13 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"query": "SELECT count(DISTINCT device) AS power_devices FROM tasmota_sensor WHERE $__timeFilter(time) AND (energy_power IS NOT NULL OR energy_today IS NOT NULL OR energy_total IS NOT NULL)",
|
||||
"query": "SELECT count(*) AS power_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time WHERE s.energy_power IS NOT NULL OR s.energy_today IS NOT NULL OR s.energy_total IS NOT NULL)",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT count(DISTINCT device) AS power_devices FROM tasmota_sensor WHERE $__timeFilter(time) AND (energy_power IS NOT NULL OR energy_today IS NOT NULL OR energy_total IS NOT NULL)",
|
||||
"rawSql": "SELECT count(*) AS power_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time WHERE s.energy_power IS NOT NULL OR s.energy_today IS NOT NULL OR s.energy_total IS NOT NULL)",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Devices With Energy Metrics",
|
||||
"title": "Energy Devices",
|
||||
"type": "stat"
|
||||
},
|
||||
{
|
||||
@@ -223,9 +223,9 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"query": "SELECT coalesce(sum(sensor.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
||||
"query": "SELECT coalesce(sum(snapshots.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT coalesce(sum(sensor.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
||||
"rawSql": "SELECT coalesce(sum(snapshots.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
@@ -286,9 +286,9 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"query": "SELECT coalesce(sum(sensor.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
||||
"query": "SELECT coalesce(sum(snapshots.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT coalesce(sum(sensor.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
||||
"rawSql": "SELECT coalesce(sum(snapshots.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
@@ -349,9 +349,9 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"query": "SELECT coalesce(sum(sensor.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
||||
"query": "SELECT coalesce(sum(snapshots.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT coalesce(sum(sensor.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
||||
"rawSql": "SELECT coalesce(sum(snapshots.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
@@ -419,6 +419,18 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byName",
|
||||
"options": "alias"
|
||||
},
|
||||
"properties": [
|
||||
{
|
||||
"id": "custom.width",
|
||||
"value": 180
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": {
|
||||
"id": "byName",
|
||||
@@ -567,9 +579,9 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"format": "table",
|
||||
"query": "SELECT devices.device, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device",
|
||||
"query": "SELECT snapshots.device, coalesce(meta.device_alias, snapshots.device_alias, '') AS alias, snapshots.last_seen, to_unixtime(now()) - to_unixtime(snapshots.last_seen) AS age_s, snapshots.energy_power AS current_draw_w, snapshots.energy_today AS daily_draw_kwh, snapshots.energy_total AS total_draw_kwh, snapshots.wifi_signal AS wifi_signal_dbm, snapshots.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT s.device, s.time AS last_seen, s.device_alias, s.energy_power, s.energy_today, s.energy_total, s.wifi_signal, s.uptime_sec FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON snapshots.device = meta.device ORDER BY snapshots.device",
|
||||
"rawQuery": true,
|
||||
"rawSql": "SELECT devices.device, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device",
|
||||
"rawSql": "SELECT snapshots.device, coalesce(meta.device_alias, snapshots.device_alias, '') AS alias, snapshots.last_seen, to_unixtime(now()) - to_unixtime(snapshots.last_seen) AS age_s, snapshots.energy_power AS current_draw_w, snapshots.energy_today AS daily_draw_kwh, snapshots.energy_total AS total_draw_kwh, snapshots.wifi_signal AS wifi_signal_dbm, snapshots.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT s.device, s.time AS last_seen, s.device_alias, s.energy_power, s.energy_today, s.energy_total, s.wifi_signal, s.uptime_sec FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON snapshots.device = meta.device ORDER BY snapshots.device",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
@@ -586,36 +598,7 @@
|
||||
"influxdb"
|
||||
],
|
||||
"templating": {
|
||||
"list": [
|
||||
{
|
||||
"allValue": "*",
|
||||
"current": {
|
||||
"selected": true,
|
||||
"text": "All",
|
||||
"value": "*"
|
||||
},
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "dfftuvrrhv6kgb"
|
||||
},
|
||||
"definition": "SELECT device FROM (SELECT DISTINCT device FROM tasmota_state UNION SELECT DISTINCT device FROM tasmota_sensor) ORDER BY device",
|
||||
"hide": 0,
|
||||
"includeAll": true,
|
||||
"label": "Device",
|
||||
"multi": false,
|
||||
"name": "device",
|
||||
"options": [],
|
||||
"query": {
|
||||
"query": "SELECT device FROM (SELECT DISTINCT device FROM tasmota_state UNION SELECT DISTINCT device FROM tasmota_sensor) ORDER BY device",
|
||||
"refId": "InfluxDBVariableQueryEditor-VariableQuery"
|
||||
},
|
||||
"refresh": 1,
|
||||
"regex": "",
|
||||
"skipUrlSync": false,
|
||||
"sort": 1,
|
||||
"type": "query"
|
||||
}
|
||||
]
|
||||
"list": []
|
||||
},
|
||||
"time": {
|
||||
"from": "now-24h",
|
||||
@@ -625,6 +608,6 @@
|
||||
"timezone": "browser",
|
||||
"title": "Tasmota Device Summary",
|
||||
"uid": "tasmota-device-summary",
|
||||
"version": 6,
|
||||
"version": 9,
|
||||
"weekStart": ""
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -12,10 +13,13 @@ import (
|
||||
|
||||
const envPrefix = "MQTT_SCRUBBER_"
|
||||
|
||||
var invalidDeviceCharacters = regexp.MustCompile(`[^a-z0-9_]+`)
|
||||
|
||||
type Config struct {
|
||||
MQTT MQTTConfig `json:"mqtt"`
|
||||
Influx InfluxConfig `json:"influx"`
|
||||
App AppConfig `json:"app"`
|
||||
DeviceAliases map[string]string `json:"device_aliases"`
|
||||
}
|
||||
|
||||
type MQTTConfig struct {
|
||||
@@ -80,6 +84,8 @@ func Load(path string) (Config, error) {
|
||||
return Config{}, err
|
||||
}
|
||||
|
||||
cfg.DeviceAliases = normalizeDeviceAliases(cfg.DeviceAliases)
|
||||
|
||||
if err := cfg.Validate(); err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
@@ -161,6 +167,18 @@ func applyEnvOverrides(cfg *Config) error {
|
||||
setString(&cfg.App.LogLevel, envPrefix+"APP_LOG_LEVEL")
|
||||
setString(&cfg.App.HealthAddress, envPrefix+"APP_HEALTH_ADDRESS")
|
||||
|
||||
if raw, ok := os.LookupEnv(envPrefix + "DEVICE_ALIASES"); ok {
|
||||
if strings.TrimSpace(raw) == "" {
|
||||
cfg.DeviceAliases = nil
|
||||
} else {
|
||||
aliases := make(map[string]string)
|
||||
if err := json.Unmarshal([]byte(raw), &aliases); err != nil {
|
||||
return fmt.Errorf("parse %sDEVICE_ALIASES: %w", envPrefix, err)
|
||||
}
|
||||
cfg.DeviceAliases = aliases
|
||||
}
|
||||
}
|
||||
|
||||
if raw, ok := os.LookupEnv(envPrefix + "MQTT_TOPICS"); ok {
|
||||
cfg.MQTT.Topics = splitAndTrim(raw)
|
||||
}
|
||||
@@ -227,3 +245,34 @@ func splitAndTrim(value string) []string {
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func normalizeDeviceAliases(aliases map[string]string) map[string]string {
|
||||
if len(aliases) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
normalized := make(map[string]string, len(aliases))
|
||||
for device, alias := range aliases {
|
||||
cleanDevice := normalizeDeviceKey(device)
|
||||
cleanAlias := strings.TrimSpace(alias)
|
||||
if cleanDevice == "" || cleanAlias == "" {
|
||||
continue
|
||||
}
|
||||
normalized[cleanDevice] = cleanAlias
|
||||
}
|
||||
|
||||
if len(normalized) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return normalized
|
||||
}
|
||||
|
||||
func normalizeDeviceKey(value string) string {
|
||||
normalized := strings.ToLower(strings.TrimSpace(value))
|
||||
normalized = strings.ReplaceAll(normalized, "-", "_")
|
||||
normalized = strings.ReplaceAll(normalized, " ", "_")
|
||||
normalized = invalidDeviceCharacters.ReplaceAllString(normalized, "_")
|
||||
normalized = strings.Trim(normalized, "_")
|
||||
return normalized
|
||||
}
|
||||
|
||||
@@ -0,0 +1,107 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestLoadNormalizesDeviceAliases(t *testing.T) {
|
||||
t.Setenv("MQTT_SCRUBBER_DEVICE_ALIASES", "")
|
||||
|
||||
configPath := filepath.Join(t.TempDir(), "config.json")
|
||||
contents := `{
|
||||
"mqtt": {
|
||||
"broker": "tcp://127.0.0.1:1883",
|
||||
"client_id": "mqqt-scrubber",
|
||||
"topics": ["tele/+/STATE"],
|
||||
"qos": 0
|
||||
},
|
||||
"influx": {
|
||||
"url": "http://127.0.0.1:8181",
|
||||
"database": "home",
|
||||
"precision": "ns"
|
||||
},
|
||||
"app": {
|
||||
"batch_size": 200,
|
||||
"buffer_size": 1000,
|
||||
"flush_interval": "10s",
|
||||
"flush_timeout": "10s",
|
||||
"log_level": "info",
|
||||
"health_address": ":8080"
|
||||
},
|
||||
"device_aliases": {
|
||||
"Kitchen-Plug": "Kitchen Plug",
|
||||
" Patio Sensor ": "Patio Sensor",
|
||||
"unused": " "
|
||||
}
|
||||
}`
|
||||
|
||||
if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil {
|
||||
t.Fatalf("write config file: %v", err)
|
||||
}
|
||||
|
||||
cfg, err := Load(configPath)
|
||||
if err != nil {
|
||||
t.Fatalf("Load returned error: %v", err)
|
||||
}
|
||||
|
||||
if got := cfg.DeviceAliases["kitchen_plug"]; got != "Kitchen Plug" {
|
||||
t.Fatalf("unexpected kitchen alias: got %q", got)
|
||||
}
|
||||
|
||||
if got := cfg.DeviceAliases["patio_sensor"]; got != "Patio Sensor" {
|
||||
t.Fatalf("unexpected patio alias: got %q", got)
|
||||
}
|
||||
|
||||
if _, exists := cfg.DeviceAliases["unused"]; exists {
|
||||
t.Fatal("expected blank aliases to be discarded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadOverridesDeviceAliasesFromEnv(t *testing.T) {
|
||||
t.Setenv("MQTT_SCRUBBER_DEVICE_ALIASES", `{"Desk-Plug":"Desk Plug"}`)
|
||||
|
||||
configPath := filepath.Join(t.TempDir(), "config.json")
|
||||
contents := `{
|
||||
"mqtt": {
|
||||
"broker": "tcp://127.0.0.1:1883",
|
||||
"client_id": "mqqt-scrubber",
|
||||
"topics": ["tele/+/STATE"],
|
||||
"qos": 0
|
||||
},
|
||||
"influx": {
|
||||
"url": "http://127.0.0.1:8181",
|
||||
"database": "home",
|
||||
"precision": "ns"
|
||||
},
|
||||
"app": {
|
||||
"batch_size": 200,
|
||||
"buffer_size": 1000,
|
||||
"flush_interval": "10s",
|
||||
"flush_timeout": "10s",
|
||||
"log_level": "info",
|
||||
"health_address": ":8080"
|
||||
},
|
||||
"device_aliases": {
|
||||
"Kitchen-Plug": "Kitchen Plug"
|
||||
}
|
||||
}`
|
||||
|
||||
if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil {
|
||||
t.Fatalf("write config file: %v", err)
|
||||
}
|
||||
|
||||
cfg, err := Load(configPath)
|
||||
if err != nil {
|
||||
t.Fatalf("Load returned error: %v", err)
|
||||
}
|
||||
|
||||
if len(cfg.DeviceAliases) != 1 {
|
||||
t.Fatalf("expected env aliases to replace file aliases, got %d entries", len(cfg.DeviceAliases))
|
||||
}
|
||||
|
||||
if got := cfg.DeviceAliases["desk_plug"]; got != "Desk Plug" {
|
||||
t.Fatalf("unexpected desk alias: got %q", got)
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,9 @@ package pipeline
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -11,12 +14,18 @@ import (
|
||||
"mqqt-scrubber/internal/parser"
|
||||
)
|
||||
|
||||
var invalidDeviceAliasCharacters = regexp.MustCompile(`[^a-z0-9_]+`)
|
||||
|
||||
type writer interface {
|
||||
Write(ctx context.Context, records []model.Record) error
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
config config.Config
|
||||
deviceAliases map[string]string
|
||||
aliasRecords []model.Record
|
||||
snapshots map[string]*deviceSnapshot
|
||||
dirtyDevices map[string]struct{}
|
||||
influxClient writer
|
||||
input chan model.RawMessage
|
||||
received atomic.Uint64
|
||||
@@ -34,9 +43,23 @@ type Snapshot struct {
|
||||
Failed uint64 `json:"failed"`
|
||||
}
|
||||
|
||||
type deviceSnapshot struct {
|
||||
Device string
|
||||
Alias string
|
||||
LastSeen time.Time
|
||||
StateSeen time.Time
|
||||
SensorSeen time.Time
|
||||
Fields map[string]any
|
||||
}
|
||||
|
||||
func NewService(cfg config.Config, influxClient writer) *Service {
|
||||
normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases)
|
||||
return &Service{
|
||||
config: cfg,
|
||||
deviceAliases: normalizedAliases,
|
||||
aliasRecords: buildAliasRecords(normalizedAliases),
|
||||
snapshots: make(map[string]*deviceSnapshot),
|
||||
dirtyDevices: make(map[string]struct{}),
|
||||
influxClient: influxClient,
|
||||
input: make(chan model.RawMessage, cfg.App.BufferSize),
|
||||
}
|
||||
@@ -57,7 +80,18 @@ func (service *Service) Run(ctx context.Context) error {
|
||||
ticker := time.NewTicker(service.config.App.FlushInterval.Duration)
|
||||
defer ticker.Stop()
|
||||
|
||||
batch := make([]model.Record, 0, service.config.App.BatchSize)
|
||||
batch := append(make([]model.Record, 0, service.config.App.BatchSize+len(service.aliasRecords)), service.aliasRecords...)
|
||||
if len(batch) > 0 {
|
||||
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
||||
err := service.flush(flushCtx, batch)
|
||||
cancel()
|
||||
if err != nil {
|
||||
slog.Error("failed to flush alias metadata to influx; will retry on next interval", "count", len(batch), "error", err)
|
||||
} else {
|
||||
batch = batch[:0]
|
||||
}
|
||||
}
|
||||
|
||||
var input <-chan model.RawMessage = service.input
|
||||
|
||||
for {
|
||||
@@ -70,7 +104,7 @@ func (service *Service) Run(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
flushCtx, cancel := context.WithTimeout(context.Background(), service.config.App.FlushTimeout.Duration)
|
||||
err := service.flush(flushCtx, batch)
|
||||
err := service.flushPending(flushCtx, batch)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -86,12 +120,15 @@ func (service *Service) Run(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
service.applyDeviceAliases(records)
|
||||
service.updateDeviceSnapshots(records)
|
||||
|
||||
service.parsed.Add(uint64(len(records)))
|
||||
batch = append(batch, records...)
|
||||
|
||||
if len(batch) >= service.config.App.BatchSize {
|
||||
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
||||
err := service.flush(flushCtx, batch)
|
||||
err := service.flushPending(flushCtx, batch)
|
||||
cancel()
|
||||
if err != nil {
|
||||
slog.Error("failed to flush full batch to influx; keeping batch in memory", "count", len(batch), "error", err)
|
||||
@@ -100,12 +137,12 @@ func (service *Service) Run(ctx context.Context) error {
|
||||
batch = batch[:0]
|
||||
}
|
||||
case <-ticker.C:
|
||||
if len(batch) == 0 {
|
||||
if len(batch) == 0 && len(service.dirtyDevices) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
||||
err := service.flush(flushCtx, batch)
|
||||
err := service.flushPending(flushCtx, batch)
|
||||
cancel()
|
||||
if err != nil {
|
||||
slog.Error("failed to flush batch to influx; will retry on next interval", "count", len(batch), "error", err)
|
||||
@@ -116,6 +153,18 @@ func (service *Service) Run(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (service *Service) flushPending(ctx context.Context, batch []model.Record) error {
|
||||
flushBatch := append([]model.Record(nil), batch...)
|
||||
flushBatch = append(flushBatch, service.buildSnapshotRecords()...)
|
||||
|
||||
if err := service.flush(ctx, flushBatch); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
service.clearDirtyDevices()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (service *Service) flush(ctx context.Context, batch []model.Record) error {
|
||||
if len(batch) == 0 {
|
||||
return nil
|
||||
@@ -150,3 +199,180 @@ func (service *Service) Snapshot() Snapshot {
|
||||
Failed: service.failed.Load(),
|
||||
}
|
||||
}
|
||||
|
||||
func (service *Service) applyDeviceAliases(records []model.Record) {
|
||||
if len(service.deviceAliases) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for index := range records {
|
||||
device := normalizeDeviceKey(records[index].Tags["device"])
|
||||
alias, ok := service.deviceAliases[device]
|
||||
if !ok || alias == "" {
|
||||
continue
|
||||
}
|
||||
records[index].Fields["device_alias"] = alias
|
||||
}
|
||||
}
|
||||
|
||||
func (service *Service) updateDeviceSnapshots(records []model.Record) {
|
||||
for _, record := range records {
|
||||
device := normalizeDeviceKey(record.Tags["device"])
|
||||
if device == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
snapshot, ok := service.snapshots[device]
|
||||
if !ok {
|
||||
snapshot = &deviceSnapshot{
|
||||
Device: device,
|
||||
Fields: make(map[string]any),
|
||||
}
|
||||
service.snapshots[device] = snapshot
|
||||
}
|
||||
|
||||
if record.Timestamp.After(snapshot.LastSeen) {
|
||||
snapshot.LastSeen = record.Timestamp
|
||||
}
|
||||
|
||||
if alias, ok := record.Fields["device_alias"].(string); ok && strings.TrimSpace(alias) != "" {
|
||||
snapshot.Alias = alias
|
||||
snapshot.Fields["device_alias"] = alias
|
||||
}
|
||||
|
||||
switch record.Tags["message_type"] {
|
||||
case "state":
|
||||
if record.Timestamp.After(snapshot.StateSeen) {
|
||||
snapshot.StateSeen = record.Timestamp
|
||||
copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "wifi_signal", "uptime_sec")
|
||||
}
|
||||
case "sensor":
|
||||
if record.Timestamp.After(snapshot.SensorSeen) {
|
||||
snapshot.SensorSeen = record.Timestamp
|
||||
copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "energy_power", "energy_today", "energy_total")
|
||||
}
|
||||
}
|
||||
|
||||
service.dirtyDevices[device] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func copySnapshotFields(target map[string]any, source map[string]any, keys ...string) {
|
||||
for _, key := range keys {
|
||||
value, ok := source[key]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
target[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
func (service *Service) buildSnapshotRecords() []model.Record {
|
||||
if len(service.dirtyDevices) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
devices := make([]string, 0, len(service.dirtyDevices))
|
||||
for device := range service.dirtyDevices {
|
||||
devices = append(devices, device)
|
||||
}
|
||||
sort.Strings(devices)
|
||||
|
||||
records := make([]model.Record, 0, len(devices))
|
||||
for _, device := range devices {
|
||||
snapshot := service.snapshots[device]
|
||||
if snapshot == nil || snapshot.LastSeen.IsZero() {
|
||||
continue
|
||||
}
|
||||
|
||||
fields := make(map[string]any, len(snapshot.Fields)+2)
|
||||
for key, value := range snapshot.Fields {
|
||||
fields[key] = value
|
||||
}
|
||||
if !snapshot.StateSeen.IsZero() {
|
||||
fields["state_last_seen_unix"] = snapshot.StateSeen.Unix()
|
||||
}
|
||||
if !snapshot.SensorSeen.IsZero() {
|
||||
fields["sensor_last_seen_unix"] = snapshot.SensorSeen.Unix()
|
||||
}
|
||||
|
||||
records = append(records, model.Record{
|
||||
Measurement: "tasmota_device_snapshot",
|
||||
Tags: map[string]string{
|
||||
"device": device,
|
||||
"source": "derived",
|
||||
},
|
||||
Fields: fields,
|
||||
Timestamp: snapshot.LastSeen,
|
||||
})
|
||||
}
|
||||
|
||||
return records
|
||||
}
|
||||
|
||||
func (service *Service) clearDirtyDevices() {
|
||||
for device := range service.dirtyDevices {
|
||||
delete(service.dirtyDevices, device)
|
||||
}
|
||||
}
|
||||
|
||||
func normalizeDeviceAliases(aliases map[string]string) map[string]string {
|
||||
if len(aliases) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
normalized := make(map[string]string, len(aliases))
|
||||
for device, alias := range aliases {
|
||||
cleanDevice := normalizeDeviceKey(device)
|
||||
cleanAlias := strings.TrimSpace(alias)
|
||||
if cleanDevice == "" || cleanAlias == "" {
|
||||
continue
|
||||
}
|
||||
normalized[cleanDevice] = cleanAlias
|
||||
}
|
||||
|
||||
if len(normalized) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return normalized
|
||||
}
|
||||
|
||||
func normalizeDeviceKey(value string) string {
|
||||
normalized := strings.ToLower(strings.TrimSpace(value))
|
||||
normalized = strings.ReplaceAll(normalized, "-", "_")
|
||||
normalized = strings.ReplaceAll(normalized, " ", "_")
|
||||
normalized = invalidDeviceAliasCharacters.ReplaceAllString(normalized, "_")
|
||||
normalized = strings.Trim(normalized, "_")
|
||||
return normalized
|
||||
}
|
||||
|
||||
func buildAliasRecords(aliases map[string]string) []model.Record {
|
||||
if len(aliases) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
devices := make([]string, 0, len(aliases))
|
||||
for device := range aliases {
|
||||
devices = append(devices, device)
|
||||
}
|
||||
sort.Strings(devices)
|
||||
|
||||
timestamp := time.Now().UTC()
|
||||
records := make([]model.Record, 0, len(devices))
|
||||
for _, device := range devices {
|
||||
records = append(records, model.Record{
|
||||
Measurement: "tasmota_device_meta",
|
||||
Tags: map[string]string{
|
||||
"device": device,
|
||||
"source": "config",
|
||||
},
|
||||
Fields: map[string]any{
|
||||
"device_alias": aliases[device],
|
||||
},
|
||||
Timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
return records
|
||||
}
|
||||
|
||||
@@ -43,9 +43,36 @@ func (writer *fakeWriter) firstBatch() []model.Record {
|
||||
return writer.batches[0]
|
||||
}
|
||||
|
||||
func (writer *fakeWriter) allBatches() [][]model.Record {
|
||||
writer.mu.Lock()
|
||||
defer writer.mu.Unlock()
|
||||
|
||||
copyBatches := make([][]model.Record, 0, len(writer.batches))
|
||||
for _, batch := range writer.batches {
|
||||
copyBatches = append(copyBatches, append([]model.Record(nil), batch...))
|
||||
}
|
||||
|
||||
return copyBatches
|
||||
}
|
||||
|
||||
func findRecord(records []model.Record, measurement string, device string) *model.Record {
|
||||
for index := range records {
|
||||
record := &records[index]
|
||||
if record.Measurement == measurement && record.Tags["device"] == device {
|
||||
return record
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestServiceFlushesParsedRecords(t *testing.T) {
|
||||
fake := newFakeWriter()
|
||||
service := NewService(config.Config{
|
||||
DeviceAliases: map[string]string{
|
||||
"tasmota-896001": "Garage Plug",
|
||||
"Tasmota C88994": "Office Plug",
|
||||
},
|
||||
App: config.AppConfig{
|
||||
BatchSize: 2,
|
||||
BufferSize: 8,
|
||||
@@ -62,6 +89,12 @@ func TestServiceFlushesParsedRecords(t *testing.T) {
|
||||
errCh <- service.Run(ctx)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-fake.flushed:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for alias metadata flush")
|
||||
}
|
||||
|
||||
service.Enqueue(model.RawMessage{
|
||||
Topic: "tele/tasmota_896001/LWT",
|
||||
Payload: []byte("Online"),
|
||||
@@ -79,9 +112,25 @@ func TestServiceFlushesParsedRecords(t *testing.T) {
|
||||
t.Fatal("timed out waiting for pipeline flush")
|
||||
}
|
||||
|
||||
batch := fake.firstBatch()
|
||||
if len(batch) != 2 {
|
||||
t.Fatalf("expected 2 records in flushed batch, got %d", len(batch))
|
||||
batches := fake.allBatches()
|
||||
if len(batches) != 2 {
|
||||
t.Fatalf("expected 2 flushed batches, got %d", len(batches))
|
||||
}
|
||||
|
||||
metaBatch := batches[0]
|
||||
if len(metaBatch) != 2 {
|
||||
t.Fatalf("expected 2 alias metadata records, got %d", len(metaBatch))
|
||||
}
|
||||
if metaBatch[0].Measurement != "tasmota_device_meta" {
|
||||
t.Fatalf("unexpected metadata measurement: %s", metaBatch[0].Measurement)
|
||||
}
|
||||
if metaBatch[0].Fields["device_alias"] == "" {
|
||||
t.Fatalf("expected metadata batch to include device_alias, got %#v", metaBatch[0].Fields["device_alias"])
|
||||
}
|
||||
|
||||
batch := batches[1]
|
||||
if len(batch) != 4 {
|
||||
t.Fatalf("expected 4 records in flushed batch, got %d", len(batch))
|
||||
}
|
||||
|
||||
if batch[0].Measurement != "tasmota_lwt" {
|
||||
@@ -90,6 +139,9 @@ func TestServiceFlushesParsedRecords(t *testing.T) {
|
||||
if batch[0].Fields["online"] != true {
|
||||
t.Fatalf("unexpected lwt online field: %#v", batch[0].Fields["online"])
|
||||
}
|
||||
if batch[0].Fields["device_alias"] != "Garage Plug" {
|
||||
t.Fatalf("unexpected lwt device_alias field: %#v", batch[0].Fields["device_alias"])
|
||||
}
|
||||
|
||||
if batch[1].Measurement != "tasmota_sensor" {
|
||||
t.Fatalf("unexpected second measurement: %s", batch[1].Measurement)
|
||||
@@ -97,6 +149,37 @@ func TestServiceFlushesParsedRecords(t *testing.T) {
|
||||
if batch[1].Fields["energy_total"] != float64(41.385) {
|
||||
t.Fatalf("unexpected sensor energy_total field: %#v", batch[1].Fields["energy_total"])
|
||||
}
|
||||
if batch[1].Fields["device_alias"] != "Office Plug" {
|
||||
t.Fatalf("unexpected sensor device_alias field: %#v", batch[1].Fields["device_alias"])
|
||||
}
|
||||
|
||||
garageSnapshot := findRecord(batch, "tasmota_device_snapshot", "tasmota_896001")
|
||||
if garageSnapshot == nil {
|
||||
t.Fatal("expected garage snapshot record in flushed batch")
|
||||
}
|
||||
if garageSnapshot.Fields["device_alias"] != "Garage Plug" {
|
||||
t.Fatalf("unexpected garage snapshot alias: %#v", garageSnapshot.Fields["device_alias"])
|
||||
}
|
||||
if _, ok := garageSnapshot.Fields["sensor_last_seen_unix"]; ok {
|
||||
t.Fatalf("did not expect sensor timestamp on lwt-only snapshot: %#v", garageSnapshot.Fields["sensor_last_seen_unix"])
|
||||
}
|
||||
|
||||
officeSnapshot := findRecord(batch, "tasmota_device_snapshot", "tasmota_c88994")
|
||||
if officeSnapshot == nil {
|
||||
t.Fatal("expected office snapshot record in flushed batch")
|
||||
}
|
||||
if officeSnapshot.Fields["device_alias"] != "Office Plug" {
|
||||
t.Fatalf("unexpected office snapshot alias: %#v", officeSnapshot.Fields["device_alias"])
|
||||
}
|
||||
if officeSnapshot.Fields["energy_total"] != float64(41.385) {
|
||||
t.Fatalf("unexpected office snapshot energy_total: %#v", officeSnapshot.Fields["energy_total"])
|
||||
}
|
||||
if officeSnapshot.Fields["energy_power"] != float64(1) {
|
||||
t.Fatalf("unexpected office snapshot energy_power: %#v", officeSnapshot.Fields["energy_power"])
|
||||
}
|
||||
if officeSnapshot.Fields["sensor_last_seen_unix"] != time.Date(2026, time.March, 12, 16, 23, 13, 0, time.UTC).Unix() {
|
||||
t.Fatalf("unexpected office snapshot sensor timestamp: %#v", officeSnapshot.Fields["sensor_last_seen_unix"])
|
||||
}
|
||||
|
||||
cancel()
|
||||
select {
|
||||
|
||||
Reference in New Issue
Block a user