dendrite/test/testrig/jetstream.go
John Terzis 478579eea1 Jterzis/update dendrite (#656)
Pulls in upstream latest changes from [dendrite-fork
](https://github.com/HereNotThere/dendrite)to subtree at
servers/dendrite here.

Co-authored-by: Tak Wai Wong <64229756+tak-hntlabs@users.noreply.github.com>
Co-authored-by: Tak Wai Wong <tak@hntlabs.com>
Co-authored-by: John Terzis <john@hntlabs.com>
2022-10-17 15:59:47 -07:00

35 lines
933 B
Go

package testrig
import (
"encoding/json"
"testing"
"github.com/nats-io/nats.go"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream"
)
func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) {
t.Helper()
for _, msg := range msgs {
if _, err := jsctx.PublishMsg(msg); err != nil {
t.Fatalf("MustPublishMsgs: failed to publish message: %s", err)
}
}
}
func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg {
t.Helper()
msg := nats.NewMsg(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent))
msg.Header.Set(jetstream.RoomEventType, string(update.Type))
msg.Header.Set(jetstream.RoomID, roomID)
var err error
msg.Data, err = json.Marshal(update)
if err != nil {
t.Fatalf("failed to marshal update: %s", err)
}
return msg
}