Files

96 lines
2.3 KiB
Go

package mqtt
import (
"log/slog"
"time"
paho "github.com/eclipse/paho.mqtt.golang"
"mqqt-scrubber/internal/config"
"mqqt-scrubber/internal/model"
)
type Subscriber struct {
config config.MQTTConfig
handle func(model.RawMessage)
client paho.Client
started bool
}
func NewSubscriber(cfg config.MQTTConfig, handle func(model.RawMessage)) *Subscriber {
return &Subscriber{
config: cfg,
handle: handle,
}
}
func (subscriber *Subscriber) Start() error {
options := paho.NewClientOptions()
options.AddBroker(subscriber.config.Broker)
options.SetClientID(subscriber.config.ClientID)
options.SetAutoReconnect(true)
options.SetConnectRetry(true)
options.SetConnectRetryInterval(5 * time.Second)
options.SetKeepAlive(30 * time.Second)
options.SetPingTimeout(10 * time.Second)
options.SetOrderMatters(false)
if subscriber.config.Username != "" {
options.SetUsername(subscriber.config.Username)
}
if subscriber.config.Password != "" {
options.SetPassword(subscriber.config.Password)
}
callback := func(_ paho.Client, message paho.Message) {
subscriber.handle(model.RawMessage{
Topic: message.Topic(),
Payload: append([]byte(nil), message.Payload()...),
ReceivedAt: time.Now().UTC(),
})
}
options.SetOnConnectHandler(func(client paho.Client) {
filters := make(map[string]byte, len(subscriber.config.Topics))
for _, topic := range subscriber.config.Topics {
filters[topic] = subscriber.config.QoS
}
token := client.SubscribeMultiple(filters, callback)
token.Wait()
if err := token.Error(); err != nil {
slog.Error("failed to subscribe after connect", "error", err)
return
}
slog.Info("subscribed to mqtt topics", "topics", subscriber.config.Topics)
})
options.SetConnectionLostHandler(func(_ paho.Client, err error) {
slog.Warn("mqtt connection lost", "error", err)
})
options.SetReconnectingHandler(func(_ paho.Client, _ *paho.ClientOptions) {
slog.Info("mqtt reconnecting")
})
subscriber.client = paho.NewClient(options)
token := subscriber.client.Connect()
token.Wait()
if err := token.Error(); err != nil {
return err
}
subscriber.started = true
return nil
}
func (subscriber *Subscriber) Stop() {
if !subscriber.started || subscriber.client == nil {
return
}
subscriber.client.Disconnect(250)
subscriber.started = false
}