2 Commits

Author SHA1 Message Date
yugi 32b508ac55 Embed timezone data in runtime binary 2026-03-16 12:21:25 +01:00
yugi 585378297c Add Tasmota timezone handling and dashboard updates 2026-03-16 12:17:40 +01:00
11 changed files with 1224 additions and 1027 deletions
+3
View File
@@ -59,9 +59,12 @@ Supported environment variables:
- `MQTT_SCRUBBER_APP_FLUSH_TIMEOUT` - `MQTT_SCRUBBER_APP_FLUSH_TIMEOUT`
- `MQTT_SCRUBBER_APP_LOG_LEVEL` - `MQTT_SCRUBBER_APP_LOG_LEVEL`
- `MQTT_SCRUBBER_APP_HEALTH_ADDRESS` - `MQTT_SCRUBBER_APP_HEALTH_ADDRESS`
- `MQTT_SCRUBBER_APP_TASMOTA_TIME_ZONE`
`MQTT_SCRUBBER_MQTT_TOPICS` expects a comma-separated list. `MQTT_SCRUBBER_MQTT_TOPICS` expects a comma-separated list.
Timezone-less Tasmota `Time` values are interpreted in `tasmota_time_zone`. Set it to your device timezone such as `Europe/Prague` so the stored timestamp matches the device-local clock. The default is `UTC` for deterministic behavior.
You can also define optional per-device aliases in config with a top-level `device_aliases` object. Keys are normalized like device tags, so `kitchen-plug`, `Kitchen Plug`, and `kitchen_plug` all resolve to `kitchen_plug`. You can also define optional per-device aliases in config with a top-level `device_aliases` object. Keys are normalized like device tags, so `kitchen-plug`, `Kitchen Plug`, and `kitchen_plug` all resolve to `kitchen_plug`.
## Run ## Run
+1
View File
@@ -8,6 +8,7 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
"time" "time"
_ "time/tzdata"
"mqqt-scrubber/internal/config" "mqqt-scrubber/internal/config"
"mqqt-scrubber/internal/health" "mqqt-scrubber/internal/health"
+2 -1
View File
@@ -26,6 +26,7 @@
"flush_interval": "10s", "flush_interval": "10s",
"flush_timeout": "10s", "flush_timeout": "10s",
"log_level": "info", "log_level": "info",
"health_address": ":8080" "health_address": ":8080",
"tasmota_time_zone": "Europe/Prague"
} }
} }
+1
View File
@@ -8,6 +8,7 @@ services:
- ./config.json:/app/config.json:ro - ./config.json:/app/config.json:ro
environment: environment:
MQTT_SCRUBBER_APP_HEALTH_ADDRESS: ":8080" MQTT_SCRUBBER_APP_HEALTH_ADDRESS: ":8080"
MQTT_SCRUBBER_APP_TASMOTA_TIME_ZONE: "Europe/Prague"
networks: networks:
- iot-network - iot-network
healthcheck: healthcheck:
+1 -1
View File
@@ -140,7 +140,7 @@ timestamp: payload Time or message receive time
- prefer payload `Time` when present - prefer payload `Time` when present
- accept RFC3339 and timezone-less Tasmota timestamps in the form `2006-01-02T15:04:05` - accept RFC3339 and timezone-less Tasmota timestamps in the form `2006-01-02T15:04:05`
- current implementation interprets timezone-less timestamps as UTC - interpret timezone-less timestamps in the configured `tasmota_time_zone` location and convert them to UTC for storage
## Failure handling ## Failure handling
File diff suppressed because it is too large Load Diff
+44 -12
View File
@@ -39,12 +39,13 @@ type InfluxConfig struct {
} }
type AppConfig struct { type AppConfig struct {
BatchSize int `json:"batch_size"` BatchSize int `json:"batch_size"`
BufferSize int `json:"buffer_size"` BufferSize int `json:"buffer_size"`
FlushInterval DurationValue `json:"flush_interval"` FlushInterval DurationValue `json:"flush_interval"`
FlushTimeout DurationValue `json:"flush_timeout"` FlushTimeout DurationValue `json:"flush_timeout"`
LogLevel string `json:"log_level"` LogLevel string `json:"log_level"`
HealthAddress string `json:"health_address"` HealthAddress string `json:"health_address"`
TasmotaTimeZone string `json:"tasmota_time_zone"`
} }
type DurationValue struct { type DurationValue struct {
@@ -124,6 +125,9 @@ func (cfg Config) Validate() error {
if cfg.App.FlushTimeout.Duration <= 0 { if cfg.App.FlushTimeout.Duration <= 0 {
return errors.New("app flush_timeout must be greater than zero") return errors.New("app flush_timeout must be greater than zero")
} }
if _, err := loadConfiguredLocation(cfg.App.TasmotaTimeZone); err != nil {
return err
}
return nil return nil
} }
@@ -145,12 +149,13 @@ func defaultConfig() Config {
Precision: "ns", Precision: "ns",
}, },
App: AppConfig{ App: AppConfig{
BatchSize: 200, BatchSize: 200,
BufferSize: 1000, BufferSize: 1000,
FlushInterval: DurationValue{Duration: 10 * time.Second}, FlushInterval: DurationValue{Duration: 10 * time.Second},
FlushTimeout: DurationValue{Duration: 10 * time.Second}, FlushTimeout: DurationValue{Duration: 10 * time.Second},
LogLevel: "info", LogLevel: "info",
HealthAddress: ":8080", HealthAddress: ":8080",
TasmotaTimeZone: "UTC",
}, },
} }
} }
@@ -166,6 +171,7 @@ func applyEnvOverrides(cfg *Config) error {
setString(&cfg.Influx.Precision, envPrefix+"INFLUX_PRECISION") setString(&cfg.Influx.Precision, envPrefix+"INFLUX_PRECISION")
setString(&cfg.App.LogLevel, envPrefix+"APP_LOG_LEVEL") setString(&cfg.App.LogLevel, envPrefix+"APP_LOG_LEVEL")
setString(&cfg.App.HealthAddress, envPrefix+"APP_HEALTH_ADDRESS") setString(&cfg.App.HealthAddress, envPrefix+"APP_HEALTH_ADDRESS")
setString(&cfg.App.TasmotaTimeZone, envPrefix+"APP_TASMOTA_TIME_ZONE")
if raw, ok := os.LookupEnv(envPrefix + "DEVICE_ALIASES"); ok { if raw, ok := os.LookupEnv(envPrefix + "DEVICE_ALIASES"); ok {
if strings.TrimSpace(raw) == "" { if strings.TrimSpace(raw) == "" {
@@ -276,3 +282,29 @@ func normalizeDeviceKey(value string) string {
normalized = strings.Trim(normalized, "_") normalized = strings.Trim(normalized, "_")
return normalized return normalized
} }
func (cfg AppConfig) TasmotaLocation() *time.Location {
location, err := loadConfiguredLocation(cfg.TasmotaTimeZone)
if err != nil {
return time.UTC
}
return location
}
func loadConfiguredLocation(name string) (*time.Location, error) {
trimmed := strings.TrimSpace(name)
if trimmed == "" || strings.EqualFold(trimmed, "utc") {
return time.UTC, nil
}
if strings.EqualFold(trimmed, "local") {
return time.Local, nil
}
location, err := time.LoadLocation(trimmed)
if err != nil {
return nil, fmt.Errorf("app tasmota_time_zone %q is invalid: %w", trimmed, err)
}
return location, nil
}
+73 -2
View File
@@ -7,8 +7,6 @@ import (
) )
func TestLoadNormalizesDeviceAliases(t *testing.T) { func TestLoadNormalizesDeviceAliases(t *testing.T) {
t.Setenv("MQTT_SCRUBBER_DEVICE_ALIASES", "")
configPath := filepath.Join(t.TempDir(), "config.json") configPath := filepath.Join(t.TempDir(), "config.json")
contents := `{ contents := `{
"mqtt": { "mqtt": {
@@ -105,3 +103,76 @@ func TestLoadOverridesDeviceAliasesFromEnv(t *testing.T) {
t.Fatalf("unexpected desk alias: got %q", got) t.Fatalf("unexpected desk alias: got %q", got)
} }
} }
func TestLoadSupportsTasmotaTimeZone(t *testing.T) {
configPath := filepath.Join(t.TempDir(), "config.json")
contents := `{
"mqtt": {
"broker": "tcp://127.0.0.1:1883",
"client_id": "mqqt-scrubber",
"topics": ["tele/+/STATE"],
"qos": 0
},
"influx": {
"url": "http://127.0.0.1:8181",
"database": "home",
"precision": "ns"
},
"app": {
"batch_size": 200,
"buffer_size": 1000,
"flush_interval": "10s",
"flush_timeout": "10s",
"log_level": "info",
"health_address": ":8080",
"tasmota_time_zone": "Europe/Prague"
}
}`
if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil {
t.Fatalf("write config file: %v", err)
}
cfg, err := Load(configPath)
if err != nil {
t.Fatalf("Load returned error: %v", err)
}
if got := cfg.App.TasmotaLocation().String(); got != "Europe/Prague" {
t.Fatalf("unexpected tasmota timezone: got %q", got)
}
}
func TestLoadRejectsInvalidTasmotaTimeZone(t *testing.T) {
configPath := filepath.Join(t.TempDir(), "config.json")
contents := `{
"mqtt": {
"broker": "tcp://127.0.0.1:1883",
"client_id": "mqqt-scrubber",
"topics": ["tele/+/STATE"],
"qos": 0
},
"influx": {
"url": "http://127.0.0.1:8181",
"database": "home",
"precision": "ns"
},
"app": {
"batch_size": 200,
"buffer_size": 1000,
"flush_interval": "10s",
"flush_timeout": "10s",
"log_level": "info",
"health_address": ":8080",
"tasmota_time_zone": "Not/AZone"
}
}`
if err := os.WriteFile(configPath, []byte(contents), 0o644); err != nil {
t.Fatalf("write config file: %v", err)
}
if _, err := Load(configPath); err == nil {
t.Fatal("expected Load to reject invalid tasmota_time_zone")
}
}
+15 -4
View File
@@ -22,6 +22,14 @@ var tasmotaTimeLayouts = []string{
} }
func ParseTasmota(message model.RawMessage) ([]model.Record, error) { func ParseTasmota(message model.RawMessage) ([]model.Record, error) {
return ParseTasmotaInLocation(message, time.UTC)
}
func ParseTasmotaInLocation(message model.RawMessage, timezoneLessTimeLocation *time.Location) ([]model.Record, error) {
if timezoneLessTimeLocation == nil {
timezoneLessTimeLocation = time.UTC
}
parts := strings.Split(message.Topic, "/") parts := strings.Split(message.Topic, "/")
if len(parts) != 3 { if len(parts) != 3 {
return nil, fmt.Errorf("unsupported topic shape: %s", message.Topic) return nil, fmt.Errorf("unsupported topic shape: %s", message.Topic)
@@ -52,7 +60,7 @@ func ParseTasmota(message model.RawMessage) ([]model.Record, error) {
return nil, fmt.Errorf("no usable fields in payload for topic %s", message.Topic) return nil, fmt.Errorf("no usable fields in payload for topic %s", message.Topic)
} }
timestamp := parsePayloadTimestamp(payload, message.ReceivedAt) timestamp := parsePayloadTimestamp(payload, message.ReceivedAt, timezoneLessTimeLocation)
record := model.Record{ record := model.Record{
Measurement: measurement, Measurement: measurement,
@@ -77,11 +85,14 @@ func parseLWT(message model.RawMessage, measurement string, tags map[string]stri
} }
} }
func parsePayloadTimestamp(payload map[string]any, fallback time.Time) time.Time { func parsePayloadTimestamp(payload map[string]any, fallback time.Time, timezoneLessTimeLocation *time.Location) time.Time {
rawTime, ok := payload["Time"].(string) rawTime, ok := payload["Time"].(string)
if !ok || strings.TrimSpace(rawTime) == "" { if !ok || strings.TrimSpace(rawTime) == "" {
return fallback return fallback
} }
if timezoneLessTimeLocation == nil {
timezoneLessTimeLocation = time.UTC
}
for _, layout := range tasmotaTimeLayouts { for _, layout := range tasmotaTimeLayouts {
var ( var (
@@ -90,13 +101,13 @@ func parsePayloadTimestamp(payload map[string]any, fallback time.Time) time.Time
) )
if layout == "2006-01-02T15:04:05" { if layout == "2006-01-02T15:04:05" {
parsed, err = time.ParseInLocation(layout, rawTime, time.UTC) parsed, err = time.ParseInLocation(layout, rawTime, timezoneLessTimeLocation)
} else { } else {
parsed, err = time.Parse(layout, rawTime) parsed, err = time.Parse(layout, rawTime)
} }
if err == nil { if err == nil {
return parsed return parsed.UTC()
} }
} }
+23
View File
@@ -85,6 +85,29 @@ func TestParseTasmotaFixtures(t *testing.T) {
} }
} }
func TestParseTasmotaInLocationUsesConfiguredTimezoneForTimezoneLessTime(t *testing.T) {
location := time.FixedZone("CET", 3600)
receivedAt := time.Date(2026, time.March, 16, 11, 55, 30, 0, time.UTC)
records, err := ParseTasmotaInLocation(model.RawMessage{
Topic: "tele/tasmota_67850B/SENSOR",
Payload: []byte(`{"Time":"2026-03-16T11:55:29","ENERGY":{"Power":9}}`),
ReceivedAt: receivedAt,
}, location)
if err != nil {
t.Fatalf("ParseTasmotaInLocation returned error: %v", err)
}
if len(records) != 1 {
t.Fatalf("expected 1 record, got %d", len(records))
}
want := time.Date(2026, time.March, 16, 10, 55, 29, 0, time.UTC)
if !records[0].Timestamp.Equal(want) {
t.Fatalf("unexpected timestamp: got %s want %s", records[0].Timestamp.Format(time.RFC3339), want.Format(time.RFC3339))
}
}
func fieldEquals(got any, want any) bool { func fieldEquals(got any, want any) bool {
switch typedWant := want.(type) { switch typedWant := want.(type) {
case float64: case float64:
+22 -20
View File
@@ -21,18 +21,19 @@ type writer interface {
} }
type Service struct { type Service struct {
config config.Config config config.Config
deviceAliases map[string]string tasmotaTimeLocation *time.Location
aliasRecords []model.Record deviceAliases map[string]string
snapshots map[string]*deviceSnapshot aliasRecords []model.Record
dirtyDevices map[string]struct{} snapshots map[string]*deviceSnapshot
influxClient writer dirtyDevices map[string]struct{}
input chan model.RawMessage influxClient writer
received atomic.Uint64 input chan model.RawMessage
parsed atomic.Uint64 received atomic.Uint64
written atomic.Uint64 parsed atomic.Uint64
dropped atomic.Uint64 written atomic.Uint64
failed atomic.Uint64 dropped atomic.Uint64
failed atomic.Uint64
} }
type Snapshot struct { type Snapshot struct {
@@ -55,13 +56,14 @@ type deviceSnapshot struct {
func NewService(cfg config.Config, influxClient writer) *Service { func NewService(cfg config.Config, influxClient writer) *Service {
normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases) normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases)
return &Service{ return &Service{
config: cfg, config: cfg,
deviceAliases: normalizedAliases, tasmotaTimeLocation: cfg.App.TasmotaLocation(),
aliasRecords: buildAliasRecords(normalizedAliases), deviceAliases: normalizedAliases,
snapshots: make(map[string]*deviceSnapshot), aliasRecords: buildAliasRecords(normalizedAliases),
dirtyDevices: make(map[string]struct{}), snapshots: make(map[string]*deviceSnapshot),
influxClient: influxClient, dirtyDevices: make(map[string]struct{}),
input: make(chan model.RawMessage, cfg.App.BufferSize), influxClient: influxClient,
input: make(chan model.RawMessage, cfg.App.BufferSize),
} }
} }
@@ -113,7 +115,7 @@ func (service *Service) Run(ctx context.Context) error {
service.logCounters() service.logCounters()
return nil return nil
case message := <-input: case message := <-input:
records, err := parser.ParseTasmota(message) records, err := parser.ParseTasmotaInLocation(message, service.tasmotaTimeLocation)
if err != nil { if err != nil {
service.failed.Add(1) service.failed.Add(1)
slog.Warn("failed to parse message", "topic", message.Topic, "error", err) slog.Warn("failed to parse message", "topic", message.Topic, "error", err)