Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 95 additions & 35 deletions pkg/runner/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package runner

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log/slog"
"net/url"
"sync"

"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/autopaho/queue/file"
Expand Down Expand Up @@ -47,6 +49,12 @@ func (pel pahoErrorLogger) Printf(format string, v ...interface{}) {
pel.logger.Error(fmt.Sprintf(format, v...))
}

type mqttConnectionManager interface {
AwaitConnection(context.Context) error
PublishViaQueue(context.Context, *autopaho.QueuePublish) error
Publish(context.Context, *paho.Publish) (*paho.PublishResponse, error)
}

func (edm *dnstapMinimiser) newAutoPahoClientConfig(caCertPool *x509.CertPool, server string, clientID string, mqttKeepAlive uint16, localFileQueue *file.Queue) (autopaho.ClientConfig, error) {
u, err := url.Parse(server)
if err != nil {
Expand Down Expand Up @@ -88,42 +96,96 @@ func (edm *dnstapMinimiser) newAutoPahoClientConfig(caCertPool *x509.CertPool, s
return cliCfg, nil
}

func (edm *dnstapMinimiser) runAutoPaho(cm *autopaho.ConnectionManager, mqttJWK jwk.Key, usingFileQueue bool) {
defer edm.autopahoWg.Done()

// startMQTTPipeline launches signWorkers JWS sign workers and one paho
// publisher. Signing and publishing used to share a single goroutine, which
// made jws.Sign a serialization bottleneck; splitting them lets signing
// parallelize across cores while the lone publisher preserves the paho
// ConnectionManager's single-connection requirement.
//
// Parameters:
//   - cm: connection manager the publisher uses to reach the broker.
//   - mqttJWK: key used for signing; its key ID is also embedded in the topic.
//   - usingFileQueue: forwarded to the publisher to select queued publishing.
//   - signWorkers: number of sign workers; values <= 0 are treated as 1.
//
// The function returns as soon as the goroutines are started. The closer
// goroutine and the publisher are both registered on edm.autopahoWg; the sign
// workers are tracked by a local WaitGroup that the closer goroutine waits on.
func (edm *dnstapMinimiser) startMQTTPipeline(cm mqttConnectionManager, mqttJWK jwk.Key, usingFileQueue bool, signWorkers int) {
	// Always run at least one signer, otherwise nothing would ever reach
	// the publisher.
	if signWorkers <= 0 {
		signWorkers = 1
	}
	topic := "events/up/" + mqttJWK.KeyID() + "/new_qname"

	edm.log.Info("starting signing MQTT publisher",
		"jwk_id", mqttJWK.KeyID(),
		"jwk_alg", mqttJWK.Algorithm(),
		"topic", topic,
		"sign_workers", signWorkers,
	)

	// Sign workers: each independently reads unsigned bytes, JWS-signs them
	// and pushes the signed bytes onto the publisher's queue. A worker exits
	// when mqttPubCh is closed (or the autopaho context is cancelled).
	var signWg sync.WaitGroup
	signWg.Add(signWorkers)
	for i := 0; i < signWorkers; i++ {
		go edm.mqttSignWorker(&signWg, mqttJWK)
	}

	// Closer: once every sign worker has returned there are no senders left
	// on mqttSignedCh, so it is safe to close it. The close tells the
	// publisher to drain what remains and exit.
	edm.autopahoWg.Add(1)
	go func() {
		defer edm.autopahoWg.Done()
		signWg.Wait()
		close(edm.mqttSignedCh)
	}()

	// Publisher: registered on autopahoWg here; mqttPublishWorker is
	// responsible for calling Done when it returns.
	edm.autopahoWg.Add(1)
	go edm.mqttPublishWorker(cm, topic, usingFileQueue)
}

// mqttSignWorker receives unsigned payloads from mqttPubCh, wraps each one in
// a JSON-serialized JWS signed with mqttJWK, and hands the result to
// mqttSignedCh for the publisher.
//
// The worker returns when mqttPubCh is closed, or when the autopaho context is
// cancelled while it is waiting to hand over a signed message. A payload that
// fails to sign is logged and dropped; the worker then carries on with the
// next one. wg.Done is called on every exit path.
func (edm *dnstapMinimiser) mqttSignWorker(wg *sync.WaitGroup, mqttJWK jwk.Key) {
	defer wg.Done()

	for {
		payload, open := <-edm.mqttPubCh
		if !open {
			// Input channel closed: nothing more to sign.
			return
		}

		signed, signErr := jws.Sign(payload, jws.WithJSON(), jws.WithKey(mqttJWK.Algorithm(), mqttJWK))
		if signErr != nil {
			edm.log.Error("mqttSignWorker: failed to create JWS message", "error", signErr)
			continue
		}

		// Block until the publisher takes the message, but give up if the
		// pipeline is being shut down so this goroutine cannot hang on a
		// full channel.
		select {
		case <-edm.autopahoCtx.Done():
			return
		case edm.mqttSignedCh <- signed:
		}
	}
}

// mqttPublishWorker is the single goroutine that talks to paho. Single-writer
// matches paho's ConnectionManager expectations; signing remains parallel
// upstream while broker back-pressure is contained to this publisher.
func (edm *dnstapMinimiser) mqttPublishWorker(cm mqttConnectionManager, topic string, usingFileQueue bool) {
defer edm.autopahoWg.Done()

var signedMsg []byte
for {
// We only need to wait for a server connection if we have no
// local queue. Otherwise we can just start appending messages
// to disk.
if !usingFileQueue {
// AwaitConnection will return immediately if connection is up; adding this call stops publication whilst
// connection is unavailable.
err := cm.AwaitConnection(edm.autopahoCtx)
if err != nil { // Should only happen when context is cancelled
edm.log.Error("publisher done", "AwaitConnection", err)
return
}
}

// Wait for a message to publish
unsignedMsg := <-edm.mqttPubCh
if unsignedMsg == nil {
// The channel has been closed
edm.log.Info("runAutoPaho: message queue closed, exiting")
var ok bool
select {
case signedMsg, ok = <-edm.mqttSignedCh:
if !ok {
edm.log.Info("mqttPublishWorker: signed queue closed, exiting")
return
}
case <-edm.autopahoCtx.Done():
edm.log.Info("mqttPublishWorker: context cancelled, exiting")
return
}

signedMsg, err := jws.Sign(unsignedMsg, jws.WithJSON(), jws.WithKey(mqttJWK.Algorithm(), mqttJWK))
if err != nil {
edm.log.Error("runAutoPaho: failed to create JWS message", "error", err)
continue
}

if usingFileQueue {
err = cm.PublishViaQueue(edm.autopahoCtx, &autopaho.QueuePublish{
err := cm.PublishViaQueue(edm.autopahoCtx, &autopaho.QueuePublish{
Publish: &paho.Publish{
QoS: 0,
Topic: topic,
Expand All @@ -134,23 +196,21 @@ func (edm *dnstapMinimiser) runAutoPaho(cm *autopaho.ConnectionManager, mqttJWK
edm.log.Error("error writing message to queue", "error", err)
}
} else {
// Publish will block so we run it in a goroutine
go func(msg []byte) {
pr, err := cm.Publish(edm.autopahoCtx, &paho.Publish{
QoS: 0,
Topic: topic,
Payload: msg,
})
if err != nil {
edm.log.Error("error publishing", "error", err)
} else if pr != nil && pr.ReasonCode != 0 && pr.ReasonCode != 16 { // 16 = Server received message but there are no subscribers
// pr is only non-nil for QoS 1 and up
edm.log.Info("reason code received", "reason_code", pr.ReasonCode)
}
if edm.debug {
edm.log.Info("sent message", "content", string(msg))
}
}(signedMsg)
pr, err := cm.Publish(edm.autopahoCtx, &paho.Publish{
QoS: 0,
Topic: topic,
Payload: signedMsg,
})
if err != nil {
edm.log.Error("error publishing", "error", err)
} else if pr != nil && pr.ReasonCode != 0 && pr.ReasonCode != 16 {
// pr is only non-nil for QoS 1 and up;
// 16 = "no subscribers" which is fine.
edm.log.Info("reason code received", "reason_code", pr.ReasonCode)
}
if edm.debug {
edm.log.Info("sent message", "content", string(signedMsg))
}
}

select {
Expand Down
Loading