Files
2026-03-14 22:42:27 +01:00

149 lines
5.1 KiB
Markdown

# mqqt-scrubber
Small Go daemon for collecting MQTT topics, normalizing mostly Tasmota payloads, and writing them into InfluxDB v3.
## Current status
This repository now contains:
- a milestone work plan in `docs/WORKPLAN.md`
- a technical design in `docs/ARCHITECTURE.md`
- an initial runnable scaffold for the background service
## Scope for v1
- subscribe to MQTT topics, primarily `tele/+/SENSOR` and `tele/+/STATE`
- parse Tasmota JSON payloads into normalized records
- batch writes to InfluxDB v3 using the HTTP `write_lp` endpoint
- run as a long-lived background process with reconnect and graceful shutdown
## Project layout
```text
cmd/mqqt-scrubber/ application entrypoint
internal/config/ config loading and validation
internal/influx/ InfluxDB v3 HTTP writer
internal/model/ internal message and record types
internal/mqtt/ MQTT subscriber wrapper
internal/parser/ Tasmota payload parsing and flattening
internal/pipeline/ batching, flushing, and service orchestration
docs/ work plan and architecture notes
```
## Configuration
The service loads configuration from a JSON file passed with `-config` and then applies environment variable overrides.
Start with:
```bash
cp config.example.json config.json
```
Supported environment variables:
- `MQTT_SCRUBBER_MQTT_BROKER`
- `MQTT_SCRUBBER_MQTT_USERNAME`
- `MQTT_SCRUBBER_MQTT_PASSWORD`
- `MQTT_SCRUBBER_MQTT_CLIENT_ID`
- `MQTT_SCRUBBER_MQTT_TOPICS`
- `MQTT_SCRUBBER_MQTT_QOS`
- `MQTT_SCRUBBER_INFLUX_URL`
- `MQTT_SCRUBBER_INFLUX_DATABASE`
- `MQTT_SCRUBBER_INFLUX_TOKEN`
- `MQTT_SCRUBBER_INFLUX_PRECISION`
- `MQTT_SCRUBBER_DEVICE_ALIASES` as a JSON object such as `{"kitchen_plug":"Kitchen Plug"}`
- `MQTT_SCRUBBER_APP_BATCH_SIZE`
- `MQTT_SCRUBBER_APP_BUFFER_SIZE`
- `MQTT_SCRUBBER_APP_FLUSH_INTERVAL`
- `MQTT_SCRUBBER_APP_FLUSH_TIMEOUT`
- `MQTT_SCRUBBER_APP_LOG_LEVEL`
- `MQTT_SCRUBBER_APP_HEALTH_ADDRESS`
`MQTT_SCRUBBER_MQTT_TOPICS` expects a comma-separated list.
You can also define optional per-device aliases in config with a top-level `device_aliases` object. Keys are normalized like device tags, so `kitchen-plug`, `Kitchen Plug`, and `kitchen_plug` all resolve to `kitchen_plug`.
## Run
```bash
go run ./cmd/mqqt-scrubber -config config.json
```
The process also exposes simple health endpoints when `health_address` is set:
- `/healthz`
- `/readyz`
- `/metrics`
## Build
```bash
go build ./cmd/mqqt-scrubber
```
## Docker
Build the runtime image:
```bash
docker build -t mqqt-scrubber .
```
Run it with your config mounted in:
```bash
docker run --rm \
-p 8080:8080 \
-v "$PWD/config.json:/app/config.json:ro" \
mqqt-scrubber
```
The container health check uses `http://127.0.0.1:8080/healthz`.
## Docker Compose
Bring the service up with the included compose file:
```bash
docker-compose up --build -d
```
It expects a local `config.json` next to the compose file and exposes port `8080` for health and metrics.
## Grafana
A simple starter dashboard for the current Tasmota schema is included at `docs/grafana/tasmota-simple-dashboard.json`.
Assumptions:
- Grafana uses the built-in `InfluxDB` datasource plugin
- the datasource is configured for InfluxDB v3 with query language set to `SQL`
- the datasource points at the same database name configured in `MQTT_SCRUBBER_INFLUX_DATABASE`
Import the dashboard JSON and bind the `DS_INFLUXDB` input to your InfluxDB datasource.
The dashboard includes a `Device` variable for the selected-device section and a `Relay Device` variable that only lists devices publishing relay fields.
Use `Device` for the health row, time series, and recent-state table. Use `Relay Device` for the relay status row so non-relay devices do not clutter that picker.
The dashboard currently visualizes fields that are known to be emitted by the parser today and that exist with the default topic subscription set:
- `tasmota_sensor`: `energy_power`, `energy_voltage`, `energy_total`, `analog_temperature`, `analog_a0`
- `tasmota_state`: `wifi_signal`, `uptime_sec`
If you also want LWT availability panels, add `tele/+/LWT` to the configured MQTT topics so the `tasmota_lwt` measurement is created.
Relay panels use the latest `power`, `power1`, `power2`, `power3`, and `power4` values from `tasmota_state`. Devices that do not publish a given relay field will show no value for that panel.
The dashboard is split into a fleet summary section and a selected-device section. The selected-device section also includes `Last Seen`, `Seconds Since Last Message`, and `Messages In Range` panels derived from both `tasmota_state` and `tasmota_sensor` timestamps.
If `device_aliases` is configured, the summary table will expose an `alias` column populated from the latest state or sensor record for each device.
## Notes
- The repo name is kept as `mqqt-scrubber` to match the existing folder.
- The current parser is intentionally narrow and optimized for Tasmota telemetry first.
- Writes use line protocol over HTTP against InfluxDB v3 `/api/v3/write_lp`.
- The Docker image includes a non-root runtime user and a simple HTTP health endpoint.
- Runtime counters are exposed in Prometheus-style text format at `/metrics`.