Upstream release v0.9.4

This commit is contained in:
PiotrKozimor 2022-08-22 18:03:50 +02:00 committed by GitHub
commit d15a4e4a61
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 341 additions and 133 deletions

View file

@ -7,6 +7,7 @@ on:
pull_request: pull_request:
release: release:
types: [published] types: [published]
workflow_dispatch:
concurrency: concurrency:
group: ${{ github.workflow }}-${{ github.ref }} group: ${{ github.workflow }}-${{ github.ref }}

View file

@ -1,5 +1,18 @@
# Changelog # Changelog
## Dendrite 0.9.4 (2022-08-19)
### Fixes
* A bug in the roomserver around handling rejected outliers has been fixed
* Backfilled events will now use the correct history visibility where possible
* The device list updater backoff has been fixed, which should reduce the number of outbound HTTP requests and `Failed to query device keys for some users` log entries for dead servers
* The `/sync` endpoint will no longer incorrectly return room entries for retired invites which could cause some rooms to show up in the client "Historical" section
* The `/createRoom` endpoint will now correctly populate `is_direct` in invite membership events, which may help clients to classify direct messages correctly
* The `create-account` tool will now log an error if the shared secret is not set in the Dendrite config
* A couple of minor bugs have been fixed in the membership lazy-loading
* Queued EDUs in the federation API are now cached properly
## Dendrite 0.9.3 (2022-08-15) ## Dendrite 0.9.3 (2022-08-15)
### Important ### Important

View file

