2026-03-14 22:42:27 +01:00

Architecture

Problem statement

The service runs in the background between an MQTT broker and InfluxDB v3. Its main job is to reliably consume selected MQTT topics, normalize their payloads (mainly from Tasmota devices), and write the resulting time-series data to InfluxDB using a stable, query-friendly schema.

Design goals

  • keep the binary small and operationally simple
  • treat MQTT consumption and Influx writes as independent failure domains
  • avoid blocking in MQTT callbacks
  • keep memory bounded with explicit buffering and batching
  • optimize for Tasmota first, not generic ETL

Non-goals for v1

  • message replay from broker history
  • general-purpose transformation DSL
  • UI, dashboards, or query APIs
  • support for every possible MQTT device type

High-level flow

  1. Load config from file and environment.
  2. Start the batch pipeline.
  3. Connect to MQTT and subscribe to configured topics.
  4. Receive raw MQTT messages and enqueue them.
  5. Parse supported Tasmota payloads into internal records.
  6. Batch records and flush them to InfluxDB v3 using line protocol.
  7. On shutdown, stop intake and flush remaining buffered records.

Components

cmd/mqqt-scrubber

Application entrypoint. Handles flags, startup logging, signal handling, and top-level wiring.

internal/config

Loads config from JSON, applies environment overrides, and validates required fields.

internal/mqtt

Owns broker connection management and topic subscriptions. The MQTT callback path should never perform parsing or network writes directly. It only enqueues raw messages into the internal pipeline.
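
The hand-off from the MQTT callback could look like the sketch below. The callback copies the payload and does a non-blocking send into a bounded channel, so a slow pipeline never stalls the client library. `Intake` and `RawMessage` are hypothetical names. The drop-and-count policy for a full channel is also an assumption. Blocking briefly would be an alternative, at the cost of stalling the callback. The copy is defensive, in case the client library reuses payload buffers.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// RawMessage is a hypothetical envelope handed from the MQTT callback to the pipeline.
type RawMessage struct {
	Topic      string
	Payload    []byte
	ReceivedAt time.Time
}

// Intake wraps a bounded channel; its capacity caps in-memory intake.
type Intake struct {
	ch      chan RawMessage
	dropped atomic.Uint64
}

func NewIntake(capacity int) *Intake {
	return &Intake{ch: make(chan RawMessage, capacity)}
}

// Enqueue is what the MQTT message handler would call. It never blocks:
// if the channel is full the message is dropped and counted (assumed policy).
func (in *Intake) Enqueue(topic string, payload []byte) bool {
	msg := RawMessage{
		Topic:      topic,
		Payload:    append([]byte(nil), payload...), // defensive copy
		ReceivedAt: time.Now(),
	}
	select {
	case in.ch <- msg:
		return true
	default:
		in.dropped.Add(1)
		return false
	}
}

// Messages is the receive side consumed by the pipeline goroutine.
func (in *Intake) Messages() <-chan RawMessage { return in.ch }

// Dropped reports how many messages were discarded because the queue was full.
func (in *Intake) Dropped() uint64 { return in.dropped.Load() }

func main() {
	in := NewIntake(2)
	for i := 0; i < 3; i++ {
		in.Enqueue("tele/kitchen-plug/SENSOR", []byte(`{"ENERGY":{"Power":42}}`))
	}
	fmt.Println("queued:", len(in.Messages()), "dropped:", in.Dropped()) // queued: 2 dropped: 1
}
```

The drop counter is a natural first candidate for the counters and metrics listed under next improvements.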

internal/parser

Transforms supported Tasmota JSON payloads into normalized records. The parser is strict about JSON validity but tolerant of unknown keys.
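
Being strict about validity and tolerant of unknown keys maps naturally onto decoding into a generic map with `encoding/json`. Malformed input fails, while unexpected keys simply appear as extra entries. `decodePayload` is a hypothetical helper, not the project's parser API.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// decodePayload is a hypothetical helper. Malformed JSON (including trailing
// garbage) is rejected by json.Unmarshal; unknown keys are kept as ordinary map
// entries, so new Tasmota keys never cause a failure.
func decodePayload(payload []byte) (map[string]any, error) {
	var obj map[string]any
	if err := json.Unmarshal(payload, &obj); err != nil {
		return nil, fmt.Errorf("invalid JSON payload: %w", err)
	}
	if obj == nil {
		// A literal null decodes without error but carries no data.
		return nil, errors.New("payload is not a JSON object")
	}
	return obj, nil
}

func main() {
	obj, err := decodePayload([]byte(`{"Time":"2024-05-01T12:30:00","ENERGY":{"Power":42},"SomeNewKey":1}`))
	fmt.Println(len(obj), err)
	_, err = decodePayload([]byte(`{"ENERGY":`))
	fmt.Println(err != nil)
}
```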

internal/pipeline

Receives raw messages, invokes the parser, batches records, and triggers flushes by size or time.

internal/influx

Serializes records into line protocol and writes them to InfluxDB v3 through /api/v3/write_lp.

Internal record model

Each parsed message becomes one or more records with:

  • measurement
  • tags
  • fields
  • timestamp

The current scaffold emits one record per message, stored in the measurement for its Tasmota message type, with flattened field names.

Example:

measurement: tasmota_sensor
tags:
  device: kitchen_plug
  message_type: sensor
  source: tasmota
fields:
  energy_power: 42
  energy_voltage: 230
  si7021_temperature: 21.4
  si7021_humidity: 44
timestamp: payload Time or message receive time

Schema guidance

  • tags should stay low-cardinality
  • device identity belongs in tags
  • sensor readings belong in fields
  • avoid putting dynamic keys or values in tags
  • flatten nested JSON keys into stable underscore-separated field names

Finalized v1 schema

Topic families

  • tele/<device>/LWT
  • tele/<device>/STATE
  • tele/<device>/SENSOR

Measurements

  • tasmota_lwt
  • tasmota_state
  • tasmota_sensor

Tags

  • device: sanitized device identifier with hyphens normalized to underscores
  • message_type: lwt, state, or sensor
  • source: always tasmota

Field naming

  • nested JSON objects are flattened with underscore separators
  • camelCase Tasmota keys are normalized to snake_case
  • examples:
    • UptimeSec -> uptime_sec
    • TempUnit -> temp_unit
    • TotalStartTime -> total_start_time
    • Berry.HeapUsed -> berry_heap_used

Current field families

  • LWT:
    • state as string
    • online as boolean
  • STATE:
    • base fields like uptime, uptime_sec, heap, sleep, sleep_mode, load_avg, mqtt_count
    • relay state fields like power, power1 through power4
    • Wi-Fi fields like wifi_rssi, wifi_signal, wifi_channel, wifi_link_count, wifi_mode
    • Berry fields like berry_heap_used, berry_objects
  • SENSOR:
    • energy fields like energy_total, energy_today, energy_power, energy_voltage, energy_total_start_time
    • analog fields like analog_temperature and analog_a0
    • optional temp_unit
  • all measurements may also include device_alias as an optional string when configured externally

Timestamp handling

  • prefer payload Time when present
  • accept RFC3339 and timezone-less Tasmota timestamps in the form 2006-01-02T15:04:05
  • current implementation interprets timezone-less timestamps as UTC

Failure handling

MQTT unavailable

  • rely on automatic reconnect
  • resubscribe in the connect handler
  • keep logs explicit about reconnect state

Influx unavailable

  • keep records in the in-memory batch until a flush attempt succeeds
  • retry on the next scheduled flush
  • bound total in-memory intake with a channel capacity

Invalid payload

  • log parse failure with topic context
  • skip the payload
  • continue processing subsequent messages
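
The log-and-continue rule is sketched below over a slice instead of the pipeline's channel, for brevity. `processAll` and `rawMessage` are hypothetical names, and plain JSON decoding stands in for the real parser.

```go
package main

import (
	"encoding/json"
	"log"
)

type rawMessage struct {
	Topic   string
	Payload []byte
}

// processAll parses each message; failures are logged with the topic and skipped.
func processAll(msgs []rawMessage, parse func([]byte) (map[string]any, error)) []map[string]any {
	var out []map[string]any
	for _, m := range msgs {
		rec, err := parse(m.Payload)
		if err != nil {
			log.Printf("skipping payload: topic=%s err=%v", m.Topic, err)
			continue
		}
		out = append(out, rec)
	}
	return out
}

// parseJSON is a stand-in parser for the sketch.
func parseJSON(p []byte) (map[string]any, error) {
	var obj map[string]any
	err := json.Unmarshal(p, &obj)
	return obj, err
}

func main() {
	got := processAll([]rawMessage{
		{Topic: "tele/a/SENSOR", Payload: []byte(`{"ENERGY":{"Power":1}}`)},
		{Topic: "tele/b/SENSOR", Payload: []byte(`{"ENERGY":`)},
		{Topic: "tele/c/SENSOR", Payload: []byte(`{}`)},
	}, parseJSON)
	log.Printf("parsed %d of 3", len(got))
}
```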

Initial deployment shape

  • one process per broker or environment
  • systemd service or Docker container
  • config file mounted locally with secrets overridden by environment variables where practical

Immediate next improvements

  • add parser fixtures based on real Tasmota payloads
  • add counters and metrics
  • add integration testing against a local broker and InfluxDB
  • evaluate whether a persistent spool is needed for outage tolerance