feature/device-aliases #1

Merged
yugi merged 2 commits from feature/device-aliases into master 2026-03-14 23:03:09 +01:00
8 changed files with 340 additions and 19 deletions
+5
View File
@@ -52,6 +52,7 @@ Supported environment variables:
- `MQTT_SCRUBBER_INFLUX_DATABASE` - `MQTT_SCRUBBER_INFLUX_DATABASE`
- `MQTT_SCRUBBER_INFLUX_TOKEN` - `MQTT_SCRUBBER_INFLUX_TOKEN`
- `MQTT_SCRUBBER_INFLUX_PRECISION` - `MQTT_SCRUBBER_INFLUX_PRECISION`
- `MQTT_SCRUBBER_DEVICE_ALIASES` as a JSON object such as `{"kitchen_plug":"Kitchen Plug"}`
- `MQTT_SCRUBBER_APP_BATCH_SIZE` - `MQTT_SCRUBBER_APP_BATCH_SIZE`
- `MQTT_SCRUBBER_APP_BUFFER_SIZE` - `MQTT_SCRUBBER_APP_BUFFER_SIZE`
- `MQTT_SCRUBBER_APP_FLUSH_INTERVAL` - `MQTT_SCRUBBER_APP_FLUSH_INTERVAL`
@@ -61,6 +62,8 @@ Supported environment variables:
`MQTT_SCRUBBER_MQTT_TOPICS` expects a comma-separated list. `MQTT_SCRUBBER_MQTT_TOPICS` expects a comma-separated list.
You can also define optional per-device aliases in config with a top-level `device_aliases` object. Keys are normalized like device tags, so `kitchen-plug`, `Kitchen Plug`, and `kitchen_plug` all resolve to `kitchen_plug`.
## Run ## Run
```bash ```bash
@@ -135,6 +138,8 @@ Relay panels use the latest `power`, `power1`, `power2`, `power3`, and `power4`
The dashboard is split into a fleet summary section and a selected-device section. The selected-device section also includes `Last Seen`, `Seconds Since Last Message`, and `Messages In Range` panels derived from both `tasmota_state` and `tasmota_sensor` timestamps. The dashboard is split into a fleet summary section and a selected-device section. The selected-device section also includes `Last Seen`, `Seconds Since Last Message`, and `Messages In Range` panels derived from both `tasmota_state` and `tasmota_sensor` timestamps.
If `device_aliases` is configured, the summary table will expose an `alias` column populated from the latest state or sensor record for each device.
## Notes ## Notes
- The repo name is kept as `mqqt-scrubber` to match the existing folder. - The repo name is kept as `mqqt-scrubber` to match the existing folder.
+4
View File
@@ -16,6 +16,10 @@
"token": "", "token": "",
"precision": "ns" "precision": "ns"
}, },
"device_aliases": {
"tasmota_c88994": "Office Plug",
"kitchen-plug": "Kitchen Counter Plug"
},
"app": { "app": {
"batch_size": 200, "batch_size": 200,
"buffer_size": 1000, "buffer_size": 1000,
+1
View File
@@ -125,6 +125,7 @@ timestamp: payload Time or message receive time
- `LWT`: - `LWT`:
- `state` as string - `state` as string
- `online` as boolean - `online` as boolean
- all measurements may also include `device_alias` as an optional string when configured externally
- `STATE`: - `STATE`:
- base fields like `uptime`, `uptime_sec`, `heap`, `sleep`, `sleep_mode`, `load_avg`, `mqtt_count` - base fields like `uptime`, `uptime_sec`, `heap`, `sleep`, `sleep_mode`, `load_avg`, `mqtt_count`
- relay state fields like `power`, `power1` through `power4` - relay state fields like `power`, `power1` through `power4`
@@ -419,6 +419,18 @@
} }
] ]
}, },
{
"matcher": {
"id": "byName",
"options": "alias"
},
"properties": [
{
"id": "custom.width",
"value": 180
}
]
},
{ {
"matcher": { "matcher": {
"id": "byName", "id": "byName",
@@ -567,9 +579,9 @@
}, },
"editorMode": "code", "editorMode": "code",
"format": "table", "format": "table",
"query": "SELECT devices.device, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", "query": "SELECT devices.device, coalesce(meta.device_alias, state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON devices.device = meta.device LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device",
"rawQuery": true, "rawQuery": true,
"rawSql": "SELECT devices.device, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT s.device, s.time, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device", "rawSql": "SELECT devices.device, coalesce(meta.device_alias, state.device_alias, sensor.device_alias, '') AS alias, devices.last_seen, to_unixtime(now()) - to_unixtime(devices.last_seen) AS age_s, sensor.energy_power AS current_draw_w, sensor.energy_today AS daily_draw_kwh, sensor.energy_total AS total_draw_kwh, state.wifi_signal AS wifi_signal_dbm, state.uptime_sec / 3600.0 AS uptime_hours FROM (SELECT device, max(time) AS last_seen FROM (SELECT device, time FROM tasmota_state WHERE $__timeFilter(time) UNION ALL SELECT device, time FROM tasmota_sensor WHERE $__timeFilter(time)) all_messages GROUP BY device) devices LEFT JOIN (SELECT m.device, m.device_alias FROM tasmota_device_meta m INNER JOIN (SELECT device, max(time) AS time FROM tasmota_device_meta GROUP BY device) latest_meta ON m.device = latest_meta.device AND m.time = latest_meta.time) meta ON devices.device = meta.device LEFT JOIN (SELECT s.device, s.time, s.device_alias, s.energy_power, s.energy_today, s.energy_total FROM tasmota_sensor s INNER JOIN (SELECT device, max(time) AS time FROM tasmota_sensor WHERE $__timeFilter(time) GROUP BY device) latest_sensor ON s.device = latest_sensor.device AND s.time = latest_sensor.time) sensor ON devices.device = sensor.device LEFT JOIN (SELECT st.device, st.time, st.device_alias, st.wifi_signal, st.uptime_sec FROM tasmota_state st INNER JOIN (SELECT device, max(time) AS time FROM tasmota_state WHERE $__timeFilter(time) GROUP BY device) latest_state ON st.device = latest_state.device AND st.time = latest_state.time) state ON devices.device = state.device ORDER BY devices.device",
"refId": "A" "refId": "A"
} }
], ],
@@ -625,6 +637,6 @@
"timezone": "browser", "timezone": "browser",
"title": "Tasmota Device Summary", "title": "Tasmota Device Summary",
"uid": "tasmota-device-summary", "uid": "tasmota-device-summary",
"version": 6, "version": 7,
"weekStart": "" "weekStart": ""
} }
+52 -3
View File
@@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"os" "os"
"regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -12,10 +13,13 @@ import (
const envPrefix = "MQTT_SCRUBBER_" const envPrefix = "MQTT_SCRUBBER_"
var invalidDeviceCharacters = regexp.MustCompile(`[^a-z0-9_]+`)
type Config struct { type Config struct {
MQTT MQTTConfig `json:"mqtt"` MQTT MQTTConfig `json:"mqtt"`
Influx InfluxConfig `json:"influx"` Influx InfluxConfig `json:"influx"`
App AppConfig `json:"app"` App AppConfig `json:"app"`
DeviceAliases map[string]string `json:"device_aliases"`
} }
type MQTTConfig struct { type MQTTConfig struct {
@@ -80,6 +84,8 @@ func Load(path string) (Config, error) {
return Config{}, err return Config{}, err
} }
cfg.DeviceAliases = normalizeDeviceAliases(cfg.DeviceAliases)
if err := cfg.Validate(); err != nil { if err := cfg.Validate(); err != nil {
return Config{}, err return Config{}, err
} }
@@ -161,6 +167,18 @@ func applyEnvOverrides(cfg *Config) error {
setString(&cfg.App.LogLevel, envPrefix+"APP_LOG_LEVEL") setString(&cfg.App.LogLevel, envPrefix+"APP_LOG_LEVEL")
setString(&cfg.App.HealthAddress, envPrefix+"APP_HEALTH_ADDRESS") setString(&cfg.App.HealthAddress, envPrefix+"APP_HEALTH_ADDRESS")
if raw, ok := os.LookupEnv(envPrefix + "DEVICE_ALIASES"); ok {
if strings.TrimSpace(raw) == "" {
cfg.DeviceAliases = nil
} else {
aliases := make(map[string]string)
if err := json.Unmarshal([]byte(raw), &aliases); err != nil {
return fmt.Errorf("parse %sDEVICE_ALIASES: %w", envPrefix, err)
}
cfg.DeviceAliases = aliases
}
}
if raw, ok := os.LookupEnv(envPrefix + "MQTT_TOPICS"); ok { if raw, ok := os.LookupEnv(envPrefix + "MQTT_TOPICS"); ok {
cfg.MQTT.Topics = splitAndTrim(raw) cfg.MQTT.Topics = splitAndTrim(raw)
} }
@@ -227,3 +245,34 @@ func splitAndTrim(value string) []string {
return result return result
} }
func normalizeDeviceAliases(aliases map[string]string) map[string]string {
if len(aliases) == 0 {
return nil
}
normalized := make(map[string]string, len(aliases))
for device, alias := range aliases {
cleanDevice := normalizeDeviceKey(device)
cleanAlias := strings.TrimSpace(alias)
if cleanDevice == "" || cleanAlias == "" {
continue
}
normalized[cleanDevice] = cleanAlias
}
if len(normalized) == 0 {
return nil
}
return normalized
}
func normalizeDeviceKey(value string) string {
normalized := strings.ToLower(strings.TrimSpace(value))
normalized = strings.ReplaceAll(normalized, "-", "_")
normalized = strings.ReplaceAll(normalized, " ", "_")
normalized = invalidDeviceCharacters.ReplaceAllString(normalized, "_")
normalized = strings.Trim(normalized, "_")
return normalized
}
+107
View File
@@ -0,0 +1,107 @@
package config
import (
"os"
"path/filepath"
"testing"
)
func TestLoadNormalizesDeviceAliases(t *testing.T) {
t.Setenv("MQTT_SCRUBBER_DEVICE_ALIASES", "")
configPath := filepath.Join(t.TempDir(), "config.json")
contents := `{
"mqtt": {
"broker": "tcp://127.0.0.1:1883",
"client_id": "mqqt-scrubber",
"topics": ["tele/+/STATE"],
"qos": 0
},
"influx": {
"url": "http://127.0.0.1:8181",
"database": "home",
"precision": "ns"
},
"app": {
"batch_size": 200,
"buffer_size": 1000,
"flush_interval": "10s",
"flush_timeout": "10s",
"log_level": "info",
"health_address": ":8080"
},
"device_aliases": {
"Kitchen-Plug": "Kitchen Plug",
" Patio Sensor ": "Patio Sensor",
"unused": " "
}
}`
if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil {
t.Fatalf("write config file: %v", err)
}
cfg, err := Load(configPath)
if err != nil {
t.Fatalf("Load returned error: %v", err)
}
if got := cfg.DeviceAliases["kitchen_plug"]; got != "Kitchen Plug" {
t.Fatalf("unexpected kitchen alias: got %q", got)
}
if got := cfg.DeviceAliases["patio_sensor"]; got != "Patio Sensor" {
t.Fatalf("unexpected patio alias: got %q", got)
}
if _, exists := cfg.DeviceAliases["unused"]; exists {
t.Fatal("expected blank aliases to be discarded")
}
}
func TestLoadOverridesDeviceAliasesFromEnv(t *testing.T) {
t.Setenv("MQTT_SCRUBBER_DEVICE_ALIASES", `{"Desk-Plug":"Desk Plug"}`)
configPath := filepath.Join(t.TempDir(), "config.json")
contents := `{
"mqtt": {
"broker": "tcp://127.0.0.1:1883",
"client_id": "mqqt-scrubber",
"topics": ["tele/+/STATE"],
"qos": 0
},
"influx": {
"url": "http://127.0.0.1:8181",
"database": "home",
"precision": "ns"
},
"app": {
"batch_size": 200,
"buffer_size": 1000,
"flush_interval": "10s",
"flush_timeout": "10s",
"log_level": "info",
"health_address": ":8080"
},
"device_aliases": {
"Kitchen-Plug": "Kitchen Plug"
}
}`
if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil {
t.Fatalf("write config file: %v", err)
}
cfg, err := Load(configPath)
if err != nil {
t.Fatalf("Load returned error: %v", err)
}
if len(cfg.DeviceAliases) != 1 {
t.Fatalf("expected env aliases to replace file aliases, got %d entries", len(cfg.DeviceAliases))
}
if got := cfg.DeviceAliases["desk_plug"]; got != "Desk Plug" {
t.Fatalf("unexpected desk alias: got %q", got)
}
}
+111 -12
View File
@@ -3,6 +3,9 @@ package pipeline
import ( import (
"context" "context"
"log/slog" "log/slog"
"regexp"
"sort"
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -11,19 +14,23 @@ import (
"mqqt-scrubber/internal/parser" "mqqt-scrubber/internal/parser"
) )
var invalidDeviceAliasCharacters = regexp.MustCompile(`[^a-z0-9_]+`)
type writer interface { type writer interface {
Write(ctx context.Context, records []model.Record) error Write(ctx context.Context, records []model.Record) error
} }
type Service struct { type Service struct {
config config.Config config config.Config
influxClient writer deviceAliases map[string]string
input chan model.RawMessage aliasRecords []model.Record
received atomic.Uint64 influxClient writer
parsed atomic.Uint64 input chan model.RawMessage
written atomic.Uint64 received atomic.Uint64
dropped atomic.Uint64 parsed atomic.Uint64
failed atomic.Uint64 written atomic.Uint64
dropped atomic.Uint64
failed atomic.Uint64
} }
type Snapshot struct { type Snapshot struct {
@@ -35,10 +42,13 @@ type Snapshot struct {
} }
func NewService(cfg config.Config, influxClient writer) *Service { func NewService(cfg config.Config, influxClient writer) *Service {
normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases)
return &Service{ return &Service{
config: cfg, config: cfg,
influxClient: influxClient, deviceAliases: normalizedAliases,
input: make(chan model.RawMessage, cfg.App.BufferSize), aliasRecords: buildAliasRecords(normalizedAliases),
influxClient: influxClient,
input: make(chan model.RawMessage, cfg.App.BufferSize),
} }
} }
@@ -57,7 +67,18 @@ func (service *Service) Run(ctx context.Context) error {
ticker := time.NewTicker(service.config.App.FlushInterval.Duration) ticker := time.NewTicker(service.config.App.FlushInterval.Duration)
defer ticker.Stop() defer ticker.Stop()
batch := make([]model.Record, 0, service.config.App.BatchSize) batch := append(make([]model.Record, 0, service.config.App.BatchSize+len(service.aliasRecords)), service.aliasRecords...)
if len(batch) > 0 {
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
err := service.flush(flushCtx, batch)
cancel()
if err != nil {
slog.Error("failed to flush alias metadata to influx; will retry on next interval", "count", len(batch), "error", err)
} else {
batch = batch[:0]
}
}
var input <-chan model.RawMessage = service.input var input <-chan model.RawMessage = service.input
for { for {
@@ -86,6 +107,8 @@ func (service *Service) Run(ctx context.Context) error {
continue continue
} }
service.applyDeviceAliases(records)
service.parsed.Add(uint64(len(records))) service.parsed.Add(uint64(len(records)))
batch = append(batch, records...) batch = append(batch, records...)
@@ -150,3 +173,79 @@ func (service *Service) Snapshot() Snapshot {
Failed: service.failed.Load(), Failed: service.failed.Load(),
} }
} }
func (service *Service) applyDeviceAliases(records []model.Record) {
if len(service.deviceAliases) == 0 {
return
}
for index := range records {
device := normalizeDeviceKey(records[index].Tags["device"])
alias, ok := service.deviceAliases[device]
if !ok || alias == "" {
continue
}
records[index].Fields["device_alias"] = alias
}
}
func normalizeDeviceAliases(aliases map[string]string) map[string]string {
if len(aliases) == 0 {
return nil
}
normalized := make(map[string]string, len(aliases))
for device, alias := range aliases {
cleanDevice := normalizeDeviceKey(device)
cleanAlias := strings.TrimSpace(alias)
if cleanDevice == "" || cleanAlias == "" {
continue
}
normalized[cleanDevice] = cleanAlias
}
if len(normalized) == 0 {
return nil
}
return normalized
}
func normalizeDeviceKey(value string) string {
normalized := strings.ToLower(strings.TrimSpace(value))
normalized = strings.ReplaceAll(normalized, "-", "_")
normalized = strings.ReplaceAll(normalized, " ", "_")
normalized = invalidDeviceAliasCharacters.ReplaceAllString(normalized, "_")
normalized = strings.Trim(normalized, "_")
return normalized
}
func buildAliasRecords(aliases map[string]string) []model.Record {
if len(aliases) == 0 {
return nil
}
devices := make([]string, 0, len(aliases))
for device := range aliases {
devices = append(devices, device)
}
sort.Strings(devices)
timestamp := time.Now().UTC()
records := make([]model.Record, 0, len(devices))
for _, device := range devices {
records = append(records, model.Record{
Measurement: "tasmota_device_meta",
Tags: map[string]string{
"device": device,
"source": "config",
},
Fields: map[string]any{
"device_alias": aliases[device],
},
Timestamp: timestamp,
})
}
return records
}
+45 -1
View File
@@ -43,9 +43,25 @@ func (writer *fakeWriter) firstBatch() []model.Record {
return writer.batches[0] return writer.batches[0]
} }
func (writer *fakeWriter) allBatches() [][]model.Record {
writer.mu.Lock()
defer writer.mu.Unlock()
copyBatches := make([][]model.Record, 0, len(writer.batches))
for _, batch := range writer.batches {
copyBatches = append(copyBatches, append([]model.Record(nil), batch...))
}
return copyBatches
}
func TestServiceFlushesParsedRecords(t *testing.T) { func TestServiceFlushesParsedRecords(t *testing.T) {
fake := newFakeWriter() fake := newFakeWriter()
service := NewService(config.Config{ service := NewService(config.Config{
DeviceAliases: map[string]string{
"tasmota-896001": "Garage Plug",
"Tasmota C88994": "Office Plug",
},
App: config.AppConfig{ App: config.AppConfig{
BatchSize: 2, BatchSize: 2,
BufferSize: 8, BufferSize: 8,
@@ -62,6 +78,12 @@ func TestServiceFlushesParsedRecords(t *testing.T) {
errCh <- service.Run(ctx) errCh <- service.Run(ctx)
}() }()
select {
case <-fake.flushed:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for alias metadata flush")
}
service.Enqueue(model.RawMessage{ service.Enqueue(model.RawMessage{
Topic: "tele/tasmota_896001/LWT", Topic: "tele/tasmota_896001/LWT",
Payload: []byte("Online"), Payload: []byte("Online"),
@@ -79,7 +101,23 @@ func TestServiceFlushesParsedRecords(t *testing.T) {
t.Fatal("timed out waiting for pipeline flush") t.Fatal("timed out waiting for pipeline flush")
} }
batch := fake.firstBatch() batches := fake.allBatches()
if len(batches) != 2 {
t.Fatalf("expected 2 flushed batches, got %d", len(batches))
}
metaBatch := batches[0]
if len(metaBatch) != 2 {
t.Fatalf("expected 2 alias metadata records, got %d", len(metaBatch))
}
if metaBatch[0].Measurement != "tasmota_device_meta" {
t.Fatalf("unexpected metadata measurement: %s", metaBatch[0].Measurement)
}
if metaBatch[0].Fields["device_alias"] == "" {
t.Fatalf("expected metadata batch to include device_alias, got %#v", metaBatch[0].Fields["device_alias"])
}
batch := batches[1]
if len(batch) != 2 { if len(batch) != 2 {
t.Fatalf("expected 2 records in flushed batch, got %d", len(batch)) t.Fatalf("expected 2 records in flushed batch, got %d", len(batch))
} }
@@ -90,6 +128,9 @@ func TestServiceFlushesParsedRecords(t *testing.T) {
if batch[0].Fields["online"] != true { if batch[0].Fields["online"] != true {
t.Fatalf("unexpected lwt online field: %#v", batch[0].Fields["online"]) t.Fatalf("unexpected lwt online field: %#v", batch[0].Fields["online"])
} }
if batch[0].Fields["device_alias"] != "Garage Plug" {
t.Fatalf("unexpected lwt device_alias field: %#v", batch[0].Fields["device_alias"])
}
if batch[1].Measurement != "tasmota_sensor" { if batch[1].Measurement != "tasmota_sensor" {
t.Fatalf("unexpected second measurement: %s", batch[1].Measurement) t.Fatalf("unexpected second measurement: %s", batch[1].Measurement)
@@ -97,6 +138,9 @@ func TestServiceFlushesParsedRecords(t *testing.T) {
if batch[1].Fields["energy_total"] != float64(41.385) { if batch[1].Fields["energy_total"] != float64(41.385) {
t.Fatalf("unexpected sensor energy_total field: %#v", batch[1].Fields["energy_total"]) t.Fatalf("unexpected sensor energy_total field: %#v", batch[1].Fields["energy_total"])
} }
if batch[1].Fields["device_alias"] != "Office Plug" {
t.Fatalf("unexpected sensor device_alias field: %#v", batch[1].Fields["device_alias"])
}
cancel() cancel()
select { select {