Make the fix for #2718 the same as dendrite main (#31)

Co-authored-by: Tak Wai Wong <tak@hntlabs.com>
This commit is contained in:
Tak Wai Wong 2022-09-16 16:30:31 -07:00 committed by GitHub
parent d7dc087953
commit 0ef2a3960d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -22,6 +22,7 @@ import (
"math"
"net/http"
"net/url"
"strconv"
"time"
"github.com/matrix-org/gomatrixserverlib"
@ -151,15 +152,25 @@ func (s *OutputRoomEventConsumer) onMessage(
return true
}
txnID := ""
// Try to get the message metadata, if we're able to, use the timestamp as the txnID
if len(msgs) > 0 {
metadata, err := msgs[0].Metadata()
if err == nil {
txnID = strconv.Itoa(int(metadata.Timestamp.UnixNano()))
}
}
// Send event to any relevant application services. If we hit
// an error here, return false, so that we negatively ack.
log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
return s.sendEvents(ctx, state, events) == nil
return s.sendEvents(txnID, ctx, state, events) == nil
}
// sendEvents passes events to the appservice by using the transactions
// endpoint. It will block for the backoff period if necessary.
func (s *OutputRoomEventConsumer) sendEvents(
txnID string,
ctx context.Context, state *appserviceState,
events []*gomatrixserverlib.HeaderedEvent,
) error {
@ -173,10 +184,10 @@ func (s *OutputRoomEventConsumer) sendEvents(
return err
}
// If the number of items in the array is different,
// then this should be treated as a different transaction. Incorporate the length
// of events into the transaction ID.
txnID := fmt.Sprintf("%d_%d", events[0].Event.OriginServerTS(), len(transaction))
// If txnID is not defined, generate one from the events.
if txnID == "" {
txnID = fmt.Sprintf("%d_%d", events[0].Event.OriginServerTS(), len(transaction))
}
// Send the transaction to the appservice.
// https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid