Merge commit '0fe5ea6c32ed4c822acd83ffafb3d8537c64f0f0'

This commit is contained in:
Brian Meek 2022-09-17 14:10:59 -07:00
commit 21ec5c4704
3 changed files with 22 additions and 6 deletions

View file

@ -22,6 +22,7 @@ import (
"math" "math"
"net/http" "net/http"
"net/url" "net/url"
"strconv"
"time" "time"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -151,15 +152,25 @@ func (s *OutputRoomEventConsumer) onMessage(
return true return true
} }
txnID := ""
// Try to get the message metadata, if we're able to, use the timestamp as the txnID
if len(msgs) > 0 {
metadata, err := msgs[0].Metadata()
if err == nil {
txnID = strconv.Itoa(int(metadata.Timestamp.UnixNano()))
}
}
// Send event to any relevant application services. If we hit // Send event to any relevant application services. If we hit
// an error here, return false, so that we negatively ack. // an error here, return false, so that we negatively ack.
log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events)) log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
return s.sendEvents(ctx, state, events) == nil return s.sendEvents(txnID, ctx, state, events) == nil
} }
// sendEvents passes events to the appservice by using the transactions // sendEvents passes events to the appservice by using the transactions
// endpoint. It will block for the backoff period if necessary. // endpoint. It will block for the backoff period if necessary.
func (s *OutputRoomEventConsumer) sendEvents( func (s *OutputRoomEventConsumer) sendEvents(
txnID string,
ctx context.Context, state *appserviceState, ctx context.Context, state *appserviceState,
events []*gomatrixserverlib.HeaderedEvent, events []*gomatrixserverlib.HeaderedEvent,
) error { ) error {
@ -173,10 +184,10 @@ func (s *OutputRoomEventConsumer) sendEvents(
return err return err
} }
// If the number of items in the array is different, // If txnID is not defined, generate one from the events.
// then this should be treated as a different transaction. Incorporate the length if txnID == "" {
// of events into the transaction ID. txnID = fmt.Sprintf("%d_%d", events[0].Event.OriginServerTS(), len(transaction))
txnID := fmt.Sprintf("%d_%d", events[0].Event.OriginServerTS(), len(transaction)) }
// Send the transaction to the appservice. // Send the transaction to the appservice.
// https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid // https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid

View file

@ -9,7 +9,7 @@ WORKDIR /build
COPY . /build COPY . /build
RUN mkdir -p bin RUN mkdir -p bin
RUN go build -trimpath -o bin/ -ldflags="-X routing.ReleaseVersion=$RELEASE_VERSION" ./cmd/dendrite-monolith-server RUN go build -trimpath -o bin/ -ldflags="-X 'github.com/matrix-org/dendrite/clientapi/routing.ReleaseVersion=$RELEASE_VERSION'" ./cmd/dendrite-monolith-server
RUN go build -trimpath -o bin/ ./cmd/create-account RUN go build -trimpath -o bin/ ./cmd/create-account
RUN go build -trimpath -o bin/ ./cmd/generate-keys RUN go build -trimpath -o bin/ ./cmd/generate-keys

View file

@ -64,6 +64,11 @@ func Setup(
extRoomsProvider api.ExtraPublicRoomsProvider, extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs, natsClient *nats.Conn, mscCfg *config.MSCs, natsClient *nats.Conn,
) { ) {
logrus.WithFields(logrus.Fields{
"ReleaseVersion": ReleaseVersion,
}).Info("Started clientAPI router with ReleaseVersion")
prometheus.MustRegister(amtRegUsers, sendEventDuration) prometheus.MustRegister(amtRegUsers, sendEventDuration)
rateLimits := httputil.NewRateLimits(&cfg.RateLimiting) rateLimits := httputil.NewRateLimits(&cfg.RateLimiting)