@ -49,6 +49,7 @@ type createRoomRequest struct {
GuestCanJoin bool `json:"guest_can_join"` GuestCanJoin bool `json:"guest_can_join"`
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
PowerLevelContentOverride json.RawMessage `json:"power_level_content_override"` PowerLevelContentOverride json.RawMessage `json:"power_level_content_override"`
IsDirect bool `json:"is_direct"`
} }
const ( const (
@ -499,9 +500,17 @@ func createRoom(
// Build some stripped state for the invite. // Build some stripped state for the invite.
var globalStrippedState []gomatrixserverlib.InviteV2StrippedState var globalStrippedState []gomatrixserverlib.InviteV2StrippedState
for _, event := range builtEvents { for _, event := range builtEvents {
// Chosen events from the spec:
// https://spec.matrix.org/v1.3/client-server-api/#stripped-state
switch event.Type() { switch event.Type() {
case gomatrixserverlib.MRoomCreate:
fallthrough
case gomatrixserverlib.MRoomName: case gomatrixserverlib.MRoomName:
fallthrough fallthrough
case gomatrixserverlib.MRoomAvatar:
fallthrough
case gomatrixserverlib.MRoomTopic:
fallthrough
case gomatrixserverlib.MRoomCanonicalAlias: case gomatrixserverlib.MRoomCanonicalAlias:
fallthrough fallthrough
case gomatrixserverlib.MRoomEncryption: case gomatrixserverlib.MRoomEncryption:
@ -522,7 +531,7 @@ func createRoom(
// Build the invite event. // Build the invite event.
inviteEvent, err := buildMembershipEvent( inviteEvent, err := buildMembershipEvent(
ctx, invitee, "", profileAPI, device, gomatrixserverlib.Invite, ctx, invitee, "", profileAPI, device, gomatrixserverlib.Invite,
roomID, true, cfg, evTime, rsAPI, asAPI, roomID, r.IsDirect, cfg, evTime, rsAPI, asAPI,
) )
if err != nil { if err != nil {
util.GetLogger(ctx).WithError(err).Error("buildMembershipEvent failed") util.GetLogger(ctx).WithError(err).Error("buildMembershipEvent failed")

View file

@ -85,6 +85,10 @@ func main() {
logrus.Fatalf("The reset-password flag has been replaced by the POST /_dendrite/admin/resetPassword/{localpart} admin API.") logrus.Fatalf("The reset-password flag has been replaced by the POST /_dendrite/admin/resetPassword/{localpart} admin API.")
} }
if cfg.ClientAPI.RegistrationSharedSecret == "" {
logrus.Fatalln("Shared secret registration is not enabled, enable it by setting a shared secret in the config: 'client_api.registration_shared_secret'")
}
if *username == "" { if *username == "" {
flag.Usage() flag.Usage()
os.Exit(1) os.Exit(1)

View file

@ -110,6 +110,7 @@ func (d *Database) GetPendingEDUs(
return fmt.Errorf("json.Unmarshal: %w", err) return fmt.Errorf("json.Unmarshal: %w", err)
} }
edus[&Receipt{nid}] = &event edus[&Receipt{nid}] = &event
d.Cache.StoreFederationQueuedEDU(nid, &event)
} }
return nil return nil
@ -177,20 +178,18 @@ func (d *Database) GetPendingEDUServerNames(
return d.FederationQueueEDUs.SelectQueueEDUServerNames(ctx, nil) return d.FederationQueueEDUs.SelectQueueEDUServerNames(ctx, nil)
} }
// DeleteExpiredEDUs deletes expired EDUs // DeleteExpiredEDUs deletes expired EDUs and evicts them from the cache.
func (d *Database) DeleteExpiredEDUs(ctx context.Context) error { func (d *Database) DeleteExpiredEDUs(ctx context.Context) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { var jsonNIDs []int64
err := d.Writer.Do(d.DB, nil, func(txn *sql.Tx) (err error) {
expiredBefore := gomatrixserverlib.AsTimestamp(time.Now()) expiredBefore := gomatrixserverlib.AsTimestamp(time.Now())
jsonNIDs, err := d.FederationQueueEDUs.SelectExpiredEDUs(ctx, txn, expiredBefore) jsonNIDs, err = d.FederationQueueEDUs.SelectExpiredEDUs(ctx, txn, expiredBefore)
if err != nil { if err != nil {
return err return err
} }
if len(jsonNIDs) == 0 { if len(jsonNIDs) == 0 {
return nil return nil
} }
for i := range jsonNIDs {
d.Cache.EvictFederationQueuedEDU(jsonNIDs[i])
}
if err = d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, jsonNIDs); err != nil { if err = d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, jsonNIDs); err != nil {
return err return err
@ -198,4 +197,14 @@ func (d *Database) DeleteExpiredEDUs(ctx context.Context) error {
return d.FederationQueueEDUs.DeleteExpiredEDUs(ctx, txn, expiredBefore) return d.FederationQueueEDUs.DeleteExpiredEDUs(ctx, txn, expiredBefore)
}) })
if err != nil {
return err
}
for i := range jsonNIDs {
d.Cache.EvictFederationQueuedEDU(jsonNIDs[i])
}
return nil
} }

View file

@ -31,7 +31,7 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Dat
func TestExpireEDUs(t *testing.T) { func TestExpireEDUs(t *testing.T) {
var expireEDUTypes = map[string]time.Duration{ var expireEDUTypes = map[string]time.Duration{
gomatrixserverlib.MReceipt: time.Millisecond, gomatrixserverlib.MReceipt: 0,
} }
ctx := context.Background() ctx := context.Background()

View file

@ -17,7 +17,7 @@ var build string
const ( const (
VersionMajor = 0 VersionMajor = 0
VersionMinor = 9 VersionMinor = 9
VersionPatch = 3 VersionPatch = 4
VersionTag = "" // example: "rc1" VersionTag = "" // example: "rc1"
) )

View file

@ -335,8 +335,9 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
retriesMu := &sync.Mutex{} retriesMu := &sync.Mutex{}
// restarter goroutine which will inject failed servers into ch when it is time // restarter goroutine which will inject failed servers into ch when it is time
go func() { go func() {
for {
var serversToRetry []gomatrixserverlib.ServerName var serversToRetry []gomatrixserverlib.ServerName
for {
serversToRetry = serversToRetry[:0] // reuse memory
time.Sleep(time.Second) time.Sleep(time.Second)
retriesMu.Lock() retriesMu.Lock()
now := time.Now() now := time.Now()
@ -355,11 +356,17 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
} }
}() }()
for serverName := range ch { for serverName := range ch {
retriesMu.Lock()
_, exists := retries[serverName]
retriesMu.Unlock()
if exists {
// Don't retry a server that we're already waiting for.
continue
}
waitTime, shouldRetry := u.processServer(serverName) waitTime, shouldRetry := u.processServer(serverName)
if shouldRetry { if shouldRetry {
retriesMu.Lock() retriesMu.Lock()
_, exists := retries[serverName] if _, exists = retries[serverName]; !exists {
if !exists {
retries[serverName] = time.Now().Add(waitTime) retries[serverName] = time.Now().Add(waitTime)
} }
retriesMu.Unlock() retriesMu.Unlock()

View file

@ -5,9 +5,10 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
) )
type PerformErrorCode int type PerformErrorCode int
@ -162,6 +163,7 @@ func (r *PerformBackfillRequest) PrevEventIDs() []string {
type PerformBackfillResponse struct { type PerformBackfillResponse struct {
// Missing events, arbritrary order. // Missing events, arbritrary order.
Events []*gomatrixserverlib.HeaderedEvent `json:"events"` Events []*gomatrixserverlib.HeaderedEvent `json:"events"`
HistoryVisibility gomatrixserverlib.HistoryVisibility `json:"history_visibility"`
} }
type PerformPublishRequest struct { type PerformPublishRequest struct {

View file

@ -39,7 +39,7 @@ func CheckForSoftFail(
var authStateEntries []types.StateEntry var authStateEntries []types.StateEntry
var err error var err error
if rewritesState { if rewritesState {
authStateEntries, err = db.StateEntriesForEventIDs(ctx, stateEventIDs) authStateEntries, err = db.StateEntriesForEventIDs(ctx, stateEventIDs, true)
if err != nil { if err != nil {
return true, fmt.Errorf("StateEntriesForEventIDs failed: %w", err) return true, fmt.Errorf("StateEntriesForEventIDs failed: %w", err)
} }
@ -97,7 +97,7 @@ func CheckAuthEvents(
authEventIDs []string, authEventIDs []string,
) ([]types.EventNID, error) { ) ([]types.EventNID, error) {
// Grab the numeric IDs for the supplied auth state events from the database. // Grab the numeric IDs for the supplied auth state events from the database.
authStateEntries, err := db.StateEntriesForEventIDs(ctx, authEventIDs) authStateEntries, err := db.StateEntriesForEventIDs(ctx, authEventIDs, true)
if err != nil { if err != nil {
return nil, fmt.Errorf("db.StateEntriesForEventIDs: %w", err) return nil, fmt.Errorf("db.StateEntriesForEventIDs: %w", err)
} }

View file

@ -17,8 +17,8 @@
package input package input
import ( import (
"bytes"
"context" "context"
"database/sql"
"fmt" "fmt"
"time" "time"
@ -107,28 +107,6 @@ func (r *Inputer) processRoomEvent(
}) })
} }
// if we have already got this event then do not process it again, if the input kind is an outlier.
// Outliers contain no extra information which may warrant a re-processing.
if input.Kind == api.KindOutlier {
evs, err2 := r.DB.EventsFromIDs(ctx, []string{event.EventID()})
if err2 == nil && len(evs) == 1 {
// check hash matches if we're on early room versions where the event ID was a random string
idFormat, err2 := headered.RoomVersion.EventIDFormat()
if err2 == nil {
switch idFormat {
case gomatrixserverlib.EventIDFormatV1:
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
logger.Debugf("Already processed event; ignoring")
return nil
}
default:
logger.Debugf("Already processed event; ignoring")
return nil
}
}
}
}
// Don't waste time processing the event if the room doesn't exist. // Don't waste time processing the event if the room doesn't exist.
// A room entry locally will only be created in response to a create // A room entry locally will only be created in response to a create
// event. // event.
@ -141,6 +119,29 @@ func (r *Inputer) processRoomEvent(
return fmt.Errorf("room %s does not exist for event %s", event.RoomID(), event.EventID()) return fmt.Errorf("room %s does not exist for event %s", event.RoomID(), event.EventID())
} }
// If we already know about this outlier and it hasn't been rejected
// then we won't attempt to reprocess it. If it was rejected or has now
// arrived as a different kind of event, then we can attempt to reprocess,
// in case we have learned something new or need to weave the event into
// the DAG now.
if input.Kind == api.KindOutlier && roomInfo != nil {
wasRejected, werr := r.DB.IsEventRejected(ctx, roomInfo.RoomNID, event.EventID())
switch {
case werr == sql.ErrNoRows:
// We haven't seen this event before so continue.
case werr != nil:
// Something has gone wrong trying to find out if we rejected
// this event already.
logger.WithError(werr).Errorf("Failed to check if event %q is already seen", event.EventID())
return werr
case !wasRejected:
// We've seen this event before and it wasn't rejected so we
// should ignore it.
logger.Debugf("Already processed event %q, ignoring", event.EventID())
return nil
}
}
var missingAuth, missingPrev bool var missingAuth, missingPrev bool
serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{} serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{}
if !isCreateEvent { if !isCreateEvent {
@ -300,7 +301,7 @@ func (r *Inputer) processRoomEvent(
// bother doing this if the event was already rejected as it just ends up // bother doing this if the event was already rejected as it just ends up
// burning CPU time. // burning CPU time.
historyVisibility := gomatrixserverlib.HistoryVisibilityShared // Default to shared. historyVisibility := gomatrixserverlib.HistoryVisibilityShared // Default to shared.
if rejectionErr == nil && !isRejected && !softfail { if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected && !softfail {
var err error var err error
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev) historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev)
if err != nil { if err != nil {
@ -355,6 +356,8 @@ func (r *Inputer) processRoomEvent(
// We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it. // We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it.
if isRejected || softfail { if isRejected || softfail {
logger.WithError(rejectionErr).WithFields(logrus.Fields{ logger.WithError(rejectionErr).WithFields(logrus.Fields{
"room_id": event.RoomID(),
"event_id": event.EventID(),
"soft_fail": softfail, "soft_fail": softfail,
"missing_prev": missingPrev, "missing_prev": missingPrev,
}).Warn("Stored rejected event") }).Warn("Stored rejected event")
@ -660,7 +663,7 @@ func (r *Inputer) calculateAndSetState(
// We've been told what the state at the event is so we don't need to calculate it. // We've been told what the state at the event is so we don't need to calculate it.
// Check that those state events are in the database and store the state. // Check that those state events are in the database and store the state.
var entries []types.StateEntry var entries []types.StateEntry
if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil { if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs, true); err != nil {
return fmt.Errorf("updater.StateEntriesForEventIDs: %w", err) return fmt.Errorf("updater.StateEntriesForEventIDs: %w", err)
} }
entries = types.DeduplicateStateEntries(entries) entries = types.DeduplicateStateEntries(entries)

View file

@ -140,11 +140,11 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
continue continue
} }
var entries []types.StateEntry var entries []types.StateEntry
if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil { if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs, true); err != nil {
// attempt to fetch the missing events // attempt to fetch the missing events
r.fetchAndStoreMissingEvents(ctx, info.RoomVersion, requester, stateIDs) r.fetchAndStoreMissingEvents(ctx, info.RoomVersion, requester, stateIDs)
// try again // try again
entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs) entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs, true)
if err != nil { if err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event") logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event")
return err return err
@ -164,6 +164,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
// TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point. // TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point.
res.Events = events res.Events = events
res.HistoryVisibility = requester.historyVisiblity
return nil return nil
} }
@ -248,6 +249,7 @@ type backfillRequester struct {
servers []gomatrixserverlib.ServerName servers []gomatrixserverlib.ServerName
eventIDToBeforeStateIDs map[string][]string eventIDToBeforeStateIDs map[string][]string
eventIDMap map[string]*gomatrixserverlib.Event eventIDMap map[string]*gomatrixserverlib.Event
historyVisiblity gomatrixserverlib.HistoryVisibility
} }
func newBackfillRequester( func newBackfillRequester(
@ -266,6 +268,7 @@ func newBackfillRequester(
eventIDMap: make(map[string]*gomatrixserverlib.Event), eventIDMap: make(map[string]*gomatrixserverlib.Event),
bwExtrems: bwExtrems, bwExtrems: bwExtrems,
preferServer: preferServer, preferServer: preferServer,
historyVisiblity: gomatrixserverlib.HistoryVisibilityShared,
} }
} }
@ -447,7 +450,8 @@ FindSuccessor:
} }
// possibly return all joined servers depending on history visiblity // possibly return all joined servers depending on history visiblity
memberEventsFromVis, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.thisServer) memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.thisServer)
b.historyVisiblity = visibility
if err != nil { if err != nil {
logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules") logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules")
return nil return nil
@ -528,7 +532,7 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion,
// pull all events and then filter by that table. // pull all events and then filter by that table.
func joinEventsFromHistoryVisibility( func joinEventsFromHistoryVisibility(
ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry, ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry,
thisServer gomatrixserverlib.ServerName) ([]types.Event, error) { thisServer gomatrixserverlib.ServerName) ([]types.Event, gomatrixserverlib.HistoryVisibility, error) {
var eventNIDs []types.EventNID var eventNIDs []types.EventNID
for _, entry := range stateEntries { for _, entry := range stateEntries {
@ -542,7 +546,9 @@ func joinEventsFromHistoryVisibility(
// Get all of the events in this state // Get all of the events in this state
stateEvents, err := db.Events(ctx, eventNIDs) stateEvents, err := db.Events(ctx, eventNIDs)
if err != nil { if err != nil {
return nil, err // even though the default should be shared, restricting the visibility to joined
// feels more secure here.
return nil, gomatrixserverlib.HistoryVisibilityJoined, err
} }
events := make([]*gomatrixserverlib.Event, len(stateEvents)) events := make([]*gomatrixserverlib.Event, len(stateEvents))
for i := range stateEvents { for i := range stateEvents {
@ -551,20 +557,22 @@ func joinEventsFromHistoryVisibility(
// Can we see events in the room? // Can we see events in the room?
canSeeEvents := auth.IsServerAllowed(thisServer, true, events) canSeeEvents := auth.IsServerAllowed(thisServer, true, events)
visibility := gomatrixserverlib.HistoryVisibility(auth.HistoryVisibilityForRoom(events))
if !canSeeEvents { if !canSeeEvents {
logrus.Infof("ServersAtEvent history not visible to us: %s", auth.HistoryVisibilityForRoom(events)) logrus.Infof("ServersAtEvent history not visible to us: %s", visibility)
return nil, nil return nil, visibility, nil
} }
// get joined members // get joined members
info, err := db.RoomInfo(ctx, roomID) info, err := db.RoomInfo(ctx, roomID)
if err != nil { if err != nil {
return nil, err return nil, visibility, nil
} }
joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false) joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false)
if err != nil { if err != nil {
return nil, err return nil, visibility, err
} }
return db.Events(ctx, joinEventNIDs) evs, err := db.Events(ctx, joinEventNIDs)
return evs, visibility, err
} }
func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) { func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {

View file

@ -73,13 +73,10 @@ func (r *Queryer) QueryStateAfterEvents(
prevStates, err := r.DB.StateAtEventIDs(ctx, request.PrevEventIDs) prevStates, err := r.DB.StateAtEventIDs(ctx, request.PrevEventIDs)
if err != nil { if err != nil {
switch err.(type) { if _, ok := err.(types.MissingEventError); ok {
case types.MissingEventError:
util.GetLogger(ctx).Errorf("QueryStateAfterEvents: MissingEventError: %s", err)
return nil return nil
default:
return err
} }
return err
} }
response.PrevEventsExist = true response.PrevEventsExist = true
@ -96,6 +93,12 @@ func (r *Queryer) QueryStateAfterEvents(
) )
} }
if err != nil { if err != nil {
if _, ok := err.(types.MissingEventError); ok {
return nil
}
if _, ok := err.(types.MissingStateError); ok {
return nil
}
return err return err
} }

View file

@ -79,7 +79,7 @@ type Database interface {
// Look up the state entries for a list of string event IDs // Look up the state entries for a list of string event IDs
// Returns an error if the there is an error talking to the database // Returns an error if the there is an error talking to the database
// Returns a types.MissingEventError if the event IDs aren't in the database. // Returns a types.MissingEventError if the event IDs aren't in the database.
StateEntriesForEventIDs(ctx context.Context, eventIDs []string) ([]types.StateEntry, error) StateEntriesForEventIDs(ctx context.Context, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error)
// Look up the string event state keys for a list of numeric event state keys // Look up the string event state keys for a list of numeric event state keys
// Returns an error if there was a problem talking to the database. // Returns an error if there was a problem talking to the database.
EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error) EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error)
@ -94,6 +94,8 @@ type Database interface {
// Opens and returns a room updater, which locks the room and opens a transaction. // Opens and returns a room updater, which locks the room and opens a transaction.
// The GetRoomUpdater must have Commit or Rollback called on it if this doesn't return an error. // The GetRoomUpdater must have Commit or Rollback called on it if this doesn't return an error.
// If this returns an error then no further action is required. // If this returns an error then no further action is required.
// IsEventRejected returns true if the event is known and rejected.
IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (rejected bool, err error)
GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error) GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error)
// Look up event references for the latest events in the room and the current state snapshot. // Look up event references for the latest events in the room and the current state snapshot.
// Returns the latest events, the current state and the maximum depth of the latest events plus 1. // Returns the latest events, the current state and the maximum depth of the latest events plus 1.

View file

@ -88,6 +88,14 @@ const bulkSelectStateEventByIDSQL = "" +
" WHERE event_id = ANY($1)" + " WHERE event_id = ANY($1)" +
" ORDER BY event_type_nid, event_state_key_nid ASC" " ORDER BY event_type_nid, event_state_key_nid ASC"
// Bulk lookup of events by string ID that aren't excluded.
// Sort by the numeric IDs for event type and state key.
// This means we can use binary search to lookup entries by type and state key.
const bulkSelectStateEventByIDExcludingRejectedSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
" WHERE event_id = ANY($1) AND is_rejected = FALSE" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
// Bulk look up of events by event NID, optionally filtering by the event type // Bulk look up of events by event NID, optionally filtering by the event type
// or event state key NIDs if provided. (The CARDINALITY check will return true // or event state key NIDs if provided. (The CARDINALITY check will return true
// if the provided arrays are empty, ergo no filtering). // if the provided arrays are empty, ergo no filtering).
@ -136,10 +144,14 @@ const selectMaxEventDepthSQL = "" +
const selectRoomNIDsForEventNIDsSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" +
"SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid = ANY($1)" "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid = ANY($1)"
const selectEventRejectedSQL = "" +
"SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2"
type eventStatements struct { type eventStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt selectEventStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt bulkSelectStateEventByIDStmt *sql.Stmt
bulkSelectStateEventByIDExcludingRejectedStmt *sql.Stmt
bulkSelectStateEventByNIDStmt *sql.Stmt bulkSelectStateEventByNIDStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt bulkSelectStateAtEventByIDStmt *sql.Stmt
updateEventStateStmt *sql.Stmt updateEventStateStmt *sql.Stmt
@ -153,6 +165,7 @@ type eventStatements struct {
bulkSelectUnsentEventNIDStmt *sql.Stmt bulkSelectUnsentEventNIDStmt *sql.Stmt
selectMaxEventDepthStmt *sql.Stmt selectMaxEventDepthStmt *sql.Stmt
selectRoomNIDsForEventNIDsStmt *sql.Stmt selectRoomNIDsForEventNIDsStmt *sql.Stmt
selectEventRejectedStmt *sql.Stmt
} }
func CreateEventsTable(db *sql.DB) error { func CreateEventsTable(db *sql.DB) error {
@ -167,6 +180,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
{&s.insertEventStmt, insertEventSQL}, {&s.insertEventStmt, insertEventSQL},
{&s.selectEventStmt, selectEventSQL}, {&s.selectEventStmt, selectEventSQL},
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL}, {&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
{&s.bulkSelectStateEventByIDExcludingRejectedStmt, bulkSelectStateEventByIDExcludingRejectedSQL},
{&s.bulkSelectStateEventByNIDStmt, bulkSelectStateEventByNIDSQL}, {&s.bulkSelectStateEventByNIDStmt, bulkSelectStateEventByNIDSQL},
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL}, {&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
{&s.updateEventStateStmt, updateEventStateSQL}, {&s.updateEventStateStmt, updateEventStateSQL},
@ -180,6 +194,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
{&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL}, {&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL},
{&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL}, {&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL},
{&s.selectRoomNIDsForEventNIDsStmt, selectRoomNIDsForEventNIDsSQL}, {&s.selectRoomNIDsForEventNIDsStmt, selectRoomNIDsForEventNIDsSQL},
{&s.selectEventRejectedStmt, selectEventRejectedSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -216,11 +231,18 @@ func (s *eventStatements) SelectEvent(
} }
// bulkSelectStateEventByID lookups a list of state events by event ID. // bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError // If not excluding rejected events, and any of the requested events are missing from
// the database it returns a types.MissingEventError. If excluding rejected events,
// the events will be silently omitted without error.
func (s *eventStatements) BulkSelectStateEventByID( func (s *eventStatements) BulkSelectStateEventByID(
ctx context.Context, txn *sql.Tx, eventIDs []string, ctx context.Context, txn *sql.Tx, eventIDs []string, excludeRejected bool,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
stmt := sqlutil.TxStmt(txn, s.bulkSelectStateEventByIDStmt) var stmt *sql.Stmt
if excludeRejected {
stmt = sqlutil.TxStmt(txn, s.bulkSelectStateEventByIDExcludingRejectedStmt)
} else {
stmt = sqlutil.TxStmt(txn, s.bulkSelectStateEventByIDStmt)
}
rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
if err != nil { if err != nil {
return nil, err return nil, err
@ -230,10 +252,10 @@ func (s *eventStatements) BulkSelectStateEventByID(
// because of the unique constraint on event IDs. // because of the unique constraint on event IDs.
// So we can allocate an array of the correct size now. // So we can allocate an array of the correct size now.
// We might get fewer results than IDs so we adjust the length of the slice before returning it. // We might get fewer results than IDs so we adjust the length of the slice before returning it.
results := make([]types.StateEntry, len(eventIDs)) results := make([]types.StateEntry, 0, len(eventIDs))
i := 0 i := 0
for ; rows.Next(); i++ { for ; rows.Next(); i++ {
result := &results[i] var result types.StateEntry
if err = rows.Scan( if err = rows.Scan(
&result.EventTypeNID, &result.EventTypeNID,
&result.EventStateKeyNID, &result.EventStateKeyNID,
@ -241,11 +263,12 @@ func (s *eventStatements) BulkSelectStateEventByID(
); err != nil { ); err != nil {
return nil, err return nil, err
} }
results = append(results, result)
} }
if err = rows.Err(); err != nil { if err = rows.Err(); err != nil {
return nil, err return nil, err
} }
if i != len(eventIDs) { if !excludeRejected && i != len(eventIDs) {
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have. // If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
// We don't know which ones were missing because we don't return the string IDs in the query. // We don't know which ones were missing because we don't return the string IDs in the query.
// However it should be possible debug this by replaying queries or entries from the input kafka logs. // However it should be possible debug this by replaying queries or entries from the input kafka logs.
@ -540,3 +563,11 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array {
} }
return nids return nids
} }
func (s *eventStatements) SelectEventRejected(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string,
) (rejected bool, err error) {
stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt)
err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected)
return
}

View file

@ -113,9 +113,9 @@ func (d *Database) eventStateKeyNIDs(
} }
func (d *Database) StateEntriesForEventIDs( func (d *Database) StateEntriesForEventIDs(
ctx context.Context, eventIDs []string, ctx context.Context, eventIDs []string, excludeRejected bool,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
return d.EventsTable.BulkSelectStateEventByID(ctx, nil, eventIDs) return d.EventsTable.BulkSelectStateEventByID(ctx, nil, eventIDs, excludeRejected)
} }
func (d *Database) StateEntriesForTuples( func (d *Database) StateEntriesForTuples(
@ -567,6 +567,10 @@ func (d *Database) GetRoomUpdater(
return updater, err return updater, err
} }
func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (bool, error) {
return d.EventsTable.SelectEventRejected(ctx, nil, roomNID, eventID)
}
func (d *Database) StoreEvent( func (d *Database) StoreEvent(
ctx context.Context, event *gomatrixserverlib.Event, ctx context.Context, event *gomatrixserverlib.Event,
authEventNIDs []types.EventNID, isRejected bool, authEventNIDs []types.EventNID, isRejected bool,

View file

@ -65,6 +65,14 @@ const bulkSelectStateEventByIDSQL = "" +
" WHERE event_id IN ($1)" + " WHERE event_id IN ($1)" +
" ORDER BY event_type_nid, event_state_key_nid ASC" " ORDER BY event_type_nid, event_state_key_nid ASC"
// Bulk lookup of events by string ID that aren't rejected.
// Sort by the numeric IDs for event type and state key.
// This means we can use binary search to lookup entries by type and state key.
const bulkSelectStateEventByIDExcludingRejectedSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
" WHERE event_id IN ($1) AND is_rejected = 0" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
const bulkSelectStateEventByNIDSQL = "" + const bulkSelectStateEventByNIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" + "SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
" WHERE event_nid IN ($1)" " WHERE event_nid IN ($1)"
@ -109,11 +117,15 @@ const selectMaxEventDepthSQL = "" +
const selectRoomNIDsForEventNIDsSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" +
"SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)" "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)"
const selectEventRejectedSQL = "" +
"SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2"
type eventStatements struct { type eventStatements struct {
db *sql.DB db *sql.DB
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt selectEventStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt bulkSelectStateEventByIDStmt *sql.Stmt
bulkSelectStateEventByIDExcludingRejectedStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt bulkSelectStateAtEventByIDStmt *sql.Stmt
updateEventStateStmt *sql.Stmt updateEventStateStmt *sql.Stmt
selectEventSentToOutputStmt *sql.Stmt selectEventSentToOutputStmt *sql.Stmt
@ -122,6 +134,7 @@ type eventStatements struct {
bulkSelectStateAtEventAndReferenceStmt *sql.Stmt bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
bulkSelectEventReferenceStmt *sql.Stmt bulkSelectEventReferenceStmt *sql.Stmt
bulkSelectEventIDStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt
selectEventRejectedStmt *sql.Stmt
//bulkSelectEventNIDStmt *sql.Stmt //bulkSelectEventNIDStmt *sql.Stmt
//bulkSelectUnsentEventNIDStmt *sql.Stmt //bulkSelectUnsentEventNIDStmt *sql.Stmt
//selectRoomNIDsForEventNIDsStmt *sql.Stmt //selectRoomNIDsForEventNIDsStmt *sql.Stmt
@ -141,6 +154,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
{&s.insertEventStmt, insertEventSQL}, {&s.insertEventStmt, insertEventSQL},
{&s.selectEventStmt, selectEventSQL}, {&s.selectEventStmt, selectEventSQL},
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL}, {&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
{&s.bulkSelectStateEventByIDExcludingRejectedStmt, bulkSelectStateEventByIDExcludingRejectedSQL},
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL}, {&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
{&s.updateEventStateStmt, updateEventStateSQL}, {&s.updateEventStateStmt, updateEventStateSQL},
{&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL}, {&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL},
@ -152,6 +166,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
//{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, //{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL},
//{&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL}, //{&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL},
//{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL}, //{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL},
{&s.selectEventRejectedStmt, selectEventRejectedSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -189,16 +204,24 @@ func (s *eventStatements) SelectEvent(
} }
// bulkSelectStateEventByID lookups a list of state events by event ID. // bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError // If not excluding rejected events, and any of the requested events are missing from
// the database it returns a types.MissingEventError. If excluding rejected events,
// the events will be silently omitted without error.
func (s *eventStatements) BulkSelectStateEventByID( func (s *eventStatements) BulkSelectStateEventByID(
ctx context.Context, txn *sql.Tx, eventIDs []string, ctx context.Context, txn *sql.Tx, eventIDs []string, excludeRejected bool,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
/////////////// ///////////////
var sql string
if excludeRejected {
sql = bulkSelectStateEventByIDExcludingRejectedSQL
} else {
sql = bulkSelectStateEventByIDSQL
}
iEventIDs := make([]interface{}, len(eventIDs)) iEventIDs := make([]interface{}, len(eventIDs))
for k, v := range eventIDs { for k, v := range eventIDs {
iEventIDs[k] = v iEventIDs[k] = v
} }
selectOrig := strings.Replace(bulkSelectStateEventByIDSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1) selectOrig := strings.Replace(sql, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
selectPrep, err := s.db.Prepare(selectOrig) selectPrep, err := s.db.Prepare(selectOrig)
if err != nil { if err != nil {
return nil, err return nil, err
@ -216,10 +239,10 @@ func (s *eventStatements) BulkSelectStateEventByID(
// because of the unique constraint on event IDs. // because of the unique constraint on event IDs.
// So we can allocate an array of the correct size now. // So we can allocate an array of the correct size now.
// We might get fewer results than IDs so we adjust the length of the slice before returning it. // We might get fewer results than IDs so we adjust the length of the slice before returning it.
results := make([]types.StateEntry, len(eventIDs)) results := make([]types.StateEntry, 0, len(eventIDs))
i := 0 i := 0
for ; rows.Next(); i++ { for ; rows.Next(); i++ {
result := &results[i] var result types.StateEntry
if err = rows.Scan( if err = rows.Scan(
&result.EventTypeNID, &result.EventTypeNID,
&result.EventStateKeyNID, &result.EventStateKeyNID,
@ -227,8 +250,9 @@ func (s *eventStatements) BulkSelectStateEventByID(
); err != nil { ); err != nil {
return nil, err return nil, err
} }
results = append(results, result)
} }
if i != len(eventIDs) { if !excludeRejected && i != len(eventIDs) {
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have. // If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
// We don't know which ones were missing because we don't return the string IDs in the query. // We don't know which ones were missing because we don't return the string IDs in the query.
// However it should be possible debug this by replaying queries or entries from the input kafka logs. // However it should be possible debug this by replaying queries or entries from the input kafka logs.
@ -614,3 +638,11 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) string {
b, _ := json.Marshal(eventNIDs) b, _ := json.Marshal(eventNIDs)
return string(b) return string(b)
} }
func (s *eventStatements) SelectEventRejected(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string,
) (rejected bool, err error) {
stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt)
err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected)
return
}

View file

@ -102,7 +102,7 @@ func Test_EventsTable(t *testing.T) {
}) })
} }
stateEvents, err := tab.BulkSelectStateEventByID(ctx, nil, eventIDs) stateEvents, err := tab.BulkSelectStateEventByID(ctx, nil, eventIDs, false)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, len(stateEvents), len(eventIDs)) assert.Equal(t, len(stateEvents), len(eventIDs))
nids := make([]types.EventNID, 0, len(stateEvents)) nids := make([]types.EventNID, 0, len(stateEvents))

View file

@ -46,7 +46,7 @@ type Events interface {
SelectEvent(ctx context.Context, txn *sql.Tx, eventID string) (types.EventNID, types.StateSnapshotNID, error) SelectEvent(ctx context.Context, txn *sql.Tx, eventID string) (types.EventNID, types.StateSnapshotNID, error)
// bulkSelectStateEventByID lookups a list of state events by event ID. // bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError // If any of the requested events are missing from the database it returns a types.MissingEventError
BulkSelectStateEventByID(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StateEntry, error) BulkSelectStateEventByID(ctx context.Context, txn *sql.Tx, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error)
BulkSelectStateEventByNID(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntry, error) BulkSelectStateEventByNID(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntry, error)
// BulkSelectStateAtEventByID lookups the state at a list of events by event ID. // BulkSelectStateAtEventByID lookups the state at a list of events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError. // If any of the requested events are missing from the database it returns a types.MissingEventError.
@ -66,6 +66,7 @@ type Events interface {
BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error)
SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error)
SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error) SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error)
SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error)
} }
type Rooms interface { type Rooms interface {

View file

@ -352,6 +352,8 @@ func (r *messagesReq) retrieveEvents() (
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime), "duration": time.Since(startTime),
"room_id": r.roomID, "room_id": r.roomID,
"events_before": len(events),
"events_after": len(filteredEvents),
}).Debug("applied history visibility (messages)") }).Debug("applied history visibility (messages)")
return gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), start, end, err return gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), start, end, err
} }
@ -513,6 +515,9 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
// Store the events in the database, while marking them as unfit to show // Store the events in the database, while marking them as unfit to show
// up in responses to sync requests. // up in responses to sync requests.
if res.HistoryVisibility == "" {
res.HistoryVisibility = gomatrixserverlib.HistoryVisibilityShared
}
for i := range res.Events { for i := range res.Events {
_, err = r.db.WriteEvent( _, err = r.db.WriteEvent(
context.Background(), context.Background(),
@ -521,7 +526,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
[]string{}, []string{},
[]string{}, []string{},
nil, true, nil, true,
gomatrixserverlib.HistoryVisibilityShared, res.HistoryVisibility,
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -534,6 +539,9 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
// last `limit` events // last `limit` events
events = events[len(events)-limit:] events = events[len(events)-limit:]
} }
for _, ev := range events {
ev.Visibility = res.HistoryVisibility
}
return events, nil return events, nil
} }

View file

@ -279,8 +279,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"since": r.From, "since": r.From,
"current": r.To, "current": r.To,
"adds": addIDs, "adds": len(addIDs),
"dels": delIDs, "dels": len(delIDs),
}).Warn("StateBetween: ignoring deleted state") }).Warn("StateBetween: ignoring deleted state")
} }

View file

@ -234,8 +234,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"since": r.From, "since": r.From,
"current": r.To, "current": r.To,
"adds": addIDsJSON, "adds": len(addIDsJSON),
"dels": delIDsJSON, "dels": len(delIDsJSON),
}).Warn("StateBetween: ignoring deleted state") }).Warn("StateBetween: ignoring deleted state")
} }

