# Architecture
## Problem statement
The service runs as a background process between an MQTT broker and InfluxDB v3. Its job is to reliably consume selected MQTT topics, normalize payloads (primarily from Tasmota devices), and write time-series data into InfluxDB using a stable, query-friendly schema.
## Design goals
- keep the binary small and operationally simple
- treat MQTT consumption and Influx writes as independent failure domains
- avoid blocking in MQTT callbacks
- keep memory bounded with explicit buffering and batching
- optimize for Tasmota first, not generic ETL
## Non-goals for v1
- message replay from broker history
- general-purpose transformation DSL
- UI, dashboards, or query APIs
- support for every possible MQTT device type
## High-level flow
1. Load config from file and environment.
2. Start the batch pipeline.
3. Connect to MQTT and subscribe to configured topics.
4. Receive raw MQTT messages and enqueue them.
5. Parse supported Tasmota payloads into internal records.
6. Batch records and flush them to InfluxDB v3 using line protocol.
7. On shutdown, stop intake and flush remaining buffered records.
## Components
### `cmd/mqqt-scrubber`
Application entrypoint. Handles flags, startup logging, signal handling, and top-level wiring.
### `internal/config`
Loads config from JSON, applies environment overrides, and validates required fields.
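
The load order described above (file first, then environment overrides, then validation) can be sketched as follows. This is a minimal illustration, not the actual implementation: the `Config` field names, JSON keys, and environment variable names are all hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
)

// Config is a hypothetical shape; the real field names may differ.
type Config struct {
	MQTTBroker  string `json:"mqtt_broker"`
	InfluxURL   string `json:"influx_url"`
	InfluxToken string `json:"influx_token"`
}

// LoadConfig decodes JSON, lets environment values override file values,
// and rejects a config with missing required fields. getenv is injected
// (normally os.Getenv) so the function can be tested without touching the
// process environment.
func LoadConfig(data []byte, getenv func(string) string) (Config, error) {
	var cfg Config
	if err := json.Unmarshal(data, &cfg); err != nil {
		return Config{}, err
	}
	// Hypothetical variable names; a non-empty value wins over the file.
	overrides := map[string]*string{
		"MQTT_BROKER":  &cfg.MQTTBroker,
		"INFLUX_URL":   &cfg.InfluxURL,
		"INFLUX_TOKEN": &cfg.InfluxToken,
	}
	for name, dst := range overrides {
		if v := getenv(name); v != "" {
			*dst = v
		}
	}
	if cfg.MQTTBroker == "" {
		return Config{}, errors.New("config: mqtt_broker is required")
	}
	if cfg.InfluxURL == "" {
		return Config{}, errors.New("config: influx_url is required")
	}
	return cfg, nil
}
```

In production the caller would pass the file contents and `os.Getenv`; keeping secrets such as the token out of the file and in the environment matches the deployment shape described later in this document.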
### `internal/mqtt`
Owns broker connection management and topic subscriptions. The MQTT callback path should never perform parsing or network writes directly. It only enqueues raw messages into the internal pipeline.
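
One way to keep the callback path non-blocking is a `select` with a `default` branch on a buffered channel. The sketch below assumes that a message is dropped when the queue is full; the source does not state the overflow policy, and `RawMessage` and `Enqueue` are hypothetical names.

```go
package main

// RawMessage is a hypothetical envelope for an undecoded MQTT message.
type RawMessage struct {
	Topic   string
	Payload []byte
}

// Enqueue attempts a non-blocking send. It returns false when the queue is
// full, so the caller can count or log the drop instead of stalling the
// MQTT client's callback goroutine.
func Enqueue(queue chan<- RawMessage, msg RawMessage) bool {
	select {
	case queue <- msg:
		return true
	default:
		return false
	}
}
```

The channel capacity chosen at startup then doubles as the memory bound on unparsed messages.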
### `internal/parser`
Transforms supported Tasmota JSON payloads into normalized records. The parser is strict about JSON validity but tolerant of unknown keys.
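
"Strict about validity, tolerant of unknown keys" is the default behaviour of Go's `encoding/json` when decoding into a struct. The sketch below illustrates that with a hypothetical `statePayload` type that names only a few `STATE` keys; the real parser may decode differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statePayload lists only the keys this sketch cares about. Any other key
// in the payload is ignored by encoding/json by default.
type statePayload struct {
	Time      string `json:"Time"`
	UptimeSec int64  `json:"UptimeSec"`
	Heap      int64  `json:"Heap"`
}

// ParseState returns an error for malformed JSON (or a type mismatch on a
// known key) and silently skips keys it does not know.
func ParseState(payload []byte) (statePayload, error) {
	var p statePayload
	if err := json.Unmarshal(payload, &p); err != nil {
		return statePayload{}, fmt.Errorf("parse STATE payload: %w", err)
	}
	return p, nil
}
```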
### `internal/pipeline`
Receives raw messages, invokes the parser, batches records, and triggers flushes by size or time.
### `internal/influx`
Serializes records into line protocol and writes them to InfluxDB v3 through `/api/v3/write_lp`.
## Internal record model
Each parsed message becomes one or more records with:
- measurement
- tags
- fields
- timestamp
The current scaffold emits one record per Tasmota message type, with flattened field names.
Example:
```text
measurement: tasmota_sensor
tags:
  device: kitchen_plug
  message_type: sensor
  source: tasmota
fields:
  energy_power: 42
  energy_voltage: 230
  si7021_temperature: 21.4
  si7021_humidity: 44
timestamp: payload Time or message receive time
```
## Schema guidance
- tags should stay low-cardinality
- device identity belongs in tags
- sensor readings belong in fields
- avoid putting dynamic keys or values in tags
- flatten nested JSON keys into stable underscore-separated field names
## Finalized v1 schema
### Topic families
- `tele/<device>/LWT`
- `tele/<device>/STATE`
- `tele/<device>/SENSOR`
### Measurements
- `tasmota_lwt`
- `tasmota_state`
- `tasmota_sensor`
### Tags
- `device`: sanitized device identifier with hyphens normalized to underscores
- `message_type`: `lwt`, `state`, or `sensor`
- `source`: always `tasmota`
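
The mapping from topic family to measurement and tags can be sketched as a small function. `ParseTopic` is a hypothetical name, and only the documented hyphen rule is applied to the device identifier; any further sanitization is not specified here and is left out.

```go
package main

import "strings"

// ParseTopic splits "tele/<device>/<SUFFIX>" into the device tag, the
// message_type tag, and the measurement name. ok is false for any topic
// outside the three supported families.
func ParseTopic(topic string) (device, messageType, measurement string, ok bool) {
	parts := strings.Split(topic, "/")
	if len(parts) != 3 || parts[0] != "tele" || parts[1] == "" {
		return "", "", "", false
	}
	switch parts[2] {
	case "LWT", "STATE", "SENSOR":
		messageType = strings.ToLower(parts[2])
	default:
		return "", "", "", false
	}
	// Hyphens are normalized to underscores, as described for the device tag.
	device = strings.ReplaceAll(parts[1], "-", "_")
	return device, messageType, "tasmota_" + messageType, true
}
```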
### Field naming
- nested JSON objects are flattened with underscore separators
- camelCase Tasmota keys are normalized to snake_case
- examples:
  - `UptimeSec` -> `uptime_sec`
  - `TempUnit` -> `temp_unit`
  - `TotalStartTime` -> `total_start_time`
  - `Berry.HeapUsed` -> `berry_heap_used`
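
The naming rules above can be sketched as a recursive flatten over decoded JSON. This is an illustration under assumptions: `snakeCase` and `Flatten` are hypothetical names, an underscore is inserted only at a lowercase-to-uppercase boundary (so all-caps keys such as `RSSI` become `rssi`), and arrays and nulls are skipped.

```go
package main

import (
	"strings"
	"unicode"
)

// snakeCase lowercases key and inserts "_" at each lowercase-to-uppercase
// boundary, so "UptimeSec" becomes "uptime_sec" while "RSSI" becomes "rssi".
func snakeCase(key string) string {
	var b strings.Builder
	var prev rune
	for _, r := range key {
		if unicode.IsUpper(r) && unicode.IsLower(prev) {
			b.WriteByte('_')
		}
		b.WriteRune(unicode.ToLower(r))
		prev = r
	}
	return b.String()
}

// Flatten walks decoded JSON and writes leaf values into out under
// underscore-joined snake_case names. Values that are not numbers, strings,
// booleans, or nested objects are skipped in this sketch.
func Flatten(prefix string, in map[string]any, out map[string]any) {
	for k, v := range in {
		name := snakeCase(k)
		if prefix != "" {
			name = prefix + "_" + name
		}
		switch x := v.(type) {
		case map[string]any:
			Flatten(name, x, out)
		case float64, string, bool:
			out[name] = x
		}
	}
}
```

Calling `Flatten("", decoded, fields)` on a payload decoded into `map[string]any` fills `fields` with names such as `berry_heap_used` and `wifi_rssi`.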
### Current field families
- `LWT`:
  - `state` as string
  - `online` as boolean
- `STATE`:
  - base fields like `uptime`, `uptime_sec`, `heap`, `sleep`, `sleep_mode`, `load_avg`, `mqtt_count`
  - relay state fields like `power`, `power1` through `power4`
  - Wi-Fi fields like `wifi_rssi`, `wifi_signal`, `wifi_channel`, `wifi_link_count`, `wifi_mode`
  - Berry fields like `berry_heap_used`, `berry_objects`
- `SENSOR`:
  - energy fields like `energy_total`, `energy_today`, `energy_power`, `energy_voltage`, `energy_total_start_time`
  - analog fields like `analog_temperature` and `analog_a0`
  - optional `temp_unit`
- all measurements may also include `device_alias` as an optional string when configured externally
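
For the `LWT` family, the two fields can be derived directly from the payload. The sketch below assumes the payload is the plain-text `Online` or `Offline` that Tasmota publishes by default rather than JSON; `LWTFields` is a hypothetical name.

```go
package main

import "strings"

// LWTFields maps a plain-text LWT payload to the two documented fields:
// the raw state string and a boolean that is true only for "Online".
func LWTFields(payload []byte) map[string]any {
	state := strings.TrimSpace(string(payload))
	return map[string]any{
		"state":  state,
		"online": strings.EqualFold(state, "Online"),
	}
}
```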
### Timestamp handling
- prefer payload `Time` when present
- accept RFC3339 and timezone-less Tasmota timestamps in the form `2006-01-02T15:04:05`
- interpret timezone-less timestamps in the configured `tasmota_time_zone` location and convert them to UTC for storage
## Failure handling
### MQTT unavailable
- rely on automatic reconnect
- resubscribe in the connect handler
- keep logs explicit about reconnect state
### Influx unavailable
- keep records in the in-memory batch until the flush attempt returns, and discard them only if it succeeded
- retry failed records on the next scheduled flush
- bound total in-memory intake with a fixed channel capacity
### Invalid payload
- log parse failure with topic context
- skip the payload
- continue processing subsequent messages
## Initial deployment shape
- one process per broker or environment
- systemd service or Docker container
- config file mounted locally with secrets overridden by environment variables where practical
## Immediate next improvements
- add parser fixtures based on real Tasmota payloads
- add counters and metrics
- add integration testing against a local broker and InfluxDB
- evaluate whether a persistent spool is needed for outage tolerance