diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go index 62e4c3035..44801a83c 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go @@ -45,12 +45,6 @@ type displayName struct { DisplayName string `json:"displayname"` } -type prevMembership struct { - PrevContent events.MemberContent `json:"prev_content"` - PrevID string `json:"replaces_state"` - UserID string `json:"prev_sender"` -} - // GetProfile implements GET /profile/{userID} func GetProfile( req *http.Request, accountDB *accounts.Database, userID string, @@ -246,18 +240,6 @@ func buildMembershipEvents( evs := []gomatrixserverlib.Event{} for _, membership := range memberships { - prevContent := events.MemberContent{ - Membership: "join", - DisplayName: oldProfile.DisplayName, - AvatarURL: oldProfile.AvatarURL, - } - - prev := prevMembership{ - UserID: userID, - PrevID: membership.EventID, - PrevContent: prevContent, - } - builder := gomatrixserverlib.EventBuilder{ Sender: userID, RoomID: membership.RoomID, @@ -265,10 +247,6 @@ func buildMembershipEvents( StateKey: &userID, } - if err := builder.SetUnsigned(prev); err != nil { - return nil, err - } - content := events.MemberContent{ Membership: "join", } diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 70f42e1b7..8a62889d7 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -19,6 +19,7 @@ import ( "fmt" log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -35,6 +36,15 @@ type OutputRoomEvent struct { db *storage.SyncServerDatabase notifier *sync.Notifier query api.RoomserverQueryAPI + serverName gomatrixserverlib.ServerName + keyID gomatrixserverlib.KeyID + privateKey []byte +} + +type prevMembership struct { + PrevContent events.MemberContent `json:"prev_content"` + PrevID string `json:"replaces_state"` + UserID string `json:"prev_sender"` } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. @@ -55,6 +65,9 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S db: store, notifier: n, query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), + serverName: cfg.Matrix.ServerName, + keyID: cfg.Matrix.KeyID, + privateKey: cfg.Matrix.PrivateKey, } consumer.ProcessMessage = s.onMessage @@ -101,6 +114,18 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: state event lookup failure") } + ev, err = s.updateMemberEvent(ev, s.keyID, s.privateKey) + if err != nil { + return err + } + + for i := range addsStateEvents { + addsStateEvents[i], err = s.updateMemberEvent(addsStateEvents[i], s.keyID, s.privateKey) + if err != nil { + return err + } + } + syncStreamPos, err := s.db.WriteEvent( &ev, addsStateEvents, output.NewRoomEvent.AddsStateEventIDs, output.NewRoomEvent.RemovesStateEventIDs, ) @@ -177,6 +202,63 @@ func (s *OutputRoomEvent) lookupStateEvents( return result, nil } +func (s *OutputRoomEvent) updateMemberEvent( + event gomatrixserverlib.Event, keyID gomatrixserverlib.KeyID, + privateKey []byte, +) (gomatrixserverlib.Event, error) { + if event.Type() != "m.room.member" { + return event, nil + } + membership, err := event.Membership() + if err != nil { + return event, err + } + if membership != "join" { + return event, nil + } + + prevEvent, err := s.db.GetMembershipEvent(event.RoomID(), *event.StateKey()) + if err != nil { + return event, err + } + + if prevEvent == nil { + return event, nil + } + + builder := gomatrixserverlib.EventBuilder{ + Sender: event.Sender(), + RoomID: event.RoomID(), + Type: event.Type(), + StateKey: event.StateKey(), + PrevEvents: event.PrevEvents(), + AuthEvents: event.AuthEvents(), + Redacts: event.Redacts(), + Depth: event.Depth(), + Content: event.Content(), + } + + var content events.MemberContent + if err := json.Unmarshal(prevEvent.Content(), &content); err != nil { + return event, err + } + + prev := prevMembership{ + PrevContent: content, + PrevID: prevEvent.EventID(), + UserID: prevEvent.Sender(), + } + + if err = builder.SetUnsigned(prev); err != nil { + return event, err + } + + ts := event.OriginServerTS().Time() + ev, err := builder.Build(event.EventID(), ts, event.Origin(), keyID, privateKey) + + return ev, err +} + func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string { have := map[string]bool{} for _, event := range events { diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index d4f260e00..4ebaa5be3 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -66,6 +66,9 @@ const selectCurrentStateSQL = "" + const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'" +const selectJoinEventForUserSQL = "" + + "SELECT event_json FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join' AND room_id = $1 AND state_key = $2" + const selectEventsWithEventIDsSQL = "" + "SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)" @@ -76,6 +79,7 @@ type currentRoomStateStatements struct { selectCurrentStateStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt selectEventsWithEventIDsStmt *sql.Stmt + selectJoinEventForUserStmt *sql.Stmt } func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { @@ -101,6 +105,9 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil { return } + if s.selectJoinEventForUserStmt, err = db.Prepare(selectJoinEventForUserSQL); err != nil { + return + } return } @@ -195,3 +202,12 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { } return result, nil } + +func (s *currentRoomStateStatements) selectJoinEventForUser(roomID string, userID string) (*gomatrixserverlib.Event, error) { + var res []byte + if err := s.selectJoinEventForUserStmt.QueryRow(roomID, userID).Scan(&res); err == sql.ErrNoRows { + return nil, nil + } + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false) + return &ev, err +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 27afd1c05..b7a3eea64 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -100,9 +100,18 @@ func (d *SyncServerDatabase) WriteEvent( } streamPos = types.StreamPosition(pos) - if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { - // Nothing to do, the event may have just been a message event. - return nil + fmt.Println(len(addStateEvents)) + + if len(addStateEvents) == 0 { + // If the event is a m.room.member event, and has unsigned content, + // we need to save it as it is very likely to be a membership update + // (e.g. if a user updates their profile) + if ev.Type() == "m.room.member" && len(ev.Unsigned()) > 0 { + addStateEvents = append(addStateEvents, *ev) + } else if len(removeStateEventIDs) == 0 { + // Nothing to do, the event may have just been a message event. + return nil + } } return d.updateRoomState(txn, removeStateEventIDs, addStateEvents, streamPos) @@ -141,6 +150,11 @@ func (d *SyncServerDatabase) updateRoomState( return nil } +// GetMembershipEvent returns the Matrix join event for a given user into a given room +func (d *SyncServerDatabase) GetMembershipEvent(roomID string, userID string) (*gomatrixserverlib.Event, error) { + return d.roomstate.selectJoinEventForUser(roomID, userID) +} + // PartitionOffsets implements common.PartitionStorer func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { return d.partitions.SelectPartitionOffsets(topic)