diff --git a/cmd/create-account/main.go b/cmd/create-account/main.go index a9bd92794..f6de2d0d4 100644 --- a/cmd/create-account/main.go +++ b/cmd/create-account/main.go @@ -22,7 +22,6 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/userapi/storage/accounts" - "github.com/matrix-org/dendrite/userapi/storage/devices" "github.com/matrix-org/gomatrixserverlib" ) @@ -39,7 +38,6 @@ var ( username = flag.String("username", "", "The user ID localpart to register e.g 'alice' in '@alice:localhost'.") password = flag.String("password", "", "Optional. The password to register with. If not specified, this account will be password-less.") serverNameStr = flag.String("servername", "localhost", "The Matrix server domain which will form the domain part of the user ID.") - accessToken = flag.String("token", "", "Optional. The desired access_token to have. If not specified, a random access_token will be made.") ) func main() { @@ -78,29 +76,5 @@ func main() { os.Exit(1) } - deviceDB, err := devices.NewDatabase(&config.DatabaseOptions{ - ConnectionString: config.DataSource(*database), - }, serverName) - if err != nil { - fmt.Println(err.Error()) - os.Exit(1) - } - - if *accessToken == "" { - t := "token_" + *username - accessToken = &t - } - - device, err := deviceDB.CreateDevice( - context.Background(), *username, nil, *accessToken, nil, "127.0.0.1", "", - ) - if err != nil { - fmt.Println(err.Error()) - os.Exit(1) - } - - fmt.Println("Created account:") - fmt.Printf("user_id = %s\n", device.UserID) - fmt.Printf("device_id = %s\n", device.ID) - fmt.Printf("access_token = %s\n", device.AccessToken) + fmt.Println("Created account") } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 67031609f..6a5d9d264 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -17,6 +17,7 @@ package input import ( + "bytes" "context" "fmt" @@ -26,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" "github.com/sirupsen/logrus" ) @@ -44,6 +46,28 @@ func (r *Inputer) processRoomEvent( headered := input.Event event := headered.Unwrap() + // if we have already got this event then do not process it again, if the input kind is an outlier. + // Outliers contain no extra information which may warrant a re-processing. + if input.Kind == api.KindOutlier { + evs, err := r.DB.EventsFromIDs(ctx, []string{event.EventID()}) + if err == nil && len(evs) == 1 { + // check hash matches if we're on early room versions where the event ID was a random string + idFormat, err := headered.RoomVersion.EventIDFormat() + if err == nil { + switch idFormat { + case gomatrixserverlib.EventIDFormatV1: + if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) { + util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring") + return event.EventID(), nil + } + default: + util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring") + return event.EventID(), nil + } + } + } + } + // Check that the event passes authentication checks and work out // the numeric IDs for the auth events. isRejected := false diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 5adcd0877..f76b0a0b4 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -233,7 +233,7 @@ func (u *latestEventsUpdater) latestState() error { if err != nil { return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err) } - if len(u.removed) > len(u.added) { + if !u.stateAtEvent.Overwrite && len(u.removed) > len(u.added) { // This really shouldn't happen. // TODO: What is ultimately the best way to handle this situation? logrus.Errorf( diff --git a/roomserver/storage/shared/latest_events_updater.go b/roomserver/storage/shared/latest_events_updater.go index b316f639d..8825dc464 100644 --- a/roomserver/storage/shared/latest_events_updater.go +++ b/roomserver/storage/shared/latest_events_updater.go @@ -70,16 +70,14 @@ func (u *LatestEventsUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID { return u.currentStateSnapshotNID } -// StorePreviousEvents implements types.RoomRecentEventsUpdater +// StorePreviousEvents implements types.RoomRecentEventsUpdater - This must be called from a Writer func (u *LatestEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error { - return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error { - for _, ref := range previousEventReferences { - if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, txn, ref.EventID, ref.EventSHA256, eventNID); err != nil { - return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err) - } + for _, ref := range previousEventReferences { + if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil { + return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err) } - return nil - }) + } + return nil } // IsReferenced implements types.RoomRecentEventsUpdater diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index f2be8b3cf..51dcb8887 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -492,15 +492,32 @@ func (d *Database) StoreEvent( if roomInfo == nil && len(prevEvents) > 0 { return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID()) } + // Create an updater - NB: on sqlite this WILL create a txn as we are directly calling the shared DB form of + // GetLatestEventsForUpdate - not via the SQLiteDatabase form which has `nil` txns. This + // function only does SELECTs though so the created txn (at this point) is just a read txn like + // any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater + // to do writes however then this will need to go inside `Writer.Do`. updater, err = d.GetLatestEventsForUpdate(ctx, *roomInfo) if err != nil { return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err) } - if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil { - return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err) + // Ensure that we atomically store prev events AND commit them. If we don't wrap StorePreviousEvents + // and EndTransaction in a writer then it's possible for a new write txn to be made between the two + // function calls which will then fail with 'database is locked'. This new write txn would HAVE to be + // something like SetRoomAlias/RemoveRoomAlias as normal input events are already done sequentially due to + // SupportsConcurrentRoomInputs() == false on sqlite, though this does not apply to setting room aliases + // as they don't go via InputRoomEvents + err = d.Writer.Do(d.DB, updater.txn, func(txn *sql.Tx) error { + if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil { + return fmt.Errorf("updater.StorePreviousEvents: %w", err) + } + succeeded := true + err = sqlutil.EndTransaction(updater, &succeeded) + return err + }) + if err != nil { + return 0, types.StateAtEvent{}, nil, "", err } - succeeded := true - err = sqlutil.EndTransaction(updater, &succeeded) } return roomNID, types.StateAtEvent{