View file

@ -7,8 +7,9 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/syncapi/types"
) )
type InviteStreamProvider struct { type InviteStreamProvider struct {
@ -62,6 +63,11 @@ func (p *InviteStreamProvider) IncrementalSync(
req.Response.Rooms.Invite[roomID] = *ir req.Response.Rooms.Invite[roomID] = *ir
} }
// When doing an initial sync, we don't want to add retired invites, as this
// can add rooms we were invited to, but already left.
if from == 0 {
return to
}
for roomID := range retiredInvites { for roomID := range retiredInvites {
if _, ok := req.Response.Rooms.Join[roomID]; !ok { if _, ok := req.Response.Rooms.Join[roomID]; !ok {
lr := types.NewLeaveResponse() lr := types.NewLeaveResponse()

View file

@ -560,14 +560,13 @@ func (p *PDUStreamProvider) lazyLoadMembers(
// If this is a gapped incremental sync, we still want this membership // If this is a gapped incremental sync, we still want this membership
isGappedIncremental := limited && incremental isGappedIncremental := limited && incremental
// We want this users membership event, keep it in the list // We want this users membership event, keep it in the list
_, ok := timelineUsers[event.Sender()] stateKey := *event.StateKey()
wantMembership := ok || isGappedIncremental if _, ok := timelineUsers[stateKey]; ok || isGappedIncremental {
if wantMembership {
newStateEvents = append(newStateEvents, event) newStateEvents = append(newStateEvents, event)
if !includeRedundant { if !includeRedundant {
p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, event.Sender(), event.EventID()) p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, stateKey, event.EventID())
} }
delete(timelineUsers, event.Sender()) delete(timelineUsers, stateKey)
} }
} else { } else {
newStateEvents = append(newStateEvents, event) newStateEvents = append(newStateEvents, event)
@ -578,17 +577,16 @@ func (p *PDUStreamProvider) lazyLoadMembers(
wantUsers = append(wantUsers, userID) wantUsers = append(wantUsers, userID)
} }
// Query missing membership events // Query missing membership events
memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{ filter := gomatrixserverlib.DefaultStateFilter()
Limit: 100, filter.Senders = &wantUsers
Senders: &wantUsers, filter.Types = &[]string{gomatrixserverlib.MRoomMember}
Types: &[]string{gomatrixserverlib.MRoomMember}, memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &filter)
})
if err != nil { if err != nil {
return stateEvents, err return stateEvents, err
} }
// cache the membership events // cache the membership events
for _, membership := range memberships { for _, membership := range memberships {
p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, membership.Sender(), membership.EventID()) p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, *membership.StateKey(), membership.EventID())
} }
stateEvents = append(newStateEvents, memberships...) stateEvents = append(newStateEvents, memberships...)
return stateEvents, nil return stateEvents, nil

