Merge branch 'master' into add-media-config

This commit is contained in:
Neil Alexander 2021-11-16 15:23:44 +00:00 committed by GitHub
commit 873151f31e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 249 additions and 89 deletions

View file

@ -1,5 +1,26 @@
# Changelog
## Dendrite 0.5.1 (2021-11-16)
### Features
* Experimental (although incomplete) support for joining version 8 and 9 rooms
* State resolution v2 optimisations (close to 20% speed improvement thanks to reduced allocations)
* Optimisations made to the federation `/send` endpoint which avoids duplicate work, reduces CPU usage and smooths out incoming federation
* The sync API now consumes less CPU when generating sync responses (optimised `SelectStateInRange`)
* Support for serving the `.well-known/matrix/server` endpoint from within Dendrite itself (contributed by [twentybit](https://github.com/twentybit))
* Support for thumbnailing WebP media (contributed by [hacktivista](https://github.com/hacktivista))
### Fixes
* The `/publicRooms` handler now handles `POST` requests in addition to `GET` correctly
* Only valid canonical aliases will be returned in the `/publicRooms` response
* The media API now correctly handles `max_file_size_bytes` being configured to `0` (contributed by [database64128](https://github.com/database64128))
* Unverifiable auth events in `/send_join` responses no longer result in a panic
* Build issues on Windows are now resolved (contributed by [S7evinK](https://github.com/S7evinK))
* The default power levels in a room now set the invite level to 50, as per the spec
* A panic has been fixed when malformed messages are received in the key change consumers
## Dendrite 0.5.0 (2021-08-24)
### Features

View file

@ -22,7 +22,6 @@ import (
"io/ioutil"
"os"
"strings"
"syscall"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/config"
@ -121,13 +120,13 @@ func getPassword(password, pwdFile *string, pwdStdin, askPass *bool, r io.Reader
// ask the user to provide the password
if *askPass {
fmt.Print("Enter Password: ")
bytePassword, err := term.ReadPassword(syscall.Stdin)
bytePassword, err := term.ReadPassword(int(os.Stdin.Fd()))
if err != nil {
logrus.Fatalln("Unable to read password:", err)
}
fmt.Println()
fmt.Print("Confirm Password: ")
bytePassword2, err := term.ReadPassword(syscall.Stdin)
bytePassword2, err := term.ReadPassword(int(os.Stdin.Fd()))
if err != nil {
logrus.Fatalln("Unable to read password:", err)
}

View file

@ -148,6 +148,7 @@ type inputWorker struct {
input *sendFIFOQueue
}
var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse
var inputWorkers sync.Map // room ID -> *inputWorker
// Send implements /_matrix/federation/v1/send/{txnID}
@ -164,6 +165,37 @@ func Send(
mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider,
) util.JSONResponse {
// First we should check if this origin has already submitted this
// txn ID to us. If they have and the txnIDs map contains an entry,
// the transaction is still being worked on. The new client can wait
// for it to complete rather than creating more work.
index := string(request.Origin()) + "\000" + string(txnID)
v, ok := inFlightTxnsPerOrigin.LoadOrStore(index, make(chan util.JSONResponse, 1))
ch := v.(chan util.JSONResponse)
if ok {
// This origin already submitted this txn ID to us, and the work
// is still taking place, so we'll just wait for it to finish.
ctx, cancel := context.WithTimeout(httpReq.Context(), time.Minute*5)
defer cancel()
select {
case <-ctx.Done():
// If the caller gives up then return straight away. We don't
// want to attempt to process what they sent us any further.
return util.JSONResponse{Code: http.StatusRequestTimeout}
case res := <-ch:
// The original task just finished processing so let's return
// the result of it.
if res.Code == 0 {
return util.JSONResponse{Code: http.StatusAccepted}
}
return res
}
}
// Otherwise, store that we're currently working on this txn from
// this origin. When we're done processing, close the channel.
defer close(ch)
defer inFlightTxnsPerOrigin.Delete(index)
t := txnReq{
rsAPI: rsAPI,
eduAPI: eduAPI,
@ -205,7 +237,7 @@ func Send(
util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
resp, jsonErr := t.processTransaction(httpReq.Context())
resp, jsonErr := t.processTransaction(context.Background())
if jsonErr != nil {
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
return *jsonErr
@ -215,10 +247,12 @@ func Send(
// Status code 200:
// The result of processing the transaction. The server is to use this response
// even in the event of one or more PDUs failing to be processed.
return util.JSONResponse{
res := util.JSONResponse{
Code: http.StatusOK,
JSON: resp,
}
ch <- res
return res
}
type txnReq struct {

View file

@ -84,6 +84,11 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
return nil
}
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
return nil
}
switch m.Type {
case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m)

4
go.mod
View file

@ -31,9 +31,9 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20211102101113-5e02b64e5312
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0
github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89
github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.8
github.com/morikuni/aec v1.0.0 // indirect

9
go.sum
View file

@ -993,12 +993,12 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211102101113-5e02b64e5312 h1:of3kbkOrjE8793Vsni8ofxMSy/vpO1SB+ZkHSfv4wrs=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211102101113-5e02b64e5312/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2 h1:RFsBN3509Ql6NJ7TDVkcKoN3bb/tmqUqzur5c0AwIHQ=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc=
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 h1:HZCzy4oVzz55e+cOMiX/JtSF2UOY1evBl2raaE7ACcU=
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 h1:6JkIymZ1vxfI0shSpg6gNPTJaF4/95Evy34slPVZGKM=
github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d h1:V1b6GZVvL95qTkjYSEWH9Pja6c0WcJKBt2MlAILlw+Q=
github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
@ -1529,7 +1529,6 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b h1:+qEpEAPhDZ1o0x3tHzZTQDArnOixOzGD9HUJfcg0mb4=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.0.0-20211028202545-6944b10bf410 h1:hTftEOvwiOq2+O8k2D5/Q7COC7k5Qcrgc2TFURJYnvQ=
golang.org/x/image v0.0.0-20211028202545-6944b10bf410/go.mod h1:023OzeP/+EPmXeapQh35lcL3II3LrY8Ic+EFFKVhULM=

View file

@ -18,7 +18,6 @@ import (
"context"
"fmt"
"io"
"log/syslog"
"net/http"
"os"
"path"
@ -31,7 +30,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dugong"
"github.com/sirupsen/logrus"
lSyslog "github.com/sirupsen/logrus/hooks/syslog"
)
type utcFormatter struct {
@ -108,37 +106,6 @@ func SetupStdLogging() {
})
}
// SetupHookLogging configures the logging hooks defined in the configuration.
// If something fails here it means that the logging was improperly configured,
// so we just exit with the error
func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
logrus.SetReportCaller(true)
for _, hook := range hooks {
// Check we received a proper logging level
level, err := logrus.ParseLevel(hook.Level)
if err != nil {
logrus.Fatalf("Unrecognised logging level %s: %q", hook.Level, err)
}
// Perform a first filter on the logs according to the lowest level of all
// (Eg: If we have hook for info and above, prevent logrus from processing debug logs)
if logrus.GetLevel() < level {
logrus.SetLevel(level)
}
switch hook.Type {
case "file":
checkFileHookParams(hook.Params)
setupFileHook(hook, level, componentName)
case "syslog":
checkSyslogHookParams(hook.Params)
setupSyslogHook(hook, level, componentName)
default:
logrus.Fatalf("Unrecognised logging hook type: %s", hook.Type)
}
}
}
// File type hooks should be provided a path to a directory to store log files
func checkFileHookParams(params map[string]interface{}) {
path, ok := params["path"]
@ -178,34 +145,6 @@ func setupFileHook(hook config.LogrusHook, level logrus.Level, componentName str
})
}
func checkSyslogHookParams(params map[string]interface{}) {
addr, ok := params["address"]
if !ok {
logrus.Fatalf("Expecting a parameter \"address\" for logging hook of type \"syslog\"")
}
if _, ok := addr.(string); !ok {
logrus.Fatalf("Parameter \"address\" for logging hook of type \"syslog\" should be a string")
}
proto, ok2 := params["protocol"]
if !ok2 {
logrus.Fatalf("Expecting a parameter \"protocol\" for logging hook of type \"syslog\"")
}
if _, ok2 := proto.(string); !ok2 {
logrus.Fatalf("Parameter \"protocol\" for logging hook of type \"syslog\" should be a string")
}
}
func setupSyslogHook(hook config.LogrusHook, level logrus.Level, componentName string) {
syslogHook, err := lSyslog.NewSyslogHook(hook.Params["protocol"].(string), hook.Params["address"].(string), syslog.LOG_INFO, componentName)
if err == nil {
logrus.AddHook(&logLevelHook{level, syslogHook})
}
}
//CloseAndLogIfError Closes io.Closer and logs the error if any
func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string) {
if closer == nil {

84
internal/log_unix.go Normal file
View file

@ -0,0 +1,84 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !windows
package internal
import (
"log/syslog"
"github.com/matrix-org/dendrite/setup/config"
"github.com/sirupsen/logrus"
lSyslog "github.com/sirupsen/logrus/hooks/syslog"
)
// SetupHookLogging configures the logging hooks defined in the configuration.
// If something fails here it means that the logging was improperly configured,
// so we just exit with the error
func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
logrus.SetReportCaller(true)
for _, hook := range hooks {
// Check we received a proper logging level
level, err := logrus.ParseLevel(hook.Level)
if err != nil {
logrus.Fatalf("Unrecognised logging level %s: %q", hook.Level, err)
}
// Perform a first filter on the logs according to the lowest level of all
// (Eg: If we have hook for info and above, prevent logrus from processing debug logs)
if logrus.GetLevel() < level {
logrus.SetLevel(level)
}
switch hook.Type {
case "file":
checkFileHookParams(hook.Params)
setupFileHook(hook, level, componentName)
case "syslog":
checkSyslogHookParams(hook.Params)
setupSyslogHook(hook, level, componentName)
default:
logrus.Fatalf("Unrecognised logging hook type: %s", hook.Type)
}
}
}
func checkSyslogHookParams(params map[string]interface{}) {
addr, ok := params["address"]
if !ok {
logrus.Fatalf("Expecting a parameter \"address\" for logging hook of type \"syslog\"")
}
if _, ok := addr.(string); !ok {
logrus.Fatalf("Parameter \"address\" for logging hook of type \"syslog\" should be a string")
}
proto, ok2 := params["protocol"]
if !ok2 {
logrus.Fatalf("Expecting a parameter \"protocol\" for logging hook of type \"syslog\"")
}
if _, ok2 := proto.(string); !ok2 {
logrus.Fatalf("Parameter \"protocol\" for logging hook of type \"syslog\" should be a string")
}
}
func setupSyslogHook(hook config.LogrusHook, level logrus.Level, componentName string) {
syslogHook, err := lSyslog.NewSyslogHook(hook.Params["protocol"].(string), hook.Params["address"].(string), syslog.LOG_INFO, componentName)
if err == nil {
logrus.AddHook(&logLevelHook{level, syslogHook})
}
}

48
internal/log_windows.go Normal file
View file

@ -0,0 +1,48 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/sirupsen/logrus"
)
// SetupHookLogging configures the logging hooks defined in the configuration.
// If something fails here it means that the logging was improperly configured,
// so we just exit with the error
func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
logrus.SetReportCaller(true)
for _, hook := range hooks {
// Check we received a proper logging level
level, err := logrus.ParseLevel(hook.Level)
if err != nil {
logrus.Fatalf("Unrecognised logging level %s: %q", hook.Level, err)
}
// Perform a first filter on the logs according to the lowest level of all
// (Eg: If we have hook for info and above, prevent logrus from processing debug logs)
if logrus.GetLevel() < level {
logrus.SetLevel(level)
}
switch hook.Type {
case "file":
checkFileHookParams(hook.Params)
setupFileHook(hook, level, componentName)
default:
logrus.Fatalf("Unrecognised logging hook type: %s", hook.Type)
}
}
}

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 5
VersionPatch = 0
VersionPatch = 1
VersionTag = "" // example: "rc1"
)

View file

@ -77,6 +77,11 @@ func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMess
logrus.WithError(err).Errorf("failed to read device message from key change topic")
return nil
}
if m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
return nil
}
switch m.Type {
case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m)

View file

@ -778,7 +778,8 @@ func (v *StateResolution) resolveConflictsV2(
ctx context.Context,
notConflicted, conflicted []types.StateEntry,
) ([]types.StateEntry, error) {
eventIDMap := make(map[string]types.StateEntry)
estimate := len(conflicted) + len(notConflicted)
eventIDMap := make(map[string]types.StateEntry, estimate)
// Load the conflicted events
conflictedEvents, conflictedEventMap, err := v.loadStateEvents(ctx, conflicted)
@ -800,18 +801,20 @@ func (v *StateResolution) resolveConflictsV2(
// For each conflicted event, we will add a new set of auth events. Auth
// events may be duplicated across these sets but that's OK.
authSets := make(map[string][]*gomatrixserverlib.Event)
var authEvents []*gomatrixserverlib.Event
var authDifference []*gomatrixserverlib.Event
authSets := make(map[string][]*gomatrixserverlib.Event, len(conflicted))
authEvents := make([]*gomatrixserverlib.Event, 0, estimate*3)
authDifference := make([]*gomatrixserverlib.Event, 0, estimate)
// For each conflicted event, let's try and get the needed auth events.
neededStateKeys := make([]string, 16)
authEntries := make([]types.StateEntry, 16)
for _, conflictedEvent := range conflictedEvents {
// Work out which auth events we need to load.
key := conflictedEvent.EventID()
needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{conflictedEvent})
// Find the numeric IDs for the necessary state keys.
var neededStateKeys []string
neededStateKeys = neededStateKeys[:0]
neededStateKeys = append(neededStateKeys, needed.Member...)
neededStateKeys = append(neededStateKeys, needed.ThirdPartyInvite...)
stateKeyNIDMap, err := v.db.EventStateKeyNIDs(ctx, neededStateKeys)
@ -821,7 +824,7 @@ func (v *StateResolution) resolveConflictsV2(
// Load the necessary auth events.
tuplesNeeded := v.stateKeyTuplesNeeded(stateKeyNIDMap, needed)
var authEntries []types.StateEntry
authEntries = authEntries[:0]
for _, tuple := range tuplesNeeded {
if eventNID, ok := stateEntryMap(notConflicted).lookup(tuple); ok {
authEntries = append(authEntries, types.StateEntry{

View file

@ -109,6 +109,11 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
logrus.WithError(err).Errorf("failed to read device message from key change topic")
return nil
}
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
return nil
}
switch m.Type {
case api.TypeCrossSigningUpdate:
return s.onCrossSigningMessage(m, msg.Offset, msg.Partition)

View file

@ -116,7 +116,7 @@ const updateEventJSONSQL = "" +
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
" AND ( $3::text[] IS NULL OR sender = ANY($3) )" +
@ -221,13 +221,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
for rows.Next() {
var (
eventID string
streamPos types.StreamPosition
eventBytes []byte
excludeFromSync bool
addIDs pq.StringArray
delIDs pq.StringArray
)
if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
return nil, nil, err
}
// Sanity check for deleted state and whine if we see it. We don't need to do anything
@ -243,7 +244,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
// TODO: Handle redacted events
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil {
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
return nil, nil, err
}
needSet := stateNeeded[ev.RoomID()]
@ -258,7 +259,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
}
stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = types.StreamEvent{
eventIDToEvent[eventID] = types.StreamEvent{
HeaderedEvent: &ev,
StreamPosition: streamPos,
ExcludeFromSync: excludeFromSync,

View file

@ -81,7 +81,7 @@ const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
const selectStateInRangeSQL = "" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2)" +
" AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))"
@ -173,13 +173,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
for rows.Next() {
var (
eventID string
streamPos types.StreamPosition
eventBytes []byte
excludeFromSync bool
addIDsJSON string
delIDsJSON string
)
if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil {
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil {
return nil, nil, err
}
@ -201,7 +202,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
// TODO: Handle redacted events
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil {
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
return nil, nil, err
}
needSet := stateNeeded[ev.RoomID()]
@ -216,7 +217,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
}
stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = types.StreamEvent{
eventIDToEvent[eventID] = types.StreamEvent{
HeaderedEvent: &ev,
StreamPosition: streamPos,
ExcludeFromSync: excludeFromSync,

View file

@ -572,3 +572,19 @@ A prev_batch token from incremental sync can be used in the v1 messages API
Inbound federation rejects invites which are not signed by the sender
Invited user can reject invite over federation several times
Test that we can be reinvited to a room we created
User can create and send/receive messages in a room with version 8
local user can join room with version 8
User can invite local user to room with version 8
remote user can join room with version 8
User can invite remote user to room with version 8
Remote user can backfill in a room with version 8
Can reject invites over federation for rooms with version 8
Can receive redactions from regular users over federation in room version 8
User can create and send/receive messages in a room with version 9
local user can join room with version 9
User can invite local user to room with version 9
remote user can join room with version 9
User can invite remote user to room with version 9
Remote user can backfill in a room with version 9
Can reject invites over federation for rooms with version 9
Can receive redactions from regular users over federation in room version 9