Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 32b508ac55 | |||
| 585378297c | |||
| 5ac9cc3892 | |||
| b9789430a1 | |||
| 7a8a1cc855 | |||
| b0784faa43 |
@@ -59,9 +59,12 @@ Supported environment variables:
|
|||||||
- `MQTT_SCRUBBER_APP_FLUSH_TIMEOUT`
|
- `MQTT_SCRUBBER_APP_FLUSH_TIMEOUT`
|
||||||
- `MQTT_SCRUBBER_APP_LOG_LEVEL`
|
- `MQTT_SCRUBBER_APP_LOG_LEVEL`
|
||||||
- `MQTT_SCRUBBER_APP_HEALTH_ADDRESS`
|
- `MQTT_SCRUBBER_APP_HEALTH_ADDRESS`
|
||||||
|
- `MQTT_SCRUBBER_APP_TASMOTA_TIME_ZONE`
|
||||||
|
|
||||||
`MQTT_SCRUBBER_MQTT_TOPICS` expects a comma-separated list.
|
`MQTT_SCRUBBER_MQTT_TOPICS` expects a comma-separated list.
|
||||||
|
|
||||||
|
Timezone-less Tasmota `Time` values are interpreted in `tasmota_time_zone`. Set it to your device timezone such as `Europe/Prague` so the stored timestamp matches the device-local clock. The default is `UTC` for deterministic behavior.
|
||||||
|
|
||||||
You can also define optional per-device aliases in config with a top-level `device_aliases` object. Keys are normalized like device tags, so `kitchen-plug`, `Kitchen Plug`, and `kitchen_plug` all resolve to `kitchen_plug`.
|
You can also define optional per-device aliases in config with a top-level `device_aliases` object. Keys are normalized like device tags, so `kitchen-plug`, `Kitchen Plug`, and `kitchen_plug` all resolve to `kitchen_plug`.
|
||||||
|
|
||||||
## Run
|
## Run
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"os/signal"
|
"os/signal"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
_ "time/tzdata"
|
||||||
|
|
||||||
"mqqt-scrubber/internal/config"
|
"mqqt-scrubber/internal/config"
|
||||||
"mqqt-scrubber/internal/health"
|
"mqqt-scrubber/internal/health"
|
||||||
|
|||||||
+2
-1
@@ -26,6 +26,7 @@
|
|||||||
"flush_interval": "10s",
|
"flush_interval": "10s",
|
||||||
"flush_timeout": "10s",
|
"flush_timeout": "10s",
|
||||||
"log_level": "info",
|
"log_level": "info",
|
||||||
"health_address": ":8080"
|
"health_address": ":8080",
|
||||||
|
"tasmota_time_zone": "Europe/Prague"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -8,6 +8,7 @@ services:
|
|||||||
- ./config.json:/app/config.json:ro
|
- ./config.json:/app/config.json:ro
|
||||||
environment:
|
environment:
|
||||||
MQTT_SCRUBBER_APP_HEALTH_ADDRESS: ":8080"
|
MQTT_SCRUBBER_APP_HEALTH_ADDRESS: ":8080"
|
||||||
|
MQTT_SCRUBBER_APP_TASMOTA_TIME_ZONE: "Europe/Prague"
|
||||||
networks:
|
networks:
|
||||||
- iot-network
|
- iot-network
|
||||||
healthcheck:
|
healthcheck:
|
||||||
|
|||||||
@@ -140,7 +140,7 @@ timestamp: payload Time or message receive time
|
|||||||
|
|
||||||
- prefer payload `Time` when present
|
- prefer payload `Time` when present
|
||||||
- accept RFC3339 and timezone-less Tasmota timestamps in the form `2006-01-02T15:04:05`
|
- accept RFC3339 and timezone-less Tasmota timestamps in the form `2006-01-02T15:04:05`
|
||||||
- current implementation interprets timezone-less timestamps as UTC
|
- interpret timezone-less timestamps in the configured `tasmota_time_zone` location and convert them to UTC for storage
|
||||||
|
|
||||||
## Failure handling
|
## Failure handling
|
||||||
|
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -15,7 +15,7 @@
|
|||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"description": "Fleet overview of known Tasmota devices with drilldown links into a dedicated per-device dashboard.",
|
"description": "Fleet overview of Tasmota devices using the latest derived per-device snapshot, with drilldown links into a dedicated per-device dashboard.",
|
||||||
"editable": true,
|
"editable": true,
|
||||||
"fiscalYearStartMonth": 0,
|
"fiscalYearStartMonth": 0,
|
||||||
"graphTooltip": 0,
|
"graphTooltip": 0,
|
||||||
@@ -89,13 +89,13 @@
|
|||||||
},
|
},
|
||||||
"editorMode": "code",
|
"editorMode": "code",
|
||||||
"format": "table",
|
"format": "table",
|
||||||
"query": "SELECT count(DISTINCT device) AS total_devices FROM (SELECT device FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device FROM tasmota_sensor WHERE $__timeFilter(time))",
|
"query": "SELECT count(*) AS total_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time)",
|
||||||
"rawQuery": true,
|
"rawQuery": true,
|
||||||
"rawSql": "SELECT count(DISTINCT device) AS total_devices FROM (SELECT device FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device FROM tasmota_sensor WHERE $__timeFilter(time))",
|
"rawSql": "SELECT count(*) AS total_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time)",
|
||||||
"refId": "A"
|
"refId": "A"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"title": "Total Devices",
|
"title": "Tracked Devices",
|
||||||
"type": "stat"
|
"type": "stat"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -152,13 +152,13 @@
|
|||||||
},
|
},
|
||||||
"editorMode": "code",
|
"editorMode": "code",
|
||||||
"format": "table",
|
"format": "table",
|
||||||
"query": "SELECT count(DISTINCT device) AS power_devices FROM tasmota_sensor WHERE $__timeFilter(time) AND (energy_power IS NOT NULL OR energy_today IS NOT NULL OR energy_total IS NOT NULL)",
|
"query": "SELECT count(*) AS power_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time WHERE s.energy_power IS NOT NULL OR s.energy_today IS NOT NULL OR s.energy_total IS NOT NULL)",
|
||||||
"rawQuery": true,
|
"rawQuery": true,
|
||||||
"rawSql": "SELECT count(DISTINCT device) AS power_devices FROM tasmota_sensor WHERE $__timeFilter(time) AND (energy_power IS NOT NULL OR energy_today IS NOT NULL OR energy_total IS NOT NULL)",
|
"rawSql": "SELECT count(*) AS power_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time WHERE s.energy_power IS NOT NULL OR s.energy_today IS NOT NULL OR s.energy_total IS NOT NULL)",
|
||||||
"refId": "A"
|
"refId": "A"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"title": "Devices With Energy Metrics",
|
"title": "Energy Devices",
|
||||||
"type": "stat"
|
"type": "stat"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -223,9 +223,9 @@
|
|||||||
},
|
},
|
||||||
"editorMode": "code",
|
"editorMode": "code",
|
||||||
"format": "table",
|
"format": "table",
|
||||||
"query": "SELECT coalesce(sum(sensor.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
"query": "SELECT coalesce(sum(snapshots.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||||
"rawQuery": true,
|
"rawQuery": true,
|
||||||
"rawSql": "SELECT coalesce(sum(sensor.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
"rawSql": "SELECT coalesce(sum(snapshots.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||||
"refId": "A"
|
"refId": "A"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
@@ -286,9 +286,9 @@
|
|||||||
},
|
},
|
||||||
"editorMode": "code",
|
"editorMode": "code",
|
||||||
"format": "table",
|
"format": "table",
|
||||||
"query": "SELECT coalesce(sum(sensor.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
"query": "SELECT coalesce(sum(snapshots.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||||
"rawQuery": true,
|
"rawQuery": true,
|
||||||
"rawSql": "SELECT coalesce(sum(sensor.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
"rawSql": "SELECT coalesce(sum(snapshots.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||||
"refId": "A"
|
"refId": "A"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
@@ -349,9 +349,9 @@
|
|||||||
},
|
},
|
||||||
"editorMode": "code",
|
"editorMode": "code",
|
||||||
"format": "table",
|
"format": "table",
|
||||||
"query": "SELECT coalesce(sum(sensor.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
"query": "SELECT coalesce(sum(snapshots.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||||
"rawQuery": true,
|
"rawQuery": true,
|
||||||
"rawSql": "SELECT coalesce(sum(sensor.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor",
|
"rawSql": "SELECT coalesce(sum(snapshots.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots",
|
||||||
"refId": "A"
|
"refId": "A"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
@@ -579,9 +579,9 @@
|
|||||||
},
|
},
|
||||||
"editorMode": "code",
|
"editorMode": "code",
|
||||||
"format": "table",
|
"format": "table",
|
||||||
"query": "SELECT devices.device, coalesce(meta.device_alias, state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON devices.device = meta.device LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device",
|
"query": "SELECT snapshots.device, coalesce(meta.device_alias, snapshots.device_alias, '') AS alias, snapshots.last_seen, to_unixtime(now()) - to_unixtime(snapshots.last_seen) AS age_s, snapshots.energy_power AS current_draw_w, snapshots.energy_today AS daily_draw_kwh, snapshots.energy_total AS total_draw_kwh, snapshots.wifi_signal AS wifi_signal_dbm, snapshots.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT s.device, s.time AS last_seen, s.device_alias, s.energy_power, s.energy_today, s.energy_total, s.wifi_signal, s.uptime_sec FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON snapshots.device = meta.device ORDER BY snapshots.device",
|
||||||
"rawQuery": true,
|
"rawQuery": true,
|
||||||
"rawSql": "SELECT devices.device, coalesce(meta.device_alias, state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON devices.device = meta.device LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device",
|
"rawSql": "SELECT snapshots.device, coalesce(meta.device_alias, snapshots.device_alias, '') AS alias, snapshots.last_seen, to_unixtime(now()) - to_unixtime(snapshots.last_seen) AS age_s, snapshots.energy_power AS current_draw_w, snapshots.energy_today AS daily_draw_kwh, snapshots.energy_total AS total_draw_kwh, snapshots.wifi_signal AS wifi_signal_dbm, snapshots.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT s.device, s.time AS last_seen, s.device_alias, s.energy_power, s.energy_today, s.energy_total, s.wifi_signal, s.uptime_sec FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON snapshots.device = meta.device ORDER BY snapshots.device",
|
||||||
"refId": "A"
|
"refId": "A"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
@@ -608,6 +608,6 @@
|
|||||||
"timezone": "browser",
|
"timezone": "browser",
|
||||||
"title": "Tasmota Device Summary",
|
"title": "Tasmota Device Summary",
|
||||||
"uid": "tasmota-device-summary",
|
"uid": "tasmota-device-summary",
|
||||||
"version": 8,
|
"version": 9,
|
||||||
"weekStart": ""
|
"weekStart": ""
|
||||||
}
|
}
|
||||||
+44
-12
@@ -39,12 +39,13 @@ type InfluxConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type AppConfig struct {
|
type AppConfig struct {
|
||||||
BatchSize int `json:"batch_size"`
|
BatchSize int `json:"batch_size"`
|
||||||
BufferSize int `json:"buffer_size"`
|
BufferSize int `json:"buffer_size"`
|
||||||
FlushInterval DurationValue `json:"flush_interval"`
|
FlushInterval DurationValue `json:"flush_interval"`
|
||||||
FlushTimeout DurationValue `json:"flush_timeout"`
|
FlushTimeout DurationValue `json:"flush_timeout"`
|
||||||
LogLevel string `json:"log_level"`
|
LogLevel string `json:"log_level"`
|
||||||
HealthAddress string `json:"health_address"`
|
HealthAddress string `json:"health_address"`
|
||||||
|
TasmotaTimeZone string `json:"tasmota_time_zone"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type DurationValue struct {
|
type DurationValue struct {
|
||||||
@@ -124,6 +125,9 @@ func (cfg Config) Validate() error {
|
|||||||
if cfg.App.FlushTimeout.Duration <= 0 {
|
if cfg.App.FlushTimeout.Duration <= 0 {
|
||||||
return errors.New("app flush_timeout must be greater than zero")
|
return errors.New("app flush_timeout must be greater than zero")
|
||||||
}
|
}
|
||||||
|
if _, err := loadConfiguredLocation(cfg.App.TasmotaTimeZone); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -145,12 +149,13 @@ func defaultConfig() Config {
|
|||||||
Precision: "ns",
|
Precision: "ns",
|
||||||
},
|
},
|
||||||
App: AppConfig{
|
App: AppConfig{
|
||||||
BatchSize: 200,
|
BatchSize: 200,
|
||||||
BufferSize: 1000,
|
BufferSize: 1000,
|
||||||
FlushInterval: DurationValue{Duration: 10 * time.Second},
|
FlushInterval: DurationValue{Duration: 10 * time.Second},
|
||||||
FlushTimeout: DurationValue{Duration: 10 * time.Second},
|
FlushTimeout: DurationValue{Duration: 10 * time.Second},
|
||||||
LogLevel: "info",
|
LogLevel: "info",
|
||||||
HealthAddress: ":8080",
|
HealthAddress: ":8080",
|
||||||
|
TasmotaTimeZone: "UTC",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -166,6 +171,7 @@ func applyEnvOverrides(cfg *Config) error {
|
|||||||
setString(&cfg.Influx.Precision, envPrefix+"INFLUX_PRECISION")
|
setString(&cfg.Influx.Precision, envPrefix+"INFLUX_PRECISION")
|
||||||
setString(&cfg.App.LogLevel, envPrefix+"APP_LOG_LEVEL")
|
setString(&cfg.App.LogLevel, envPrefix+"APP_LOG_LEVEL")
|
||||||
setString(&cfg.App.HealthAddress, envPrefix+"APP_HEALTH_ADDRESS")
|
setString(&cfg.App.HealthAddress, envPrefix+"APP_HEALTH_ADDRESS")
|
||||||
|
setString(&cfg.App.TasmotaTimeZone, envPrefix+"APP_TASMOTA_TIME_ZONE")
|
||||||
|
|
||||||
if raw, ok := os.LookupEnv(envPrefix + "DEVICE_ALIASES"); ok {
|
if raw, ok := os.LookupEnv(envPrefix + "DEVICE_ALIASES"); ok {
|
||||||
if strings.TrimSpace(raw) == "" {
|
if strings.TrimSpace(raw) == "" {
|
||||||
@@ -276,3 +282,29 @@ func normalizeDeviceKey(value string) string {
|
|||||||
normalized = strings.Trim(normalized, "_")
|
normalized = strings.Trim(normalized, "_")
|
||||||
return normalized
|
return normalized
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cfg AppConfig) TasmotaLocation() *time.Location {
|
||||||
|
location, err := loadConfiguredLocation(cfg.TasmotaTimeZone)
|
||||||
|
if err != nil {
|
||||||
|
return time.UTC
|
||||||
|
}
|
||||||
|
|
||||||
|
return location
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadConfiguredLocation(name string) (*time.Location, error) {
|
||||||
|
trimmed := strings.TrimSpace(name)
|
||||||
|
if trimmed == "" || strings.EqualFold(trimmed, "utc") {
|
||||||
|
return time.UTC, nil
|
||||||
|
}
|
||||||
|
if strings.EqualFold(trimmed, "local") {
|
||||||
|
return time.Local, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
location, err := time.LoadLocation(trimmed)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("app tasmota_time_zone %q is invalid: %w", trimmed, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return location, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -7,8 +7,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestLoadNormalizesDeviceAliases(t *testing.T) {
|
func TestLoadNormalizesDeviceAliases(t *testing.T) {
|
||||||
t.Setenv("MQTT_SCRUBBER_DEVICE_ALIASES", "")
|
|
||||||
|
|
||||||
configPath := filepath.Join(t.TempDir(), "config.json")
|
configPath := filepath.Join(t.TempDir(), "config.json")
|
||||||
contents := `{
|
contents := `{
|
||||||
"mqtt": {
|
"mqtt": {
|
||||||
@@ -105,3 +103,76 @@ func TestLoadOverridesDeviceAliasesFromEnv(t *testing.T) {
|
|||||||
t.Fatalf("unexpected desk alias: got %q", got)
|
t.Fatalf("unexpected desk alias: got %q", got)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLoadSupportsTasmotaTimeZone(t *testing.T) {
|
||||||
|
configPath := filepath.Join(t.TempDir(), "config.json")
|
||||||
|
contents := `{
|
||||||
|
"mqtt": {
|
||||||
|
"broker": "tcp://127.0.0.1:1883",
|
||||||
|
"client_id": "mqqt-scrubber",
|
||||||
|
"topics": ["tele/+/STATE"],
|
||||||
|
"qos": 0
|
||||||
|
},
|
||||||
|
"influx": {
|
||||||
|
"url": "http://127.0.0.1:8181",
|
||||||
|
"database": "home",
|
||||||
|
"precision": "ns"
|
||||||
|
},
|
||||||
|
"app": {
|
||||||
|
"batch_size": 200,
|
||||||
|
"buffer_size": 1000,
|
||||||
|
"flush_interval": "10s",
|
||||||
|
"flush_timeout": "10s",
|
||||||
|
"log_level": "info",
|
||||||
|
"health_address": ":8080",
|
||||||
|
"tasmota_time_zone": "Europe/Prague"
|
||||||
|
}
|
||||||
|
}`
|
||||||
|
|
||||||
|
if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil {
|
||||||
|
t.Fatalf("write config file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg, err := Load(configPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Load returned error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := cfg.App.TasmotaLocation().String(); got != "Europe/Prague" {
|
||||||
|
t.Fatalf("unexpected tasmota timezone: got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadRejectsInvalidTasmotaTimeZone(t *testing.T) {
|
||||||
|
configPath := filepath.Join(t.TempDir(), "config.json")
|
||||||
|
contents := `{
|
||||||
|
"mqtt": {
|
||||||
|
"broker": "tcp://127.0.0.1:1883",
|
||||||
|
"client_id": "mqqt-scrubber",
|
||||||
|
"topics": ["tele/+/STATE"],
|
||||||
|
"qos": 0
|
||||||
|
},
|
||||||
|
"influx": {
|
||||||
|
"url": "http://127.0.0.1:8181",
|
||||||
|
"database": "home",
|
||||||
|
"precision": "ns"
|
||||||
|
},
|
||||||
|
"app": {
|
||||||
|
"batch_size": 200,
|
||||||
|
"buffer_size": 1000,
|
||||||
|
"flush_interval": "10s",
|
||||||
|
"flush_timeout": "10s",
|
||||||
|
"log_level": "info",
|
||||||
|
"health_address": ":8080",
|
||||||
|
"tasmota_time_zone": "Not/AZone"
|
||||||
|
}
|
||||||
|
}`
|
||||||
|
|
||||||
|
if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil {
|
||||||
|
t.Fatalf("write config file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := Load(configPath); err == nil {
|
||||||
|
t.Fatal("expected Load to reject invalid tasmota_time_zone")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -22,6 +22,14 @@ var tasmotaTimeLayouts = []string{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ParseTasmota(message model.RawMessage) ([]model.Record, error) {
|
func ParseTasmota(message model.RawMessage) ([]model.Record, error) {
|
||||||
|
return ParseTasmotaInLocation(message, time.UTC)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseTasmotaInLocation(message model.RawMessage, timezoneLessTimeLocation *time.Location) ([]model.Record, error) {
|
||||||
|
if timezoneLessTimeLocation == nil {
|
||||||
|
timezoneLessTimeLocation = time.UTC
|
||||||
|
}
|
||||||
|
|
||||||
parts := strings.Split(message.Topic, "/")
|
parts := strings.Split(message.Topic, "/")
|
||||||
if len(parts) != 3 {
|
if len(parts) != 3 {
|
||||||
return nil, fmt.Errorf("unsupported topic shape: %s", message.Topic)
|
return nil, fmt.Errorf("unsupported topic shape: %s", message.Topic)
|
||||||
@@ -52,7 +60,7 @@ func ParseTasmota(message model.RawMessage) ([]model.Record, error) {
|
|||||||
return nil, fmt.Errorf("no usable fields in payload for topic %s", message.Topic)
|
return nil, fmt.Errorf("no usable fields in payload for topic %s", message.Topic)
|
||||||
}
|
}
|
||||||
|
|
||||||
timestamp := parsePayloadTimestamp(payload, message.ReceivedAt)
|
timestamp := parsePayloadTimestamp(payload, message.ReceivedAt, timezoneLessTimeLocation)
|
||||||
|
|
||||||
record := model.Record{
|
record := model.Record{
|
||||||
Measurement: measurement,
|
Measurement: measurement,
|
||||||
@@ -77,11 +85,14 @@ func parseLWT(message model.RawMessage, measurement string, tags map[string]stri
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func parsePayloadTimestamp(payload map[string]any, fallback time.Time) time.Time {
|
func parsePayloadTimestamp(payload map[string]any, fallback time.Time, timezoneLessTimeLocation *time.Location) time.Time {
|
||||||
rawTime, ok := payload["Time"].(string)
|
rawTime, ok := payload["Time"].(string)
|
||||||
if !ok || strings.TrimSpace(rawTime) == "" {
|
if !ok || strings.TrimSpace(rawTime) == "" {
|
||||||
return fallback
|
return fallback
|
||||||
}
|
}
|
||||||
|
if timezoneLessTimeLocation == nil {
|
||||||
|
timezoneLessTimeLocation = time.UTC
|
||||||
|
}
|
||||||
|
|
||||||
for _, layout := range tasmotaTimeLayouts {
|
for _, layout := range tasmotaTimeLayouts {
|
||||||
var (
|
var (
|
||||||
@@ -90,13 +101,13 @@ func parsePayloadTimestamp(payload map[string]any, fallback time.Time) time.Time
|
|||||||
)
|
)
|
||||||
|
|
||||||
if layout == "2006-01-02T15:04:05" {
|
if layout == "2006-01-02T15:04:05" {
|
||||||
parsed, err = time.ParseInLocation(layout, rawTime, time.UTC)
|
parsed, err = time.ParseInLocation(layout, rawTime, timezoneLessTimeLocation)
|
||||||
} else {
|
} else {
|
||||||
parsed, err = time.Parse(layout, rawTime)
|
parsed, err = time.Parse(layout, rawTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return parsed
|
return parsed.UTC()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -85,6 +85,29 @@ func TestParseTasmotaFixtures(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParseTasmotaInLocationUsesConfiguredTimezoneForTimezoneLessTime(t *testing.T) {
|
||||||
|
location := time.FixedZone("CET", 3600)
|
||||||
|
receivedAt := time.Date(2026, time.March, 16, 11, 55, 30, 0, time.UTC)
|
||||||
|
|
||||||
|
records, err := ParseTasmotaInLocation(model.RawMessage{
|
||||||
|
Topic: "tele/tasmota_67850B/SENSOR",
|
||||||
|
Payload: []byte(`{"Time":"2026-03-16T11:55:29","ENERGY":{"Power":9}}`),
|
||||||
|
ReceivedAt: receivedAt,
|
||||||
|
}, location)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ParseTasmotaInLocation returned error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(records) != 1 {
|
||||||
|
t.Fatalf("expected 1 record, got %d", len(records))
|
||||||
|
}
|
||||||
|
|
||||||
|
want := time.Date(2026, time.March, 16, 10, 55, 29, 0, time.UTC)
|
||||||
|
if !records[0].Timestamp.Equal(want) {
|
||||||
|
t.Fatalf("unexpected timestamp: got %s want %s", records[0].Timestamp.Format(time.RFC3339), want.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func fieldEquals(got any, want any) bool {
|
func fieldEquals(got any, want any) bool {
|
||||||
switch typedWant := want.(type) {
|
switch typedWant := want.(type) {
|
||||||
case float64:
|
case float64:
|
||||||
|
|||||||
+149
-20
@@ -21,16 +21,19 @@ type writer interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
config config.Config
|
config config.Config
|
||||||
deviceAliases map[string]string
|
tasmotaTimeLocation *time.Location
|
||||||
aliasRecords []model.Record
|
deviceAliases map[string]string
|
||||||
influxClient writer
|
aliasRecords []model.Record
|
||||||
input chan model.RawMessage
|
snapshots map[string]*deviceSnapshot
|
||||||
received atomic.Uint64
|
dirtyDevices map[string]struct{}
|
||||||
parsed atomic.Uint64
|
influxClient writer
|
||||||
written atomic.Uint64
|
input chan model.RawMessage
|
||||||
dropped atomic.Uint64
|
received atomic.Uint64
|
||||||
failed atomic.Uint64
|
parsed atomic.Uint64
|
||||||
|
written atomic.Uint64
|
||||||
|
dropped atomic.Uint64
|
||||||
|
failed atomic.Uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
type Snapshot struct {
|
type Snapshot struct {
|
||||||
@@ -41,14 +44,26 @@ type Snapshot struct {
|
|||||||
Failed uint64 `json:"failed"`
|
Failed uint64 `json:"failed"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type deviceSnapshot struct {
|
||||||
|
Device string
|
||||||
|
Alias string
|
||||||
|
LastSeen time.Time
|
||||||
|
StateSeen time.Time
|
||||||
|
SensorSeen time.Time
|
||||||
|
Fields map[string]any
|
||||||
|
}
|
||||||
|
|
||||||
func NewService(cfg config.Config, influxClient writer) *Service {
|
func NewService(cfg config.Config, influxClient writer) *Service {
|
||||||
normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases)
|
normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases)
|
||||||
return &Service{
|
return &Service{
|
||||||
config: cfg,
|
config: cfg,
|
||||||
deviceAliases: normalizedAliases,
|
tasmotaTimeLocation: cfg.App.TasmotaLocation(),
|
||||||
aliasRecords: buildAliasRecords(normalizedAliases),
|
deviceAliases: normalizedAliases,
|
||||||
influxClient: influxClient,
|
aliasRecords: buildAliasRecords(normalizedAliases),
|
||||||
input: make(chan model.RawMessage, cfg.App.BufferSize),
|
snapshots: make(map[string]*deviceSnapshot),
|
||||||
|
dirtyDevices: make(map[string]struct{}),
|
||||||
|
influxClient: influxClient,
|
||||||
|
input: make(chan model.RawMessage, cfg.App.BufferSize),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -91,7 +106,7 @@ func (service *Service) Run(ctx context.Context) error {
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
flushCtx, cancel := context.WithTimeout(context.Background(), service.config.App.FlushTimeout.Duration)
|
flushCtx, cancel := context.WithTimeout(context.Background(), service.config.App.FlushTimeout.Duration)
|
||||||
err := service.flush(flushCtx, batch)
|
err := service.flushPending(flushCtx, batch)
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -100,7 +115,7 @@ func (service *Service) Run(ctx context.Context) error {
|
|||||||
service.logCounters()
|
service.logCounters()
|
||||||
return nil
|
return nil
|
||||||
case message := <-input:
|
case message := <-input:
|
||||||
records, err := parser.ParseTasmota(message)
|
records, err := parser.ParseTasmotaInLocation(message, service.tasmotaTimeLocation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
service.failed.Add(1)
|
service.failed.Add(1)
|
||||||
slog.Warn("failed to parse message", "topic", message.Topic, "error", err)
|
slog.Warn("failed to parse message", "topic", message.Topic, "error", err)
|
||||||
@@ -108,13 +123,14 @@ func (service *Service) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
service.applyDeviceAliases(records)
|
service.applyDeviceAliases(records)
|
||||||
|
service.updateDeviceSnapshots(records)
|
||||||
|
|
||||||
service.parsed.Add(uint64(len(records)))
|
service.parsed.Add(uint64(len(records)))
|
||||||
batch = append(batch, records...)
|
batch = append(batch, records...)
|
||||||
|
|
||||||
if len(batch) >= service.config.App.BatchSize {
|
if len(batch) >= service.config.App.BatchSize {
|
||||||
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
||||||
err := service.flush(flushCtx, batch)
|
err := service.flushPending(flushCtx, batch)
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("failed to flush full batch to influx; keeping batch in memory", "count", len(batch), "error", err)
|
slog.Error("failed to flush full batch to influx; keeping batch in memory", "count", len(batch), "error", err)
|
||||||
@@ -123,12 +139,12 @@ func (service *Service) Run(ctx context.Context) error {
|
|||||||
batch = batch[:0]
|
batch = batch[:0]
|
||||||
}
|
}
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if len(batch) == 0 {
|
if len(batch) == 0 && len(service.dirtyDevices) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
||||||
err := service.flush(flushCtx, batch)
|
err := service.flushPending(flushCtx, batch)
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("failed to flush batch to influx; will retry on next interval", "count", len(batch), "error", err)
|
slog.Error("failed to flush batch to influx; will retry on next interval", "count", len(batch), "error", err)
|
||||||
@@ -139,6 +155,18 @@ func (service *Service) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (service *Service) flushPending(ctx context.Context, batch []model.Record) error {
|
||||||
|
flushBatch := append([]model.Record(nil), batch...)
|
||||||
|
flushBatch = append(flushBatch, service.buildSnapshotRecords()...)
|
||||||
|
|
||||||
|
if err := service.flush(ctx, flushBatch); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
service.clearDirtyDevices()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (service *Service) flush(ctx context.Context, batch []model.Record) error {
|
func (service *Service) flush(ctx context.Context, batch []model.Record) error {
|
||||||
if len(batch) == 0 {
|
if len(batch) == 0 {
|
||||||
return nil
|
return nil
|
||||||
@@ -189,6 +217,107 @@ func (service *Service) applyDeviceAliases(records []model.Record) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (service *Service) updateDeviceSnapshots(records []model.Record) {
|
||||||
|
for _, record := range records {
|
||||||
|
device := normalizeDeviceKey(record.Tags["device"])
|
||||||
|
if device == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshot, ok := service.snapshots[device]
|
||||||
|
if !ok {
|
||||||
|
snapshot = &deviceSnapshot{
|
||||||
|
Device: device,
|
||||||
|
Fields: make(map[string]any),
|
||||||
|
}
|
||||||
|
service.snapshots[device] = snapshot
|
||||||
|
}
|
||||||
|
|
||||||
|
if record.Timestamp.After(snapshot.LastSeen) {
|
||||||
|
snapshot.LastSeen = record.Timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
if alias, ok := record.Fields["device_alias"].(string); ok && strings.TrimSpace(alias) != "" {
|
||||||
|
snapshot.Alias = alias
|
||||||
|
snapshot.Fields["device_alias"] = alias
|
||||||
|
}
|
||||||
|
|
||||||
|
switch record.Tags["message_type"] {
|
||||||
|
case "state":
|
||||||
|
if record.Timestamp.After(snapshot.StateSeen) {
|
||||||
|
snapshot.StateSeen = record.Timestamp
|
||||||
|
copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "wifi_signal", "uptime_sec")
|
||||||
|
}
|
||||||
|
case "sensor":
|
||||||
|
if record.Timestamp.After(snapshot.SensorSeen) {
|
||||||
|
snapshot.SensorSeen = record.Timestamp
|
||||||
|
copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "energy_power", "energy_today", "energy_total")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
service.dirtyDevices[device] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func copySnapshotFields(target map[string]any, source map[string]any, keys ...string) {
|
||||||
|
for _, key := range keys {
|
||||||
|
value, ok := source[key]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
target[key] = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (service *Service) buildSnapshotRecords() []model.Record {
|
||||||
|
if len(service.dirtyDevices) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
devices := make([]string, 0, len(service.dirtyDevices))
|
||||||
|
for device := range service.dirtyDevices {
|
||||||
|
devices = append(devices, device)
|
||||||
|
}
|
||||||
|
sort.Strings(devices)
|
||||||
|
|
||||||
|
records := make([]model.Record, 0, len(devices))
|
||||||
|
for _, device := range devices {
|
||||||
|
snapshot := service.snapshots[device]
|
||||||
|
if snapshot == nil || snapshot.LastSeen.IsZero() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fields := make(map[string]any, len(snapshot.Fields)+2)
|
||||||
|
for key, value := range snapshot.Fields {
|
||||||
|
fields[key] = value
|
||||||
|
}
|
||||||
|
if !snapshot.StateSeen.IsZero() {
|
||||||
|
fields["state_last_seen_unix"] = snapshot.StateSeen.Unix()
|
||||||
|
}
|
||||||
|
if !snapshot.SensorSeen.IsZero() {
|
||||||
|
fields["sensor_last_seen_unix"] = snapshot.SensorSeen.Unix()
|
||||||
|
}
|
||||||
|
|
||||||
|
records = append(records, model.Record{
|
||||||
|
Measurement: "tasmota_device_snapshot",
|
||||||
|
Tags: map[string]string{
|
||||||
|
"device": device,
|
||||||
|
"source": "derived",
|
||||||
|
},
|
||||||
|
Fields: fields,
|
||||||
|
Timestamp: snapshot.LastSeen,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return records
|
||||||
|
}
|
||||||
|
|
||||||
|
func (service *Service) clearDirtyDevices() {
|
||||||
|
for device := range service.dirtyDevices {
|
||||||
|
delete(service.dirtyDevices, device)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func normalizeDeviceAliases(aliases map[string]string) map[string]string {
|
func normalizeDeviceAliases(aliases map[string]string) map[string]string {
|
||||||
if len(aliases) == 0 {
|
if len(aliases) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -55,6 +55,17 @@ func (writer *fakeWriter) allBatches() [][]model.Record {
|
|||||||
return copyBatches
|
return copyBatches
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func findRecord(records []model.Record, measurement string, device string) *model.Record {
|
||||||
|
for index := range records {
|
||||||
|
record := &records[index]
|
||||||
|
if record.Measurement == measurement && record.Tags["device"] == device {
|
||||||
|
return record
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestServiceFlushesParsedRecords(t *testing.T) {
|
func TestServiceFlushesParsedRecords(t *testing.T) {
|
||||||
fake := newFakeWriter()
|
fake := newFakeWriter()
|
||||||
service := NewService(config.Config{
|
service := NewService(config.Config{
|
||||||
@@ -118,8 +129,8 @@ func TestServiceFlushesParsedRecords(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
batch := batches[1]
|
batch := batches[1]
|
||||||
if len(batch) != 2 {
|
if len(batch) != 4 {
|
||||||
t.Fatalf("expected 2 records in flushed batch, got %d", len(batch))
|
t.Fatalf("expected 4 records in flushed batch, got %d", len(batch))
|
||||||
}
|
}
|
||||||
|
|
||||||
if batch[0].Measurement != "tasmota_lwt" {
|
if batch[0].Measurement != "tasmota_lwt" {
|
||||||
@@ -142,6 +153,34 @@ func TestServiceFlushesParsedRecords(t *testing.T) {
|
|||||||
t.Fatalf("unexpected sensor device_alias field: %#v", batch[1].Fields["device_alias"])
|
t.Fatalf("unexpected sensor device_alias field: %#v", batch[1].Fields["device_alias"])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
garageSnapshot := findRecord(batch, "tasmota_device_snapshot", "tasmota_896001")
|
||||||
|
if garageSnapshot == nil {
|
||||||
|
t.Fatal("expected garage snapshot record in flushed batch")
|
||||||
|
}
|
||||||
|
if garageSnapshot.Fields["device_alias"] != "Garage Plug" {
|
||||||
|
t.Fatalf("unexpected garage snapshot alias: %#v", garageSnapshot.Fields["device_alias"])
|
||||||
|
}
|
||||||
|
if _, ok := garageSnapshot.Fields["sensor_last_seen_unix"]; ok {
|
||||||
|
t.Fatalf("did not expect sensor timestamp on lwt-only snapshot: %#v", garageSnapshot.Fields["sensor_last_seen_unix"])
|
||||||
|
}
|
||||||
|
|
||||||
|
officeSnapshot := findRecord(batch, "tasmota_device_snapshot", "tasmota_c88994")
|
||||||
|
if officeSnapshot == nil {
|
||||||
|
t.Fatal("expected office snapshot record in flushed batch")
|
||||||
|
}
|
||||||
|
if officeSnapshot.Fields["device_alias"] != "Office Plug" {
|
||||||
|
t.Fatalf("unexpected office snapshot alias: %#v", officeSnapshot.Fields["device_alias"])
|
||||||
|
}
|
||||||
|
if officeSnapshot.Fields["energy_total"] != float64(41.385) {
|
||||||
|
t.Fatalf("unexpected office snapshot energy_total: %#v", officeSnapshot.Fields["energy_total"])
|
||||||
|
}
|
||||||
|
if officeSnapshot.Fields["energy_power"] != float64(1) {
|
||||||
|
t.Fatalf("unexpected office snapshot energy_power: %#v", officeSnapshot.Fields["energy_power"])
|
||||||
|
}
|
||||||
|
if officeSnapshot.Fields["sensor_last_seen_unix"] != time.Date(2026, time.March, 12, 16, 23, 13, 0, time.UTC).Unix() {
|
||||||
|
t.Fatalf("unexpected office snapshot sensor timestamp: %#v", officeSnapshot.Fields["sensor_last_seen_unix"])
|
||||||
|
}
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
select {
|
select {
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
|
|||||||
Reference in New Issue
Block a user