Architecture
Problem statement
The service sits in the background between an MQTT broker and InfluxDB v3. Its main job is to reliably consume selected MQTT topics, normalize payloads (primarily from Tasmota devices), and write the resulting time-series data into InfluxDB using a stable, query-friendly schema.
Design goals
- keep the binary small and operationally simple
- treat MQTT consumption and Influx writes as independent failure domains
- avoid blocking in MQTT callbacks
- keep memory bounded with explicit buffering and batching
- optimize for Tasmota first, not generic ETL
Non-goals for v1
- message replay from broker history
- general-purpose transformation DSL
- UI, dashboards, or query APIs
- support for every possible MQTT device type
High-level flow
- Load config from file and environment.
- Start the batch pipeline.
- Connect to MQTT and subscribe to configured topics.
- Receive raw MQTT messages and enqueue them.
- Parse supported Tasmota payloads into internal records.
- Batch records and flush them to InfluxDB v3 using line protocol.
- On shutdown, stop intake and flush remaining buffered records.
Components
cmd/mqqt-scrubber
Application entrypoint. Handles flags, startup logging, signal handling, and top-level wiring.
internal/config
Loads config from JSON, applies environment overrides, and validates required fields.
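A minimal sketch of that load, override and validate sequence, using only the standard library. The following are assumptions for illustration, not the project's actual configuration:
- the JSON keys
- the SCRUBBER_* environment variable names
- the set of required fields

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
)

// Config is a hypothetical subset of settings; the JSON keys and the
// SCRUBBER_* environment variable names are illustrative only.
type Config struct {
	MQTTBroker  string   `json:"mqtt_broker"`
	MQTTTopics  []string `json:"mqtt_topics"`
	InfluxURL   string   `json:"influx_url"`
	InfluxDB    string   `json:"influx_database"`
	InfluxToken string   `json:"influx_token"`
}

// Load reads the JSON file, then applies environment overrides and validation.
func Load(path string) (Config, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return Config{}, fmt.Errorf("read config: %w", err)
	}
	return parse(data, os.LookupEnv)
}

// parse takes the env lookup as a parameter so it can be tested without os.Setenv.
func parse(data []byte, lookupEnv func(string) (string, bool)) (Config, error) {
	var cfg Config
	if err := json.Unmarshal(data, &cfg); err != nil {
		return Config{}, fmt.Errorf("parse config: %w", err)
	}
	// Environment wins over the file, e.g. for secrets kept out of the mounted config.
	overrides := map[string]*string{
		"SCRUBBER_MQTT_BROKER":  &cfg.MQTTBroker,
		"SCRUBBER_INFLUX_URL":   &cfg.InfluxURL,
		"SCRUBBER_INFLUX_DB":    &cfg.InfluxDB,
		"SCRUBBER_INFLUX_TOKEN": &cfg.InfluxToken,
	}
	for name, dst := range overrides {
		if v, ok := lookupEnv(name); ok {
			*dst = v
		}
	}
	return cfg, cfg.validate()
}

// validate reports every missing required field at once (errors.Join needs Go 1.20+).
func (c Config) validate() error {
	var errs []error
	if c.MQTTBroker == "" {
		errs = append(errs, errors.New("mqtt_broker is required"))
	}
	if len(c.MQTTTopics) == 0 {
		errs = append(errs, errors.New("mqtt_topics must not be empty"))
	}
	if c.InfluxURL == "" {
		errs = append(errs, errors.New("influx_url is required"))
	}
	if c.InfluxDB == "" {
		errs = append(errs, errors.New("influx_database is required"))
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	cfg, err := Load("config.json")
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("broker %s, %d topic(s)\n", cfg.MQTTBroker, len(cfg.MQTTTopics))
}
```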
internal/mqtt
Owns broker connection management and topic subscriptions. The MQTT callback path should never perform parsing or network writes directly. It only enqueues raw messages into the internal pipeline.
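One way to keep the callback path non-blocking is a bounded channel with a non-blocking send. Two parts of this sketch are assumptions: the intake type, and the policy of dropping and counting messages when the channel is full. The text above only requires that the callback enqueue and never parse or write. If dropping is unacceptable, a blocking send with a short timeout would be the alternative.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// rawMessage is what the callback hands to the pipeline; the receive time is
// kept because it is the timestamp fallback when a payload has no Time.
type rawMessage struct {
	Topic    string
	Payload  []byte
	Received time.Time
}

// intake is a hypothetical bridge between an MQTT client's message callback
// and the pipeline goroutine.
type intake struct {
	ch      chan rawMessage
	dropped atomic.Uint64 // Go 1.19+
}

func newIntake(capacity int) *intake {
	return &intake{ch: make(chan rawMessage, capacity)}
}

// onMessage does no parsing and no I/O. It copies the payload (in case the
// client reuses its buffer) and attempts a non-blocking send; when the
// channel is full the message is dropped and counted instead of stalling
// the MQTT client.
func (in *intake) onMessage(topic string, payload []byte) {
	msg := rawMessage{
		Topic:    topic,
		Payload:  append([]byte(nil), payload...),
		Received: time.Now(),
	}
	select {
	case in.ch <- msg:
	default:
		in.dropped.Add(1)
	}
}

func main() {
	in := newIntake(2)
	for i := 0; i < 3; i++ {
		in.onMessage("tele/kitchen-plug/SENSOR", []byte(`{"ENERGY":{"Power":42}}`))
	}
	fmt.Printf("queued=%d dropped=%d\n", len(in.ch), in.dropped.Load())
}
```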
internal/parser
Transforms supported Tasmota JSON payloads into normalized records. The parser is strict about JSON validity but tolerant of unknown keys.
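Decoding into a generic map with encoding/json gives both properties at once: strict JSON validity and tolerance for unknown keys. A minimal sketch (decodeObject is a hypothetical name):

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// decodeObject is strict about JSON validity (truncated input, trailing
// garbage, or a non-object top level is an error) but accepts any set of keys;
// later steps pick out the keys they understand and ignore the rest.
func decodeObject(payload []byte) (map[string]any, error) {
	var obj map[string]any
	if err := json.Unmarshal(payload, &obj); err != nil {
		return nil, fmt.Errorf("invalid Tasmota JSON: %w", err)
	}
	if obj == nil { // the literal null decodes without error
		return nil, errors.New("invalid Tasmota JSON: null payload")
	}
	return obj, nil
}

func main() {
	for _, p := range []string{
		`{"Time":"2024-05-01T10:30:00","ENERGY":{"Power":42},"NewKey":1}`,
		`{"Time":"2024-05-01T10:30:00"`,
	} {
		obj, err := decodeObject([]byte(p))
		fmt.Println(len(obj), err)
	}
}
```

Decoded this way, every JSON number becomes a float64. That arguably suits this service: a Tasmota value that is sometimes 42 and sometimes 42.5 always reaches InfluxDB as a float. Otherwise the field would switch types between writes, which InfluxDB treats as a field type conflict.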
internal/pipeline
Receives raw messages, invokes the parser, batches records, and triggers flushes by size or time.
internal/influx
Serializes records into line protocol and writes them to InfluxDB v3 through /api/v3/write_lp.
Internal record model
Each parsed message becomes one or more records with:
- measurement
- tags
- fields
- timestamp
The current scaffold emits one record per Tasmota message, choosing the measurement by message type and flattening nested keys into field names.
Example:
measurement: tasmota_sensor
tags:
  device: kitchen_plug
  message_type: sensor
  source: tasmota
fields:
  energy_power: 42
  energy_voltage: 230
  si7021_temperature: 21.4
  si7021_humidity: 44
timestamp: payload Time or message receive time
Schema guidance
- tags should stay low-cardinality
- device identity belongs in tags
- sensor readings belong in fields
- avoid putting dynamic keys or values in tags
- flatten nested JSON keys into stable underscore-separated field names
Finalized v1 schema
Topic families
- tele/<device>/LWT
- tele/<device>/STATE
- tele/<device>/SENSOR
Measurements
- tasmota_lwt
- tasmota_state
- tasmota_sensor
Tags
- device: sanitized device identifier with hyphens normalized to underscores
- message_type: lwt, state, or sensor
- source: always tasmota
Field naming
- nested JSON objects are flattened with underscore separators
- camelCase Tasmota keys are normalized to snake_case
- examples:
  - UptimeSec -> uptime_sec
  - TempUnit -> temp_unit
  - TotalStartTime -> total_start_time
  - Berry.HeapUsed -> berry_heap_used
Current field families
- LWT:
  - state as string
  - online as boolean
- STATE:
  - base fields like uptime, uptime_sec, heap, sleep, sleep_mode, load_avg, mqtt_count
  - relay state fields like power, power1 through power4
  - Wi-Fi fields like wifi_rssi, wifi_signal, wifi_channel, wifi_link_count, wifi_mode
  - Berry fields like berry_heap_used, berry_objects
- SENSOR:
  - energy fields like energy_total, energy_today, energy_power, energy_voltage, energy_total_start_time
  - analog fields like analog_temperature and analog_a0
  - optional temp_unit
- all measurements may also include device_alias as an optional string when configured externally
Timestamp handling
- prefer the payload Time when present
- accept RFC3339 and timezone-less Tasmota timestamps in the form 2006-01-02T15:04:05
- the current implementation interprets timezone-less timestamps as UTC
Failure handling
MQTT unavailable
- rely on automatic reconnect
- resubscribe in the connect handler
- keep logs explicit about reconnect state
Influx unavailable
- keep records in the in-memory batch until a flush attempt succeeds
- retry on the next scheduled flush
- bound total in-memory intake with a channel capacity
Invalid payload
- log parse failure with topic context
- skip the payload
- continue processing subsequent messages
Initial deployment shape
- one process per broker or environment
- systemd service or Docker container
- config file mounted locally with secrets overridden by environment variables where practical
Immediate next improvements
- add parser fixtures based on real Tasmota payloads
- add counters and metrics
- add integration testing against a local broker and InfluxDB
- evaluate whether a persistent spool is needed for outage tolerance