diff --git a/CHANGES.md b/CHANGES.md index 3e0db8c3f..94edc6288 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,26 @@ # Changelog +## Dendrite 0.5.1 (2021-11-16) + +### Features + +* Experimental (although incomplete) support for joining version 8 and 9 rooms +* State resolution v2 optimisations (close to 20% speed improvement thanks to reduced allocations) +* Optimisations made to the federation `/send` endpoint which avoids duplicate work, reduces CPU usage and smooths out incoming federation +* The sync API now consumes less CPU when generating sync responses (optimised `SelectStateInRange`) +* Support for serving the `.well-known/matrix/server` endpoint from within Dendrite itself (contributed by [twentybit](https://github.com/twentybit)) +* Support for thumbnailing WebP media (contributed by [hacktivista](https://github.com/hacktivista)) + +### Fixes + +* The `/publicRooms` handler now handles `POST` requests in addition to `GET` correctly +* Only valid canonical aliases will be returned in the `/publicRooms` response +* The media API now correctly handles `max_file_size_bytes` being configured to `0` (contributed by [database64128](https://github.com/database64128)) +* Unverifiable auth events in `/send_join` responses no longer result in a panic +* Build issues on Windows are now resolved (contributed by [S7evinK](https://github.com/S7evinK)) +* The default power levels in a room now set the invite level to 50, as per the spec +* A panic has been fixed when malformed messages are received in the key change consumers + ## Dendrite 0.5.0 (2021-08-24) ### Features diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 3cae837c9..4b5f0d660 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -148,7 +148,8 @@ type inputWorker struct { input *sendFIFOQueue } -var inputWorkers sync.Map // room ID -> *inputWorker +var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse +var inputWorkers sync.Map // room ID -> *inputWorker // Send implements /_matrix/federation/v1/send/{txnID} func Send( @@ -164,6 +165,37 @@ func Send( mu *internal.MutexByRoom, servers federationAPI.ServersInRoomProvider, ) util.JSONResponse { + // First we should check if this origin has already submitted this + // txn ID to us. If they have and the txnIDs map contains an entry, + // the transaction is still being worked on. The new client can wait + // for it to complete rather than creating more work. + index := string(request.Origin()) + "\000" + string(txnID) + v, ok := inFlightTxnsPerOrigin.LoadOrStore(index, make(chan util.JSONResponse, 1)) + ch := v.(chan util.JSONResponse) + if ok { + // This origin already submitted this txn ID to us, and the work + // is still taking place, so we'll just wait for it to finish. + ctx, cancel := context.WithTimeout(httpReq.Context(), time.Minute*5) + defer cancel() + select { + case <-ctx.Done(): + // If the caller gives up then return straight away. We don't + // want to attempt to process what they sent us any further. + return util.JSONResponse{Code: http.StatusRequestTimeout} + case res := <-ch: + // The original task just finished processing so let's return + // the result of it. + if res.Code == 0 { + return util.JSONResponse{Code: http.StatusAccepted} + } + return res + } + } + // Otherwise, store that we're currently working on this txn from + // this origin. When we're done processing, close the channel. + defer close(ch) + defer inFlightTxnsPerOrigin.Delete(index) + t := txnReq{ rsAPI: rsAPI, eduAPI: eduAPI, @@ -205,7 +237,7 @@ func Send( util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) - resp, jsonErr := t.processTransaction(httpReq.Context()) + resp, jsonErr := t.processTransaction(context.Background()) if jsonErr != nil { util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") return *jsonErr @@ -215,10 +247,12 @@ func Send( // Status code 200: // The result of processing the transaction. The server is to use this response // even in the event of one or more PDUs failing to be processed. - return util.JSONResponse{ + res := util.JSONResponse{ Code: http.StatusOK, JSON: resp, } + ch <- res + return res } type txnReq struct { diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index a0158dbc2..5420bef3f 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -85,6 +85,11 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error { logrus.WithError(err).Errorf("failed to read device message from key change topic") return nil } + if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil { + // This probably shouldn't happen but stops us from panicking if we come + // across an update that doesn't satisfy either types. + return nil + } switch m.Type { case api.TypeCrossSigningUpdate: return t.onCrossSigningMessage(m) diff --git a/go.mod b/go.mod index 37cedbfba..3991c6b8c 100644 --- a/go.mod +++ b/go.mod @@ -36,8 +36,8 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20211102131912-13366e7985e1 - github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 + github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2 + github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.8 github.com/morikuni/aec v1.0.0 // indirect diff --git a/go.sum b/go.sum index be28e3081..d455c62c7 100644 --- a/go.sum +++ b/go.sum @@ -987,10 +987,10 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20211102131912-13366e7985e1 h1:Pv7+98sreiHltpamJ4em6RCX/WPVN1wl53Gli9Cz744= -github.com/matrix-org/gomatrixserverlib v0.0.0-20211102131912-13366e7985e1/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc= -github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 h1:6JkIymZ1vxfI0shSpg6gNPTJaF4/95Evy34slPVZGKM= -github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2 h1:RFsBN3509Ql6NJ7TDVkcKoN3bb/tmqUqzur5c0AwIHQ= +github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc= +github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d h1:V1b6GZVvL95qTkjYSEWH9Pja6c0WcJKBt2MlAILlw+Q= +github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= diff --git a/internal/version.go b/internal/version.go index cdda60e2e..88123693f 100644 --- a/internal/version.go +++ b/internal/version.go @@ -17,7 +17,7 @@ var build string const ( VersionMajor = 0 VersionMinor = 5 - VersionPatch = 0 + VersionPatch = 1 VersionTag = "" // example: "rc1" ) diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go index 07d80e3fd..4b2bd4a9a 100644 --- a/keyserver/consumers/cross_signing.go +++ b/keyserver/consumers/cross_signing.go @@ -78,6 +78,11 @@ func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMess logrus.WithError(err).Errorf("failed to read device message from key change topic") return nil } + if m.OutputCrossSigningKeyUpdate == nil { + // This probably shouldn't happen but stops us from panicking if we come + // across an update that doesn't satisfy either types. + return nil + } switch m.Type { case api.TypeCrossSigningUpdate: return t.onCrossSigningMessage(m) diff --git a/roomserver/state/state.go b/roomserver/state/state.go index bae8b24c5..78398fc7c 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -778,7 +778,8 @@ func (v *StateResolution) resolveConflictsV2( ctx context.Context, notConflicted, conflicted []types.StateEntry, ) ([]types.StateEntry, error) { - eventIDMap := make(map[string]types.StateEntry) + estimate := len(conflicted) + len(notConflicted) + eventIDMap := make(map[string]types.StateEntry, estimate) // Load the conflicted events conflictedEvents, conflictedEventMap, err := v.loadStateEvents(ctx, conflicted) @@ -800,18 +801,20 @@ func (v *StateResolution) resolveConflictsV2( // For each conflicted event, we will add a new set of auth events. Auth // events may be duplicated across these sets but that's OK. - authSets := make(map[string][]*gomatrixserverlib.Event) - var authEvents []*gomatrixserverlib.Event - var authDifference []*gomatrixserverlib.Event + authSets := make(map[string][]*gomatrixserverlib.Event, len(conflicted)) + authEvents := make([]*gomatrixserverlib.Event, 0, estimate*3) + authDifference := make([]*gomatrixserverlib.Event, 0, estimate) // For each conflicted event, let's try and get the needed auth events. + neededStateKeys := make([]string, 16) + authEntries := make([]types.StateEntry, 16) for _, conflictedEvent := range conflictedEvents { // Work out which auth events we need to load. key := conflictedEvent.EventID() needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{conflictedEvent}) // Find the numeric IDs for the necessary state keys. - var neededStateKeys []string + neededStateKeys = neededStateKeys[:0] neededStateKeys = append(neededStateKeys, needed.Member...) neededStateKeys = append(neededStateKeys, needed.ThirdPartyInvite...) stateKeyNIDMap, err := v.db.EventStateKeyNIDs(ctx, neededStateKeys) @@ -821,7 +824,7 @@ func (v *StateResolution) resolveConflictsV2( // Load the necessary auth events. tuplesNeeded := v.stateKeyTuplesNeeded(stateKeyNIDMap, needed) - var authEntries []types.StateEntry + authEntries = authEntries[:0] for _, tuple := range tuplesNeeded { if eventNID, ok := stateEntryMap(notConflicted).lookup(tuple); ok { authEntries = append(authEntries, types.StateEntry{ diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 1938ff9b0..dfedc6409 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -109,6 +109,11 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er logrus.WithError(err).Errorf("failed to read device message from key change topic") return nil } + if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil { + // This probably shouldn't happen but stops us from panicking if we come + // across an update that doesn't satisfy either types. + return nil + } switch m.Type { case api.TypeCrossSigningUpdate: return s.onCrossSigningMessage(m, msg.Offset, msg.Partition) diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index bd7aa018d..44de02c92 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -116,7 +116,7 @@ const updateEventJSONSQL = "" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + - "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " AND ( $3::text[] IS NULL OR sender = ANY($3) )" + @@ -221,13 +221,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange( for rows.Next() { var ( + eventID string streamPos types.StreamPosition eventBytes []byte excludeFromSync bool addIDs pq.StringArray delIDs pq.StringArray ) - if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { return nil, nil, err } // Sanity check for deleted state and whine if we see it. We don't need to do anything @@ -243,7 +244,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal(eventBytes, &ev); err != nil { + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, nil, err } needSet := stateNeeded[ev.RoomID()] @@ -258,7 +259,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = types.StreamEvent{ + eventIDToEvent[eventID] = types.StreamEvent{ HeaderedEvent: &ev, StreamPosition: streamPos, ExcludeFromSync: excludeFromSync, diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 37f7ea003..afdbe55ce 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -81,7 +81,7 @@ const updateEventJSONSQL = "" + "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2" const selectStateInRangeSQL = "" + - "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2)" + " AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))" @@ -173,13 +173,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange( for rows.Next() { var ( + eventID string streamPos types.StreamPosition eventBytes []byte excludeFromSync bool addIDsJSON string delIDsJSON string ) - if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil { return nil, nil, err } @@ -201,7 +202,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal(eventBytes, &ev); err != nil { + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, nil, err } needSet := stateNeeded[ev.RoomID()] @@ -216,7 +217,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = types.StreamEvent{ + eventIDToEvent[eventID] = types.StreamEvent{ HeaderedEvent: &ev, StreamPosition: streamPos, ExcludeFromSync: excludeFromSync, diff --git a/sytest-whitelist b/sytest-whitelist index afe8a7e0d..d074d42b4 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -571,3 +571,19 @@ A prev_batch token from incremental sync can be used in the v1 messages API Inbound federation rejects invites which are not signed by the sender Invited user can reject invite over federation several times Test that we can be reinvited to a room we created +User can create and send/receive messages in a room with version 8 +local user can join room with version 8 +User can invite local user to room with version 8 +remote user can join room with version 8 +User can invite remote user to room with version 8 +Remote user can backfill in a room with version 8 +Can reject invites over federation for rooms with version 8 +Can receive redactions from regular users over federation in room version 8 +User can create and send/receive messages in a room with version 9 +local user can join room with version 9 +User can invite local user to room with version 9 +remote user can join room with version 9 +User can invite remote user to room with version 9 +Remote user can backfill in a room with version 9 +Can reject invites over federation for rooms with version 9 +Can receive redactions from regular users over federation in room version 9