From f0c2ccb37a4b78be2db82981480da6d8cd43cb64 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 4 Jul 2018 14:51:20 +0100 Subject: [PATCH] Use gmsl.Event instead of AS-only event in transactions Also clear up the logic on lookupStateEvents a little bit. --- .../appservice/consumers/roomserver.go | 57 ++++++------- .../storage/appservice_events_table.go | 81 ++++++++----------- .../dendrite/appservice/storage/storage.go | 2 +- vendor/manifest | 2 +- .../gomatrixserverlib/appservice.go | 21 +---- 5 files changed, 59 insertions(+), 104 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go index 4f4586ebd..9284eae1a 100644 --- a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go @@ -99,50 +99,37 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { "event_id": ev.EventID(), "room_id": ev.RoomID(), "type": ev.Type(), - }).Info("appservice received event from roomserver") + }).Info("appservice received an event from roomserver") - events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) + missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) if err != nil { return err } + events := append(missingEvents, ev) - // Create a context to thread through the whole filtering process - ctx := context.TODO() - - if err = s.db.UpdateMemberships(ctx, events, output.NewRoomEvent.RemovesStateEventIDs); err != nil { - return err - } - - // Combine any state and non-state events and send them to the application service - return s.filterRoomserverEvents(ctx, ev) + // Send event to any relevant application services + return s.filterRoomserverEvents(context.TODO(), events) } -// lookupStateEvents looks up the state events that are added by a new event. -func (s *OutputRoomEventConsumer) lookupStateEvents( +// lookupMissingStateEvents looks up the state events that are added by a new event, +// and returns any not already present. +func (s *OutputRoomEventConsumer) lookupMissingStateEvents( addsStateEventIDs []string, event gomatrixserverlib.Event, ) ([]gomatrixserverlib.Event, error) { // Fast path if there aren't any new state events. if len(addsStateEventIDs) == 0 { - // If the event is a membership update (e.g. for a profile update), it won't - // show up in AddsStateEventIDs, so we need to add it manually - if event.Type() == "m.room.member" { - return []gomatrixserverlib.Event{event}, nil - } - return nil, nil + return []gomatrixserverlib.Event{}, nil } // Fast path if the only state event added is the event itself. if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { - return []gomatrixserverlib.Event{event}, nil + return []gomatrixserverlib.Event{}, nil } result := []gomatrixserverlib.Event{} missing := []string{} for _, id := range addsStateEventIDs { - // Append the current event in the results if its ID is in the events list - if id == event.EventID() { - result = append(result, event) - } else { + if id != event.EventID() { // If the event isn't the current one, add it to the list of events // to retrieve from the roomserver missing = append(missing, id) @@ -168,18 +155,20 @@ func (s *OutputRoomEventConsumer) lookupStateEvents( // application service. func (s *OutputRoomEventConsumer) filterRoomserverEvents( ctx context.Context, - event gomatrixserverlib.Event, + events []gomatrixserverlib.Event, ) error { for _, ws := range s.workerStates { - // Check if this event is interesting to this application service - if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) { - // Queue this event to be sent off to the application service - if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil { - log.WithError(err).Warn("failed to insert incoming event into appservices database") - } else { - // Tell our worker to send out new messages by updating remaining message - // count and waking them up with a broadcast - ws.NotifyNewEvent() + for _, event := range events { + // Check if this event is interesting to this application service + if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) { + // Queue this event to be sent off to the application service + if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil { + log.WithError(err).Warn("failed to insert incoming event into appservices database") + } else { + // Tell our worker to send out new messages by updating remaining message + // count and waking them up with a broadcast + ws.NotifyNewEvent() + } } } } diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go index 7bff2c495..5b47dc60d 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -17,6 +17,7 @@ package storage import ( "context" "database/sql" + "encoding/json" "time" "github.com/matrix-org/gomatrixserverlib" @@ -30,18 +31,8 @@ CREATE TABLE IF NOT EXISTS appservice_events ( id BIGSERIAL NOT NULL PRIMARY KEY, -- The ID of the application service the event will be sent to as_id TEXT NOT NULL, - -- The ID of the event - event_id TEXT NOT NULL, - -- Unix seconds that the event was sent at from the originating server - origin_server_ts BIGINT NOT NULL, - -- The ID of the room that the event was sent in - room_id TEXT NOT NULL, - -- The type of the event (e.g. m.text) - type TEXT NOT NULL, - -- The ID of the user that sent the event - sender TEXT NOT NULL, - -- The JSON representation of the event's content. Text to avoid db JSON parsing - event_content TEXT, + -- JSON representation of the event + event_json TEXT NOT NULL, -- The ID of the transaction that this event is a part of txn_id BIGINT NOT NULL ); @@ -50,15 +41,15 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); ` const selectEventsByApplicationServiceIDSQL = "" + - "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " + + "SELECT id, event_json, txn_id " + "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" const countEventsByApplicationServiceIDSQL = "" + - "SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1" + "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" const insertEventSQL = "" + - "INSERT INTO appservice_events(as_id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id) " + - "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + "INSERT INTO appservice_events(as_id, event_json, txn_id) " + + "VALUES ($1, $2, $3)" const updateTxnIDForEventsSQL = "" + "UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3" @@ -109,23 +100,23 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( limit int, ) ( txnID, maxID int, - events []gomatrixserverlib.ApplicationServiceEvent, + events []gomatrixserverlib.Event, err error, ) { // Retrieve events from the database. Unsuccessfully sent events first - eventRowsCurr, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID) + eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID) if err != nil { return 0, 0, nil, err } defer func() { - err = eventRowsCurr.Close() + err = eventRows.Close() if err != nil { log.WithFields(log.Fields{ "appservice": applicationServiceID, }).WithError(err).Fatalf("appservice unable to select new events to send") } }() - events, maxID, txnID, err = retrieveEvents(eventRowsCurr, limit) + events, maxID, txnID, err = retrieveEvents(eventRows, limit) if err != nil { return 0, 0, nil, err } @@ -133,35 +124,36 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( return } -func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.ApplicationServiceEvent, maxID, txnID int, err error) { +func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, err error) { // Get current time for use in calculating event age nowMilli := time.Now().UnixNano() / int64(time.Millisecond) // Iterate through each row and store event contents // If txn_id changes dramatically, we've switched from collecting old events to // new ones. Send back those events first. - lastTxnID := -2 // Invalid transaction ID + invalidTxnID := -2 + lastTxnID := invalidTxnID for eventsProcessed := 0; eventRows.Next(); { - var event gomatrixserverlib.ApplicationServiceEvent - var eventContent sql.NullString + var event gomatrixserverlib.Event + var eventJSON []byte var id int err = eventRows.Scan( &id, - &event.EventID, - &event.OriginServerTimestamp, - &event.RoomID, - &event.Type, - &event.UserID, - &eventContent, + &eventJSON, &txnID, ) if err != nil { return nil, 0, 0, err } + // Unmarshal eventJSON + if err = json.Unmarshal(eventJSON, &event); err != nil { + return nil, 0, 0, err + } + // If txnID has changed on this event from the previous event, then we've // reached the end of a transaction's events. Return only those events. - if lastTxnID > -2 && lastTxnID != txnID { + if lastTxnID > invalidTxnID && lastTxnID != txnID { return events, maxID, lastTxnID, nil } lastTxnID = txnID @@ -174,24 +166,16 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib. } } - if eventContent.Valid { - event.Content = gomatrixserverlib.RawJSON(eventContent.String) - } if id > maxID { maxID = id } // Portion of the event that is unsigned due to rapid change - event.Unsigned = gomatrixserverlib.ApplicationServiceUnsigned{ - // Get age of the event from original timestamp and current time - // TODO: Consider removing age as not many app services use it - Age: nowMilli - event.OriginServerTimestamp, + // TODO: Consider removing age as not many app services use it + if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil { + return nil, 0, 0, err } - // TODO: Synapse does this. It's unnecessary to send Sender and UserID as the - // same value. Do app services really require this? :) - event.Sender = event.UserID - events = append(events, event) } @@ -220,15 +204,16 @@ func (s *eventsStatements) insertEvent( appServiceID string, event *gomatrixserverlib.Event, ) (err error) { + // Convert event to JSON before inserting + eventJSON, err := json.Marshal(event) + if err != nil { + return err + } + _, err = s.insertEventStmt.ExecContext( ctx, appServiceID, - event.EventID(), - event.OriginServerTS(), - event.RoomID(), - event.Type(), - event.Sender(), - event.Content(), + eventJSON, -1, // No transaction ID yet ) return diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go index 19a7ce6b9..eec2a0312 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go @@ -67,7 +67,7 @@ func (d *Database) GetEventsWithAppServiceID( ctx context.Context, appServiceID string, limit int, -) (int, int, []gomatrixserverlib.ApplicationServiceEvent, error) { +) (int, int, []gomatrixserverlib.Event, error) { return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) } diff --git a/vendor/manifest b/vendor/manifest index 5694d62a2..9e1d28550 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -135,7 +135,7 @@ { "importpath": "github.com/matrix-org/gomatrixserverlib", "repository": "https://github.com/matrix-org/gomatrixserverlib", - "revision": "4c45af876608399b13cf891e2d9f89f01c67137a", + "revision": "929828872b51e6733166553d6b1a20155b6ab829", "branch": "master" }, { diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go index 211cb9f1c..a67527947 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/appservice.go @@ -15,27 +15,8 @@ package gomatrixserverlib -// ApplicationServiceUnsigned is the contents of the unsigned field of an -// ApplicationServiceEvent. -type ApplicationServiceUnsigned struct { - Age int64 `json:"age,omitempty"` -} - -// ApplicationServiceEvent is an event format that is sent off to an -// application service as part of a transaction. -type ApplicationServiceEvent struct { - Unsigned ApplicationServiceUnsigned `json:"unsigned,omitempty"` - Content RawJSON `json:"content,omitempty"` - EventID string `json:"event_id,omitempty"` - OriginServerTimestamp int64 `json:"origin_server_ts,omitempty"` - RoomID string `json:"room_id,omitempty"` - Sender string `json:"sender,omitempty"` - Type string `json:"type,omitempty"` - UserID string `json:"user_id,omitempty"` -} - // ApplicationServiceTransaction is the transaction that is sent off to an // application service. type ApplicationServiceTransaction struct { - Events []ApplicationServiceEvent `json:"events"` + Events []Event `json:"events"` }