Merge branch 'main' into neilalexander/dlbackoff

This commit is contained in:
Neil Alexander 2022-08-19 10:08:57 +01:00 committed by GitHub
commit 12a868af94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 108 additions and 24 deletions

View file

@ -31,7 +31,7 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Dat
func TestExpireEDUs(t *testing.T) {
var expireEDUTypes = map[string]time.Duration{
gomatrixserverlib.MReceipt: time.Millisecond,
gomatrixserverlib.MReceipt: 0,
}
ctx := context.Background()

View file

@ -5,9 +5,10 @@ import (
"fmt"
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
)
type PerformErrorCode int
@ -161,7 +162,8 @@ func (r *PerformBackfillRequest) PrevEventIDs() []string {
// PerformBackfillResponse is a response to PerformBackfill.
type PerformBackfillResponse struct {
// Missing events, arbritrary order.
Events []*gomatrixserverlib.HeaderedEvent `json:"events"`
Events []*gomatrixserverlib.HeaderedEvent `json:"events"`
HistoryVisibility gomatrixserverlib.HistoryVisibility `json:"history_visibility"`
}
type PerformPublishRequest struct {

View file

@ -164,6 +164,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
// TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point.
res.Events = events
res.HistoryVisibility = requester.historyVisiblity
return nil
}
@ -248,6 +249,7 @@ type backfillRequester struct {
servers []gomatrixserverlib.ServerName
eventIDToBeforeStateIDs map[string][]string
eventIDMap map[string]*gomatrixserverlib.Event
historyVisiblity gomatrixserverlib.HistoryVisibility
}
func newBackfillRequester(
@ -266,6 +268,7 @@ func newBackfillRequester(
eventIDMap: make(map[string]*gomatrixserverlib.Event),
bwExtrems: bwExtrems,
preferServer: preferServer,
historyVisiblity: gomatrixserverlib.HistoryVisibilityShared,
}
}
@ -447,7 +450,8 @@ FindSuccessor:
}
// possibly return all joined servers depending on history visiblity
memberEventsFromVis, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.thisServer)
memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.thisServer)
b.historyVisiblity = visibility
if err != nil {
logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules")
return nil
@ -528,7 +532,7 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion,
// pull all events and then filter by that table.
func joinEventsFromHistoryVisibility(
ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry,
thisServer gomatrixserverlib.ServerName) ([]types.Event, error) {
thisServer gomatrixserverlib.ServerName) ([]types.Event, gomatrixserverlib.HistoryVisibility, error) {
var eventNIDs []types.EventNID
for _, entry := range stateEntries {
@ -542,7 +546,9 @@ func joinEventsFromHistoryVisibility(
// Get all of the events in this state
stateEvents, err := db.Events(ctx, eventNIDs)
if err != nil {
return nil, err
// even though the default should be shared, restricting the visibility to joined
// feels more secure here.
return nil, gomatrixserverlib.HistoryVisibilityJoined, err
}
events := make([]*gomatrixserverlib.Event, len(stateEvents))
for i := range stateEvents {
@ -551,20 +557,22 @@ func joinEventsFromHistoryVisibility(
// Can we see events in the room?
canSeeEvents := auth.IsServerAllowed(thisServer, true, events)
visibility := gomatrixserverlib.HistoryVisibility(auth.HistoryVisibilityForRoom(events))
if !canSeeEvents {
logrus.Infof("ServersAtEvent history not visible to us: %s", auth.HistoryVisibilityForRoom(events))
return nil, nil
logrus.Infof("ServersAtEvent history not visible to us: %s", visibility)
return nil, visibility, nil
}
// get joined members
info, err := db.RoomInfo(ctx, roomID)
if err != nil {
return nil, err
return nil, visibility, nil
}
joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false)
if err != nil {
return nil, err
return nil, visibility, err
}
return db.Events(ctx, joinEventNIDs)
evs, err := db.Events(ctx, joinEventNIDs)
return evs, visibility, err
}
func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {

View file

@ -350,8 +350,10 @@ func (r *messagesReq) retrieveEvents() (
startTime := time.Now()
filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, r.rsAPI, events, nil, r.device.UserID, "messages")
logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime),
"room_id": r.roomID,
"duration": time.Since(startTime),
"room_id": r.roomID,
"events_before": len(events),
"events_after": len(filteredEvents),
}).Debug("applied history visibility (messages)")
return gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), start, end, err
}
@ -513,6 +515,9 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
// Store the events in the database, while marking them as unfit to show
// up in responses to sync requests.
if res.HistoryVisibility == "" {
res.HistoryVisibility = gomatrixserverlib.HistoryVisibilityShared
}
for i := range res.Events {
_, err = r.db.WriteEvent(
context.Background(),
@ -521,7 +526,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
[]string{},
[]string{},
nil, true,
gomatrixserverlib.HistoryVisibilityShared,
res.HistoryVisibility,
)
if err != nil {
return nil, err
@ -534,6 +539,9 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
// last `limit` events
events = events[len(events)-limit:]
}
for _, ev := range events {
ev.Visibility = res.HistoryVisibility
}
return events, nil
}