View file

@ -10,6 +10,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
@ -21,9 +25,6 @@ import (
"github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/test/testrig"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
) )
type syncRoomserverAPI struct { type syncRoomserverAPI struct {
@ -153,8 +154,12 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
wantJoinedRooms: []string{room.ID}, wantJoinedRooms: []string{room.ID},
}, },
} }
// TODO: find a better way
time.Sleep(500 * time.Millisecond) syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists()
})
for _, tc := range testCases { for _, tc := range testCases {
w := httptest.NewRecorder() w := httptest.NewRecorder()
@ -342,6 +347,13 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// create the users // create the users
alice := test.NewUser(t) alice := test.NewUser(t)
aliceDev := userapi.Device{
ID: "ALICEID",
UserID: alice.ID,
AccessToken: "ALICE_BEARER_TOKEN",
DisplayName: "ALICE",
}
bob := test.NewUser(t) bob := test.NewUser(t)
bobDev := userapi.Device{ bobDev := userapi.Device{
@ -408,7 +420,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
rsAPI := roomserver.NewInternalAPI(base) rsAPI := roomserver.NewInternalAPI(base)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{bobDev}}, rsAPI, &syncKeyAPI{}) AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, &syncKeyAPI{})
for _, tc := range testCases { for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@ -417,11 +429,18 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility)) room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility))
// send the events/messages to NATS to create the rooms // send the events/messages to NATS to create the rooms
beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)}) beforeJoinBody := fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)
beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": beforeJoinBody})
eventsToSend := append(room.Events(), beforeJoinEv) eventsToSend := append(room.Events(), beforeJoinEv)
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err) t.Fatalf("failed to send events: %v", err)
} }
syncUntil(t, base, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
return gjson.Get(syncBody, path).Exists()
},
)
// There is only one event, we expect only to be able to see this, if the room is world_readable // There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest.NewRecorder() w := httptest.NewRecorder()
@ -447,13 +466,20 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID)) inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID))
afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)}) afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)})
joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID)) joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID))
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After join in a %s room", tc.historyVisibility)}) afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility)
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody})
eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv) eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err) t.Fatalf("failed to send events: %v", err)
} }
syncUntil(t, base, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
return gjson.Get(syncBody, path).Exists()
},
)
// Verify the messages after/before invite are visible or not // Verify the messages after/before invite are visible or not
w = httptest.NewRecorder() w = httptest.NewRecorder()
@ -508,8 +534,8 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser, AccountType: userapi.AccountTypeUser,
} }
base, close := testrig.CreateBaseDendrite(t, dbType) base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer close() defer baseClose()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
@ -604,7 +630,14 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
t.Fatalf("unable to send to device message: %v", err) t.Fatalf("unable to send to device message: %v", err)
} }
} }
time.Sleep((time.Millisecond * 15) * time.Duration(tc.sendMessagesCount)) // wait a bit, so the messages can be processed
syncUntil(t, base, alice.AccessToken,
len(tc.want) == 0,
func(body string) bool {
return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
},
)
// Execute a /sync request, recording the response // Execute a /sync request, recording the response
w := httptest.NewRecorder() w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
@ -627,6 +660,42 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
} }
} }
func syncUntil(t *testing.T,
base *base.BaseDendrite, accessToken string,
skip bool,
checkFunc func(syncBody string) bool,
) {
if checkFunc == nil {
t.Fatalf("No checkFunc defined")
}
if skip {
return
}
// loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it
// to the syncAPI when hitting /sync
done := make(chan bool)
defer close(done)
go func() {
for {
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": accessToken,
"timeout": "1000",
})))
if checkFunc(w.Body.String()) {
done <- true
return
}
}
}()
select {
case <-done:
case <-time.After(time.Second * 5):
t.Fatalf("Timed out waiting for messages")
}
}
func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg { func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input)) result := make([]*nats.Msg, len(input))
for i, ev := range input { for i, ev := range input {

View file

@ -144,7 +144,6 @@ Server correctly handles incoming m.device_list_update
If remote user leaves room, changes device and rejoins we see update in sync If remote user leaves room, changes device and rejoins we see update in sync
If remote user leaves room, changes device and rejoins we see update in /keys/changes If remote user leaves room, changes device and rejoins we see update in /keys/changes
If remote user leaves room we no longer receive device updates If remote user leaves room we no longer receive device updates
If a device list update goes missing, the server resyncs on the next one
Server correctly resyncs when client query keys and there is no remote cache Server correctly resyncs when client query keys and there is no remote cache
Server correctly resyncs when server leaves and rejoins a room Server correctly resyncs when server leaves and rejoins a room
Device list doesn't change if remote server is down Device list doesn't change if remote server is down
@ -632,7 +631,6 @@ Test that rejected pushers are removed.
Trying to add push rule with no scope fails with 400 Trying to add push rule with no scope fails with 400
Trying to add push rule with invalid scope fails with 400 Trying to add push rule with invalid scope fails with 400
Forward extremities remain so even after the next events are populated as outliers Forward extremities remain so even after the next events are populated as outliers
If a device list update goes missing, the server resyncs on the next one
uploading self-signing key notifies over federation uploading self-signing key notifies over federation
uploading signed devices gets propagated over federation uploading signed devices gets propagated over federation
Device list doesn't change if remote server is down Device list doesn't change if remote server is down