Move event update from client API server to sync API server

This commit is contained in:
Brendan Abolivier 2017-07-21 14:42:58 +01:00
parent e73e5413f1
commit 60c730148a
No known key found for this signature in database
GPG key ID: 8EF1500759F70623
4 changed files with 115 additions and 25 deletions

View file

@ -45,12 +45,6 @@ type displayName struct {
DisplayName string `json:"displayname"`
}
type prevMembership struct {
PrevContent events.MemberContent `json:"prev_content"`
PrevID string `json:"replaces_state"`
UserID string `json:"prev_sender"`
}
// GetProfile implements GET /profile/{userID}
func GetProfile(
req *http.Request, accountDB *accounts.Database, userID string,
@ -246,18 +240,6 @@ func buildMembershipEvents(
evs := []gomatrixserverlib.Event{}
for _, membership := range memberships {
prevContent := events.MemberContent{
Membership: "join",
DisplayName: oldProfile.DisplayName,
AvatarURL: oldProfile.AvatarURL,
}
prev := prevMembership{
UserID: userID,
PrevID: membership.EventID,
PrevContent: prevContent,
}
builder := gomatrixserverlib.EventBuilder{
Sender: userID,
RoomID: membership.RoomID,
@ -265,10 +247,6 @@ func buildMembershipEvents(
StateKey: &userID,
}
if err := builder.SetUnsigned(prev); err != nil {
return nil, err
}
content := events.MemberContent{
Membership: "join",
}

View file

@ -19,6 +19,7 @@ import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/events"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
@ -35,6 +36,15 @@ type OutputRoomEvent struct {
db *storage.SyncServerDatabase
notifier *sync.Notifier
query api.RoomserverQueryAPI
serverName gomatrixserverlib.ServerName
keyID gomatrixserverlib.KeyID
privateKey []byte
}
type prevMembership struct {
PrevContent events.MemberContent `json:"prev_content"`
PrevID string `json:"replaces_state"`
UserID string `json:"prev_sender"`
}
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
@ -55,6 +65,9 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S
db: store,
notifier: n,
query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
serverName: cfg.Matrix.ServerName,
keyID: cfg.Matrix.KeyID,
privateKey: cfg.Matrix.PrivateKey,
}
consumer.ProcessMessage = s.onMessage
@ -101,6 +114,18 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
}).Panicf("roomserver output log: state event lookup failure")
}
ev, err = s.updateMemberEvent(ev, s.keyID, s.privateKey)
if err != nil {
return err
}
for i := range addsStateEvents {
addsStateEvents[i], err = s.updateMemberEvent(addsStateEvents[i], s.keyID, s.privateKey)
if err != nil {
return err
}
}
syncStreamPos, err := s.db.WriteEvent(
&ev, addsStateEvents, output.NewRoomEvent.AddsStateEventIDs, output.NewRoomEvent.RemovesStateEventIDs,
)
@ -177,6 +202,63 @@ func (s *OutputRoomEvent) lookupStateEvents(
return result, nil
}
func (s *OutputRoomEvent) updateMemberEvent(
event gomatrixserverlib.Event, keyID gomatrixserverlib.KeyID,
privateKey []byte,
) (gomatrixserverlib.Event, error) {
if event.Type() != "m.room.member" {
return event, nil
}
membership, err := event.Membership()
if err != nil {
return event, err
}
if membership != "join" {
return event, nil
}
prevEvent, err := s.db.GetMembershipEvent(event.RoomID(), *event.StateKey())
if err != nil {
return event, err
}
if prevEvent == nil {
return event, nil
}
builder := gomatrixserverlib.EventBuilder{
Sender: event.Sender(),
RoomID: event.RoomID(),
Type: event.Type(),
StateKey: event.StateKey(),
PrevEvents: event.PrevEvents(),
AuthEvents: event.AuthEvents(),
Redacts: event.Redacts(),
Depth: event.Depth(),
Content: event.Content(),
}
var content events.MemberContent
if err := json.Unmarshal(prevEvent.Content(), &content); err != nil {
return event, err
}
prev := prevMembership{
PrevContent: content,
PrevID: prevEvent.EventID(),
UserID: prevEvent.Sender(),
}
if err = builder.SetUnsigned(prev); err != nil {
return event, err
}
ts := event.OriginServerTS().Time()
ev, err := builder.Build(event.EventID(), ts, event.Origin(), keyID, privateKey)
return ev, err
}
func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string {
have := map[string]bool{}
for _, event := range events {

View file

@ -66,6 +66,9 @@ const selectCurrentStateSQL = "" +
const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
const selectJoinEventForUserSQL = "" +
"SELECT event_json FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join' AND room_id = $1 AND state_key = $2"
const selectEventsWithEventIDsSQL = "" +
"SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)"
@ -76,6 +79,7 @@ type currentRoomStateStatements struct {
selectCurrentStateStmt *sql.Stmt
selectJoinedUsersStmt *sql.Stmt
selectEventsWithEventIDsStmt *sql.Stmt
selectJoinEventForUserStmt *sql.Stmt
}
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
@ -101,6 +105,9 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
return
}
if s.selectJoinEventForUserStmt, err = db.Prepare(selectJoinEventForUserSQL); err != nil {
return
}
return
}
@ -195,3 +202,12 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
}
return result, nil
}
func (s *currentRoomStateStatements) selectJoinEventForUser(roomID string, userID string) (*gomatrixserverlib.Event, error) {
var res []byte
if err := s.selectJoinEventForUserStmt.QueryRow(roomID, userID).Scan(&res); err == sql.ErrNoRows {
return nil, nil
}
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false)
return &ev, err
}

View file

@ -100,9 +100,18 @@ func (d *SyncServerDatabase) WriteEvent(
}
streamPos = types.StreamPosition(pos)
if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
// Nothing to do, the event may have just been a message event.
return nil
fmt.Println(len(addStateEvents))
if len(addStateEvents) == 0 {
// If the event is a m.room.member event, and has unsigned content,
// we need to save it as it is very likely to be a membership update
// (e.g. if a user updates their profile)
if ev.Type() == "m.room.member" && len(ev.Unsigned()) > 0 {
addStateEvents = append(addStateEvents, *ev)
} else if len(removeStateEventIDs) == 0 {
// Nothing to do, the event may have just been a message event.
return nil
}
}
return d.updateRoomState(txn, removeStateEventIDs, addStateEvents, streamPos)
@ -141,6 +150,11 @@ func (d *SyncServerDatabase) updateRoomState(
return nil
}
// GetMembershipEvent returns the Matrix join event for a given user into a given room
func (d *SyncServerDatabase) GetMembershipEvent(roomID string, userID string) (*gomatrixserverlib.Event, error) {
return d.roomstate.selectJoinEventForUser(roomID, userID)
}
// PartitionOffsets implements common.PartitionStorer
func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
return d.partitions.SelectPartitionOffsets(topic)