Merge branch 'main' into tak/bug-#2718-appservice-txnid

This commit is contained in:
Tak Wai Wong 2022-09-16 15:07:50 -07:00
commit d25b84a7f3
4 changed files with 67 additions and 15 deletions

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"log" "log"
"strings" "strings"
"time"
"github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -81,11 +82,14 @@ func runTests(baseURL, branchName string) error {
client: users[1].client, text: "4: " + branchName, client: users[1].client, text: "4: " + branchName,
}, },
} }
wantEventIDs := make(map[string]struct{}, 8)
for _, msg := range msgs { for _, msg := range msgs {
_, err = msg.client.SendText(dmRoomID, msg.text) var resp *gomatrix.RespSendEvent
resp, err = msg.client.SendText(dmRoomID, msg.text)
if err != nil { if err != nil {
return fmt.Errorf("failed to send text in dm room: %s", err) return fmt.Errorf("failed to send text in dm room: %s", err)
} }
wantEventIDs[resp.EventID] = struct{}{}
} }
// attempt to create/join the shared public room // attempt to create/join the shared public room
@ -113,11 +117,48 @@ func runTests(baseURL, branchName string) error {
} }
// send messages // send messages
for _, msg := range msgs { for _, msg := range msgs {
_, err = msg.client.SendText(publicRoomID, "public "+msg.text) resp, err := msg.client.SendText(publicRoomID, "public "+msg.text)
if err != nil { if err != nil {
return fmt.Errorf("failed to send text in public room: %s", err) return fmt.Errorf("failed to send text in public room: %s", err)
} }
wantEventIDs[resp.EventID] = struct{}{}
} }
// Sync until we have all expected messages
doneCh := make(chan struct{})
go func() {
syncClient := users[0].client
since := ""
for len(wantEventIDs) > 0 {
select {
case <-doneCh:
return
default:
}
syncResp, err := syncClient.SyncRequest(1000, since, "1", false, "")
if err != nil {
continue
}
for _, room := range syncResp.Rooms.Join {
for _, ev := range room.Timeline.Events {
if ev.Type != "m.room.message" {
continue
}
delete(wantEventIDs, ev.ID)
}
}
since = syncResp.NextBatch
}
close(doneCh)
}()
select {
case <-time.After(time.Second * 10):
close(doneCh)
return fmt.Errorf("failed to receive all expected messages: %+v", wantEventIDs)
case <-doneCh:
}
log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID) log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID)
return nil return nil
} }

View file

@ -163,10 +163,10 @@ func (p *SyncAPIProducer) SendPresence(
} }
func (p *SyncAPIProducer) SendDeviceListUpdate( func (p *SyncAPIProducer) SendDeviceListUpdate(
ctx context.Context, deviceListUpdate gomatrixserverlib.RawJSON, origin string, ctx context.Context, deviceListUpdate gomatrixserverlib.RawJSON, origin gomatrixserverlib.ServerName,
) (err error) { ) (err error) {
m := nats.NewMsg(p.TopicDeviceListUpdate) m := nats.NewMsg(p.TopicDeviceListUpdate)
m.Header.Set("origin", origin) m.Header.Set("origin", string(origin))
m.Data = deviceListUpdate m.Data = deviceListUpdate
log.Debugf("Sending device list update: %+v", m.Header) log.Debugf("Sending device list update: %+v", m.Header)
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx)) _, err = p.JetStream.PublishMsg(m, nats.Context(ctx))

View file

@ -359,7 +359,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
} }
} }
case gomatrixserverlib.MDeviceListUpdate: case gomatrixserverlib.MDeviceListUpdate:
if err := t.producer.SendDeviceListUpdate(ctx, e.Content, e.Origin); err != nil { if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil {
util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate") util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
} }
case gomatrixserverlib.MReceipt: case gomatrixserverlib.MReceipt:

View file

@ -264,16 +264,27 @@ func (u *latestEventsUpdater) latestState() error {
return fmt.Errorf("roomState.CalculateAndStoreStateAfterEvents: %w", err) return fmt.Errorf("roomState.CalculateAndStoreStateAfterEvents: %w", err)
} }
// Now that we have a new state snapshot based on the latest events, // Include information about what changed in the state transition. If the
// we can compare that new snapshot to the previous one and see what // event rewrites the state (i.e. is a federated join) then we will simply
// has changed. This gives us one list of removed state events and // include the entire state snapshot as added events, as the "RewritesState"
// another list of added ones. Replacing a value for a state-key tuple // flag in the output event signals downstream components to purge their
// will result one removed (the old event) and one added (the new event). // room state first. If it doesn't rewrite the state then we will work out
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots( // what the difference is between the state snapshots and send that. In all
ctx, u.oldStateNID, u.newStateNID, // cases where a state event is being replaced, the old state event will
) // appear in "removed" and the replacement will appear in "added".
if err != nil { if u.rewritesState {
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err) u.removed = []types.StateEntry{}
u.added, err = roomState.LoadStateAtSnapshot(ctx, u.newStateNID)
if err != nil {
return fmt.Errorf("roomState.LoadStateAtSnapshot: %w", err)
}
} else {
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
ctx, u.oldStateNID, u.newStateNID,
)
if err != nil {
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
}
} }
if removed := len(u.removed) - len(u.added); !u.rewritesState && removed > 0 { if removed := len(u.removed) - len(u.added); !u.rewritesState && removed > 0 {