View file

@ -154,8 +154,12 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
wantJoinedRooms: []string{room.ID},
},
}
// TODO: find a better way
time.Sleep(500 * time.Millisecond)
syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists()
})
for _, tc := range testCases {
w := httptest.NewRecorder()
@ -343,6 +347,13 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// create the users
alice := test.NewUser(t)
aliceDev := userapi.Device{
ID: "ALICEID",
UserID: alice.ID,
AccessToken: "ALICE_BEARER_TOKEN",
DisplayName: "ALICE",
}
bob := test.NewUser(t)
bobDev := userapi.Device{
@ -409,7 +420,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
rsAPI := roomserver.NewInternalAPI(base)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{bobDev}}, rsAPI, &syncKeyAPI{})
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, &syncKeyAPI{})
for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@ -418,12 +429,18 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility))
// send the events/messages to NATS to create the rooms
beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)})
beforeJoinBody := fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)
beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": beforeJoinBody})
eventsToSend := append(room.Events(), beforeJoinEv)
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
time.Sleep(100 * time.Millisecond) // TODO: find a better way
syncUntil(t, base, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
return gjson.Get(syncBody, path).Exists()
},
)
// There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest.NewRecorder()
@ -449,14 +466,20 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID))
afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)})
joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID))
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After join in a %s room", tc.historyVisibility)})
afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility)
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody})
eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
time.Sleep(100 * time.Millisecond) // TODO: find a better way
syncUntil(t, base, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
return gjson.Get(syncBody, path).Exists()
},
)
// Verify the messages after/before invite are visible or not
w = httptest.NewRecorder()
@ -511,8 +534,8 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
@ -607,7 +630,14 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
t.Fatalf("unable to send to device message: %v", err)
}
}
time.Sleep((time.Millisecond * 15) * time.Duration(tc.sendMessagesCount)) // wait a bit, so the messages can be processed
syncUntil(t, base, alice.AccessToken,
len(tc.want) == 0,
func(body string) bool {
return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
},
)
// Execute a /sync request, recording the response
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
@ -630,6 +660,42 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
}
}
func syncUntil(t *testing.T,
base *base.BaseDendrite, accessToken string,
skip bool,
checkFunc func(syncBody string) bool,
) {
if checkFunc == nil {
t.Fatalf("No checkFunc defined")
}
if skip {
return
}
// loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it
// to the syncAPI when hitting /sync
done := make(chan bool)
defer close(done)
go func() {
for {
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": accessToken,
"timeout": "1000",
})))
if checkFunc(w.Body.String()) {
done <- true
return
}
}
}()
select {
case <-done:
case <-time.After(time.Second * 5):
t.Fatalf("Timed out waiting for messages")
}
}
func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input))
for i, ev := range input {