96 lines
2.3 KiB
Go
96 lines
2.3 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"log/slog"
|
|
"time"
|
|
|
|
paho "github.com/eclipse/paho.mqtt.golang"
|
|
|
|
"mqqt-scrubber/internal/config"
|
|
"mqqt-scrubber/internal/model"
|
|
)
|
|
|
|
type Subscriber struct {
|
|
config config.MQTTConfig
|
|
handle func(model.RawMessage)
|
|
client paho.Client
|
|
started bool
|
|
}
|
|
|
|
func NewSubscriber(cfg config.MQTTConfig, handle func(model.RawMessage)) *Subscriber {
|
|
return &Subscriber{
|
|
config: cfg,
|
|
handle: handle,
|
|
}
|
|
}
|
|
|
|
func (subscriber *Subscriber) Start() error {
|
|
options := paho.NewClientOptions()
|
|
options.AddBroker(subscriber.config.Broker)
|
|
options.SetClientID(subscriber.config.ClientID)
|
|
options.SetAutoReconnect(true)
|
|
options.SetConnectRetry(true)
|
|
options.SetConnectRetryInterval(5 * time.Second)
|
|
options.SetKeepAlive(30 * time.Second)
|
|
options.SetPingTimeout(10 * time.Second)
|
|
options.SetOrderMatters(false)
|
|
|
|
if subscriber.config.Username != "" {
|
|
options.SetUsername(subscriber.config.Username)
|
|
}
|
|
if subscriber.config.Password != "" {
|
|
options.SetPassword(subscriber.config.Password)
|
|
}
|
|
|
|
callback := func(_ paho.Client, message paho.Message) {
|
|
subscriber.handle(model.RawMessage{
|
|
Topic: message.Topic(),
|
|
Payload: append([]byte(nil), message.Payload()...),
|
|
ReceivedAt: time.Now().UTC(),
|
|
})
|
|
}
|
|
|
|
options.SetOnConnectHandler(func(client paho.Client) {
|
|
filters := make(map[string]byte, len(subscriber.config.Topics))
|
|
for _, topic := range subscriber.config.Topics {
|
|
filters[topic] = subscriber.config.QoS
|
|
}
|
|
|
|
token := client.SubscribeMultiple(filters, callback)
|
|
token.Wait()
|
|
if err := token.Error(); err != nil {
|
|
slog.Error("failed to subscribe after connect", "error", err)
|
|
return
|
|
}
|
|
|
|
slog.Info("subscribed to mqtt topics", "topics", subscriber.config.Topics)
|
|
})
|
|
|
|
options.SetConnectionLostHandler(func(_ paho.Client, err error) {
|
|
slog.Warn("mqtt connection lost", "error", err)
|
|
})
|
|
|
|
options.SetReconnectingHandler(func(_ paho.Client, _ *paho.ClientOptions) {
|
|
slog.Info("mqtt reconnecting")
|
|
})
|
|
|
|
subscriber.client = paho.NewClient(options)
|
|
token := subscriber.client.Connect()
|
|
token.Wait()
|
|
if err := token.Error(); err != nil {
|
|
return err
|
|
}
|
|
|
|
subscriber.started = true
|
|
return nil
|
|
}
|
|
|
|
func (subscriber *Subscriber) Stop() {
|
|
if !subscriber.started || subscriber.client == nil {
|
|
return
|
|
}
|
|
|
|
subscriber.client.Disconnect(250)
|
|
subscriber.started = false
|
|
}
|