diff --git a/docs/grafana/tasmota-device-summary-dashboard.json b/docs/grafana/tasmota-device-summary-dashboard.json index 93290fb..3486665 100644 --- a/docs/grafana/tasmota-device-summary-dashboard.json +++ b/docs/grafana/tasmota-device-summary-dashboard.json @@ -15,7 +15,7 @@ } ] }, - "description": "Fleet overview of known Tasmota devices with drilldown links into a dedicated per-device dashboard.", + "description": "Fleet overview of Tasmota devices using the latest derived per-device snapshot, with drilldown links into a dedicated per-device dashboard.", "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, @@ -89,13 +89,13 @@ }, "editorMode": "code", "format": "table", - "query": "SELECT count(DISTINCT device) AS total_devices FROM (SELECT device FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device FROM tasmota_sensor WHERE $__timeFilter(time))", + "query": "SELECT count(*) AS total_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time)", "rawQuery": true, - "rawSql": "SELECT count(DISTINCT device) AS total_devices FROM (SELECT device FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device FROM tasmota_sensor WHERE $__timeFilter(time))", + "rawSql": "SELECT count(*) AS total_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time)", "refId": "A" } ], - "title": "Total Devices", + "title": "Tracked Devices", "type": "stat" }, { @@ -152,13 +152,13 @@ }, "editorMode": "code", "format": "table", - "query": "SELECT count(DISTINCT device) AS power_devices FROM tasmota_sensor WHERE $__timeFilter(time) AND (energy_power IS NOT NULL OR energy_today IS NOT NULL OR energy_total IS NOT NULL)", + "query": "SELECT count(*) AS power_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time WHERE s.energy_power IS NOT NULL OR s.energy_today IS NOT NULL OR s.energy_total IS NOT NULL)", "rawQuery": true, - "rawSql": "SELECT count(DISTINCT device) AS power_devices FROM tasmota_sensor WHERE $__timeFilter(time) AND (energy_power IS NOT NULL OR energy_today IS NOT NULL OR energy_total IS NOT NULL)", + "rawSql": "SELECT count(*) AS power_devices FROM (SELECT s.device FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time WHERE s.energy_power IS NOT NULL OR s.energy_today IS NOT NULL OR s.energy_total IS NOT NULL)", "refId": "A" } ], - "title": "Devices With Energy Metrics", + "title": "Energy Devices", "type": "stat" }, { @@ -223,9 +223,9 @@ }, "editorMode": "code", "format": "table", - "query": "SELECT coalesce(sum(sensor.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor", + "query": "SELECT coalesce(sum(snapshots.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots", "rawQuery": true, - "rawSql": "SELECT coalesce(sum(sensor.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor", + "rawSql": "SELECT coalesce(sum(snapshots.energy_power), 0) AS fleet_current_draw_w FROM (SELECT s.device, s.energy_power FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots", "refId": "A" } ], @@ -286,9 +286,9 @@ }, "editorMode": "code", "format": "table", - "query": "SELECT coalesce(sum(sensor.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor", + "query": "SELECT coalesce(sum(snapshots.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots", "rawQuery": true, - "rawSql": "SELECT coalesce(sum(sensor.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor", + "rawSql": "SELECT coalesce(sum(snapshots.energy_today), 0) AS fleet_daily_draw_kwh FROM (SELECT s.device, s.energy_today FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots", "refId": "A" } ], @@ -349,9 +349,9 @@ }, "editorMode": "code", "format": "table", - "query": "SELECT coalesce(sum(sensor.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor", + "query": "SELECT coalesce(sum(snapshots.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots", "rawQuery": true, - "rawSql": "SELECT coalesce(sum(sensor.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) sensor", + "rawSql": "SELECT coalesce(sum(snapshots.energy_total), 0) AS fleet_total_draw_kwh FROM (SELECT s.device, s.energy_total FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots", "refId": "A" } ], @@ -579,9 +579,9 @@ }, "editorMode": "code", "format": "table", - "query": "SELECT devices.device, coalesce(meta.device_alias, state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON devices.device = meta.device LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", + "query": "SELECT snapshots.device, coalesce(meta.device_alias, snapshots.device_alias, '') AS alias, snapshots.last_seen, to_unixtime(now()) - to_unixtime(snapshots.last_seen) AS age_s, snapshots.energy_power AS current_draw_w, snapshots.energy_today AS daily_draw_kwh, snapshots.energy_total AS total_draw_kwh, snapshots.wifi_signal AS wifi_signal_dbm, snapshots.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT s.device, s.time AS last_seen, s.device_alias, s.energy_power, s.energy_today, s.energy_total, s.wifi_signal, s.uptime_sec FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON snapshots.device = meta.device ORDER BY snapshots.device", "rawQuery": true, - "rawSql": "SELECT devices.device, coalesce(meta.device_alias, state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON devices.device = meta.device LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", + "rawSql": "SELECT snapshots.device, coalesce(meta.device_alias, snapshots.device_alias, '') AS alias, snapshots.last_seen, to_unixtime(now()) - to_unixtime(snapshots.last_seen) AS age_s, snapshots.energy_power AS current_draw_w, snapshots.energy_today AS daily_draw_kwh, snapshots.energy_total AS total_draw_kwh, snapshots.wifi_signal AS wifi_signal_dbm, snapshots.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT s.device, s.time AS last_seen, s.device_alias, s.energy_power, s.energy_today, s.energy_total, s.wifi_signal, s.uptime_sec FROM tasmota_device_snapshot s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_snapshot GROUP BY device) latest ON s.device = latest.device AND s.time = latest.time) snapshots LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON snapshots.device = meta.device ORDER BY snapshots.device", "refId": "A" } ], @@ -608,6 +608,6 @@ "timezone": "browser", "title": "Tasmota Device Summary", "uid": "tasmota-device-summary", - "version": 8, + "version": 9, "weekStart": "" } \ No newline at end of file diff --git a/internal/pipeline/service.go b/internal/pipeline/service.go index 1cde42d..d9493db 100644 --- a/internal/pipeline/service.go +++ b/internal/pipeline/service.go @@ -24,6 +24,8 @@ type Service struct { config config.Config deviceAliases map[string]string aliasRecords []model.Record + snapshots map[string]*deviceSnapshot + dirtyDevices map[string]struct{} influxClient writer input chan model.RawMessage received atomic.Uint64 @@ -41,12 +43,23 @@ type Snapshot struct { Failed uint64 `json:"failed"` } +type deviceSnapshot struct { + Device string + Alias string + LastSeen time.Time + StateSeen time.Time + SensorSeen time.Time + Fields map[string]any +} + func NewService(cfg config.Config, influxClient writer) *Service { normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases) return &Service{ config: cfg, deviceAliases: normalizedAliases, aliasRecords: buildAliasRecords(normalizedAliases), + snapshots: make(map[string]*deviceSnapshot), + dirtyDevices: make(map[string]struct{}), influxClient: influxClient, input: make(chan model.RawMessage, cfg.App.BufferSize), } @@ -91,7 +104,7 @@ func (service *Service) Run(ctx context.Context) error { select { case <-ctx.Done(): flushCtx, cancel := context.WithTimeout(context.Background(), service.config.App.FlushTimeout.Duration) - err := service.flush(flushCtx, batch) + err := service.flushPending(flushCtx, batch) cancel() if err != nil { return err @@ -108,13 +121,14 @@ func (service *Service) Run(ctx context.Context) error { } service.applyDeviceAliases(records) + service.updateDeviceSnapshots(records) service.parsed.Add(uint64(len(records))) batch = append(batch, records...) if len(batch) >= service.config.App.BatchSize { flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration) - err := service.flush(flushCtx, batch) + err := service.flushPending(flushCtx, batch) cancel() if err != nil { slog.Error("failed to flush full batch to influx; keeping batch in memory", "count", len(batch), "error", err) @@ -123,12 +137,12 @@ func (service *Service) Run(ctx context.Context) error { batch = batch[:0] } case <-ticker.C: - if len(batch) == 0 { + if len(batch) == 0 && len(service.dirtyDevices) == 0 { continue } flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration) - err := service.flush(flushCtx, batch) + err := service.flushPending(flushCtx, batch) cancel() if err != nil { slog.Error("failed to flush batch to influx; will retry on next interval", "count", len(batch), "error", err) @@ -139,6 +153,18 @@ func (service *Service) Run(ctx context.Context) error { } } +func (service *Service) flushPending(ctx context.Context, batch []model.Record) error { + flushBatch := append([]model.Record(nil), batch...) + flushBatch = append(flushBatch, service.buildSnapshotRecords()...) + + if err := service.flush(ctx, flushBatch); err != nil { + return err + } + + service.clearDirtyDevices() + return nil +} + func (service *Service) flush(ctx context.Context, batch []model.Record) error { if len(batch) == 0 { return nil @@ -189,6 +215,107 @@ func (service *Service) applyDeviceAliases(records []model.Record) { } } +func (service *Service) updateDeviceSnapshots(records []model.Record) { + for _, record := range records { + device := normalizeDeviceKey(record.Tags["device"]) + if device == "" { + continue + } + + snapshot, ok := service.snapshots[device] + if !ok { + snapshot = &deviceSnapshot{ + Device: device, + Fields: make(map[string]any), + } + service.snapshots[device] = snapshot + } + + if record.Timestamp.After(snapshot.LastSeen) { + snapshot.LastSeen = record.Timestamp + } + + if alias, ok := record.Fields["device_alias"].(string); ok && strings.TrimSpace(alias) != "" { + snapshot.Alias = alias + snapshot.Fields["device_alias"] = alias + } + + switch record.Tags["message_type"] { + case "state": + if record.Timestamp.After(snapshot.StateSeen) { + snapshot.StateSeen = record.Timestamp + copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "wifi_signal", "uptime_sec") + } + case "sensor": + if record.Timestamp.After(snapshot.SensorSeen) { + snapshot.SensorSeen = record.Timestamp + copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "energy_power", "energy_today", "energy_total") + } + } + + service.dirtyDevices[device] = struct{}{} + } +} + +func copySnapshotFields(target map[string]any, source map[string]any, keys ...string) { + for _, key := range keys { + value, ok := source[key] + if !ok { + continue + } + target[key] = value + } +} + +func (service *Service) buildSnapshotRecords() []model.Record { + if len(service.dirtyDevices) == 0 { + return nil + } + + devices := make([]string, 0, len(service.dirtyDevices)) + for device := range service.dirtyDevices { + devices = append(devices, device) + } + sort.Strings(devices) + + records := make([]model.Record, 0, len(devices)) + for _, device := range devices { + snapshot := service.snapshots[device] + if snapshot == nil || snapshot.LastSeen.IsZero() { + continue + } + + fields := make(map[string]any, len(snapshot.Fields)+2) + for key, value := range snapshot.Fields { + fields[key] = value + } + if !snapshot.StateSeen.IsZero() { + fields["state_last_seen_unix"] = snapshot.StateSeen.Unix() + } + if !snapshot.SensorSeen.IsZero() { + fields["sensor_last_seen_unix"] = snapshot.SensorSeen.Unix() + } + + records = append(records, model.Record{ + Measurement: "tasmota_device_snapshot", + Tags: map[string]string{ + "device": device, + "source": "derived", + }, + Fields: fields, + Timestamp: snapshot.LastSeen, + }) + } + + return records +} + +func (service *Service) clearDirtyDevices() { + for device := range service.dirtyDevices { + delete(service.dirtyDevices, device) + } +} + func normalizeDeviceAliases(aliases map[string]string) map[string]string { if len(aliases) == 0 { return nil diff --git a/internal/pipeline/service_test.go b/internal/pipeline/service_test.go index 3d2bfa0..195a208 100644 --- a/internal/pipeline/service_test.go +++ b/internal/pipeline/service_test.go @@ -55,6 +55,17 @@ func (writer *fakeWriter) allBatches() [][]model.Record { return copyBatches } +func findRecord(records []model.Record, measurement string, device string) *model.Record { + for index := range records { + record := &records[index] + if record.Measurement == measurement && record.Tags["device"] == device { + return record + } + } + + return nil +} + func TestServiceFlushesParsedRecords(t *testing.T) { fake := newFakeWriter() service := NewService(config.Config{ @@ -118,8 +129,8 @@ func TestServiceFlushesParsedRecords(t *testing.T) { } batch := batches[1] - if len(batch) != 2 { - t.Fatalf("expected 2 records in flushed batch, got %d", len(batch)) + if len(batch) != 4 { + t.Fatalf("expected 4 records in flushed batch, got %d", len(batch)) } if batch[0].Measurement != "tasmota_lwt" { @@ -142,6 +153,34 @@ func TestServiceFlushesParsedRecords(t *testing.T) { t.Fatalf("unexpected sensor device_alias field: %#v", batch[1].Fields["device_alias"]) } + garageSnapshot := findRecord(batch, "tasmota_device_snapshot", "tasmota_896001") + if garageSnapshot == nil { + t.Fatal("expected garage snapshot record in flushed batch") + } + if garageSnapshot.Fields["device_alias"] != "Garage Plug" { + t.Fatalf("unexpected garage snapshot alias: %#v", garageSnapshot.Fields["device_alias"]) + } + if _, ok := garageSnapshot.Fields["sensor_last_seen_unix"]; ok { + t.Fatalf("did not expect sensor timestamp on lwt-only snapshot: %#v", garageSnapshot.Fields["sensor_last_seen_unix"]) + } + + officeSnapshot := findRecord(batch, "tasmota_device_snapshot", "tasmota_c88994") + if officeSnapshot == nil { + t.Fatal("expected office snapshot record in flushed batch") + } + if officeSnapshot.Fields["device_alias"] != "Office Plug" { + t.Fatalf("unexpected office snapshot alias: %#v", officeSnapshot.Fields["device_alias"]) + } + if officeSnapshot.Fields["energy_total"] != float64(41.385) { + t.Fatalf("unexpected office snapshot energy_total: %#v", officeSnapshot.Fields["energy_total"]) + } + if officeSnapshot.Fields["energy_power"] != float64(1) { + t.Fatalf("unexpected office snapshot energy_power: %#v", officeSnapshot.Fields["energy_power"]) + } + if officeSnapshot.Fields["sensor_last_seen_unix"] != time.Date(2026, time.March, 12, 16, 23, 13, 0, time.UTC).Unix() { + t.Fatalf("unexpected office snapshot sensor timestamp: %#v", officeSnapshot.Fields["sensor_last_seen_unix"]) + } + cancel() select { case err := <-errCh: