144 lines
4.7 KiB
Markdown
144 lines
4.7 KiB
Markdown
# mqqt-scrubber
|
|
|
|
Small Go daemon for collecting MQTT topics, normalizing mostly Tasmota payloads, and writing them into InfluxDB v3.
|
|
|
|
## Current status
|
|
|
|
This repository now contains:
|
|
|
|
- a milestone work plan in `docs/WORKPLAN.md`
|
|
- a technical design in `docs/ARCHITECTURE.md`
|
|
- an initial runnable scaffold for the background service
|
|
|
|
## Scope for v1
|
|
|
|
- subscribe to MQTT topics, primarily `tele/+/SENSOR` and `tele/+/STATE`
|
|
- parse Tasmota JSON payloads into normalized records
|
|
- batch writes to InfluxDB v3 using the HTTP `write_lp` endpoint
|
|
- run as a long-lived background process with reconnect and graceful shutdown
|
|
|
|
## Project layout
|
|
|
|
```text
|
|
cmd/mqqt-scrubber/ application entrypoint
|
|
internal/config/ config loading and validation
|
|
internal/influx/ InfluxDB v3 HTTP writer
|
|
internal/model/ internal message and record types
|
|
internal/mqtt/ MQTT subscriber wrapper
|
|
internal/parser/ Tasmota payload parsing and flattening
|
|
internal/pipeline/ batching, flushing, and service orchestration
|
|
docs/ work plan and architecture notes
|
|
```
|
|
|
|
## Configuration
|
|
|
|
The service loads configuration from a JSON file passed with `-config` and then applies environment variable overrides.
|
|
|
|
Start with:
|
|
|
|
```bash
|
|
cp config.example.json config.json
|
|
```
|
|
|
|
Supported environment variables:
|
|
|
|
- `MQTT_SCRUBBER_MQTT_BROKER`
|
|
- `MQTT_SCRUBBER_MQTT_USERNAME`
|
|
- `MQTT_SCRUBBER_MQTT_PASSWORD`
|
|
- `MQTT_SCRUBBER_MQTT_CLIENT_ID`
|
|
- `MQTT_SCRUBBER_MQTT_TOPICS`
|
|
- `MQTT_SCRUBBER_MQTT_QOS`
|
|
- `MQTT_SCRUBBER_INFLUX_URL`
|
|
- `MQTT_SCRUBBER_INFLUX_DATABASE`
|
|
- `MQTT_SCRUBBER_INFLUX_TOKEN`
|
|
- `MQTT_SCRUBBER_INFLUX_PRECISION`
|
|
- `MQTT_SCRUBBER_APP_BATCH_SIZE`
|
|
- `MQTT_SCRUBBER_APP_BUFFER_SIZE`
|
|
- `MQTT_SCRUBBER_APP_FLUSH_INTERVAL`
|
|
- `MQTT_SCRUBBER_APP_FLUSH_TIMEOUT`
|
|
- `MQTT_SCRUBBER_APP_LOG_LEVEL`
|
|
- `MQTT_SCRUBBER_APP_HEALTH_ADDRESS`
|
|
|
|
`MQTT_SCRUBBER_MQTT_TOPICS` expects a comma-separated list.
|
|
|
|
## Run
|
|
|
|
```bash
|
|
go run ./cmd/mqqt-scrubber -config config.json
|
|
```
|
|
|
|
The process also exposes simple health endpoints when `health_address` is set:
|
|
|
|
- `/healthz`
|
|
- `/readyz`
|
|
- `/metrics`
|
|
|
|
## Build
|
|
|
|
```bash
|
|
go build ./cmd/mqqt-scrubber
|
|
```
|
|
|
|
## Docker
|
|
|
|
Build the runtime image:
|
|
|
|
```bash
|
|
docker build -t mqqt-scrubber .
|
|
```
|
|
|
|
Run it with your config mounted in:
|
|
|
|
```bash
|
|
docker run --rm \
|
|
-p 8080:8080 \
|
|
-v "$PWD/config.json:/app/config.json:ro" \
|
|
mqqt-scrubber
|
|
```
|
|
|
|
The container health check uses `http://127.0.0.1:8080/healthz`.
|
|
|
|
## Docker Compose
|
|
|
|
Bring the service up with the included compose file:
|
|
|
|
```bash
|
|
docker-compose up --build -d
|
|
```
|
|
|
|
It expects a local `config.json` next to the compose file and exposes port `8080` for health and metrics.
|
|
|
|
## Grafana
|
|
|
|
A simple starter dashboard for the current Tasmota schema is included at `docs/grafana/tasmota-simple-dashboard.json`.
|
|
|
|
Assumptions:
|
|
|
|
- Grafana uses the built-in `InfluxDB` datasource plugin
|
|
- the datasource is configured for InfluxDB v3 with query language set to `SQL`
|
|
- the datasource points at the same database name configured in `MQTT_SCRUBBER_INFLUX_DATABASE`
|
|
|
|
Import the dashboard JSON and bind the `DS_INFLUXDB` input to your InfluxDB datasource.
|
|
|
|
The dashboard includes a `Device` variable for the selected-device section and a `Relay Device` variable that only lists devices publishing relay fields.
|
|
|
|
Use `Device` for the health row, time series, and recent-state table. Use `Relay Device` for the relay status row so non-relay devices do not clutter that picker.
|
|
|
|
The dashboard currently visualizes fields that are known to be emitted by the parser today and that exist with the default topic subscription set:
|
|
|
|
- `tasmota_sensor`: `energy_power`, `energy_voltage`, `energy_total`, `analog_temperature`, `analog_a0`
|
|
- `tasmota_state`: `wifi_signal`, `uptime_sec`
|
|
|
|
If you also want LWT availability panels, add `tele/+/LWT` to the configured MQTT topics so the `tasmota_lwt` measurement is created.
|
|
|
|
Relay panels use the latest `power`, `power1`, `power2`, `power3`, and `power4` values from `tasmota_state`. Devices that do not publish a given relay field will show no value for that panel.
|
|
|
|
The dashboard is split into a fleet summary section and a selected-device section. The selected-device section also includes `Last Seen`, `Seconds Since Last Message`, and `Messages In Range` panels derived from both `tasmota_state` and `tasmota_sensor` timestamps.
|
|
|
|
## Notes
|
|
|
|
- The repo name is kept as `mqqt-scrubber` to match the existing folder.
|
|
- The current parser is intentionally narrow and optimized for Tasmota telemetry first.
|
|
- Writes use line protocol over HTTP against InfluxDB v3 `/api/v3/write_lp`.
|
|
- The Docker image includes a non-root runtime user and a simple HTTP health endpoint.
|
|
- Runtime counters are exposed in Prometheus-style text format at `/metrics`. |