Use gmsl.Event instead of AS-only event in transactions

Also clear up the logic on lookupStateEvents a little bit.
This commit is contained in:
Andrew Morgan 2018-07-04 14:51:20 +01:00
parent b0f209b91f
commit f0c2ccb37a
5 changed files with 59 additions and 104 deletions

View file

@ -99,50 +99,37 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"type": ev.Type(),
}).Info("appservice received event from roomserver")
}).Info("appservice received an event from roomserver")
events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
if err != nil {
return err
}
events := append(missingEvents, ev)
// Create a context to thread through the whole filtering process
ctx := context.TODO()
if err = s.db.UpdateMemberships(ctx, events, output.NewRoomEvent.RemovesStateEventIDs); err != nil {
return err
}
// Combine any state and non-state events and send them to the application service
return s.filterRoomserverEvents(ctx, ev)
// Send event to any relevant application services
return s.filterRoomserverEvents(context.TODO(), events)
}
// lookupStateEvents looks up the state events that are added by a new event.
func (s *OutputRoomEventConsumer) lookupStateEvents(
// lookupMissingStateEvents looks up the state events that are added by a new event,
// and returns any not already present.
func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.Event,
) ([]gomatrixserverlib.Event, error) {
// Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 {
// If the event is a membership update (e.g. for a profile update), it won't
// show up in AddsStateEventIDs, so we need to add it manually
if event.Type() == "m.room.member" {
return []gomatrixserverlib.Event{event}, nil
}
return nil, nil
return []gomatrixserverlib.Event{}, nil
}
// Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
return []gomatrixserverlib.Event{event}, nil
return []gomatrixserverlib.Event{}, nil
}
result := []gomatrixserverlib.Event{}
missing := []string{}
for _, id := range addsStateEventIDs {
// Append the current event in the results if its ID is in the events list
if id == event.EventID() {
result = append(result, event)
} else {
if id != event.EventID() {
// If the event isn't the current one, add it to the list of events
// to retrieve from the roomserver
missing = append(missing, id)
@ -168,18 +155,20 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// application service.
func (s *OutputRoomEventConsumer) filterRoomserverEvents(
ctx context.Context,
event gomatrixserverlib.Event,
events []gomatrixserverlib.Event,
) error {
for _, ws := range s.workerStates {
// Check if this event is interesting to this application service
if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
// Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database")
} else {
// Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast
ws.NotifyNewEvent()
for _, event := range events {
// Check if this event is interesting to this application service
if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
// Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database")
} else {
// Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast
ws.NotifyNewEvent()
}
}
}
}

View file

@ -17,6 +17,7 @@ package storage
import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/matrix-org/gomatrixserverlib"
@ -30,18 +31,8 @@ CREATE TABLE IF NOT EXISTS appservice_events (
id BIGSERIAL NOT NULL PRIMARY KEY,
-- The ID of the application service the event will be sent to
as_id TEXT NOT NULL,
-- The ID of the event
event_id TEXT NOT NULL,
-- Unix seconds that the event was sent at from the originating server
origin_server_ts BIGINT NOT NULL,
-- The ID of the room that the event was sent in
room_id TEXT NOT NULL,
-- The type of the event (e.g. m.text)
type TEXT NOT NULL,
-- The ID of the user that sent the event
sender TEXT NOT NULL,
-- The JSON representation of the event's content. Text to avoid db JSON parsing
event_content TEXT,
-- JSON representation of the event
event_json TEXT NOT NULL,
-- The ID of the transaction that this event is a part of
txn_id BIGINT NOT NULL
);
@ -50,15 +41,15 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
`
const selectEventsByApplicationServiceIDSQL = "" +
"SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " +
"SELECT id, event_json, txn_id " +
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1"
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
const insertEventSQL = "" +
"INSERT INTO appservice_events(as_id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id) " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
"INSERT INTO appservice_events(as_id, event_json, txn_id) " +
"VALUES ($1, $2, $3)"
const updateTxnIDForEventsSQL = "" +
"UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
@ -109,23 +100,23 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
limit int,
) (
txnID, maxID int,
events []gomatrixserverlib.ApplicationServiceEvent,
events []gomatrixserverlib.Event,
err error,
) {
// Retrieve events from the database. Unsuccessfully sent events first
eventRowsCurr, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
if err != nil {
return 0, 0, nil, err
}
defer func() {
err = eventRowsCurr.Close()
err = eventRows.Close()
if err != nil {
log.WithFields(log.Fields{
"appservice": applicationServiceID,
}).WithError(err).Fatalf("appservice unable to select new events to send")
}
}()
events, maxID, txnID, err = retrieveEvents(eventRowsCurr, limit)
events, maxID, txnID, err = retrieveEvents(eventRows, limit)
if err != nil {
return 0, 0, nil, err
}
@ -133,35 +124,36 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
return
}
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.ApplicationServiceEvent, maxID, txnID int, err error) {
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, err error) {
// Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
// Iterate through each row and store event contents
// If txn_id changes dramatically, we've switched from collecting old events to
// new ones. Send back those events first.
lastTxnID := -2 // Invalid transaction ID
invalidTxnID := -2
lastTxnID := invalidTxnID
for eventsProcessed := 0; eventRows.Next(); {
var event gomatrixserverlib.ApplicationServiceEvent
var eventContent sql.NullString
var event gomatrixserverlib.Event
var eventJSON []byte
var id int
err = eventRows.Scan(
&id,
&event.EventID,
&event.OriginServerTimestamp,
&event.RoomID,
&event.Type,
&event.UserID,
&eventContent,
&eventJSON,
&txnID,
)
if err != nil {
return nil, 0, 0, err
}
// Unmarshal eventJSON
if err = json.Unmarshal(eventJSON, &event); err != nil {
return nil, 0, 0, err
}
// If txnID has changed on this event from the previous event, then we've
// reached the end of a transaction's events. Return only those events.
if lastTxnID > -2 && lastTxnID != txnID {
if lastTxnID > invalidTxnID && lastTxnID != txnID {
return events, maxID, lastTxnID, nil
}
lastTxnID = txnID
@ -174,24 +166,16 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.
}
}
if eventContent.Valid {
event.Content = gomatrixserverlib.RawJSON(eventContent.String)
}
if id > maxID {
maxID = id
}
// Portion of the event that is unsigned due to rapid change
event.Unsigned = gomatrixserverlib.ApplicationServiceUnsigned{
// Get age of the event from original timestamp and current time
// TODO: Consider removing age as not many app services use it
Age: nowMilli - event.OriginServerTimestamp,
// TODO: Consider removing age as not many app services use it
if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
return nil, 0, 0, err
}
// TODO: Synapse does this. It's unnecessary to send Sender and UserID as the
// same value. Do app services really require this? :)
event.Sender = event.UserID
events = append(events, event)
}
@ -220,15 +204,16 @@ func (s *eventsStatements) insertEvent(
appServiceID string,
event *gomatrixserverlib.Event,
) (err error) {
// Convert event to JSON before inserting
eventJSON, err := json.Marshal(event)
if err != nil {
return err
}
_, err = s.insertEventStmt.ExecContext(
ctx,
appServiceID,
event.EventID(),
event.OriginServerTS(),
event.RoomID(),
event.Type(),
event.Sender(),
event.Content(),
eventJSON,
-1, // No transaction ID yet
)
return

View file

@ -67,7 +67,7 @@ func (d *Database) GetEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
limit int,
) (int, int, []gomatrixserverlib.ApplicationServiceEvent, error) {
) (int, int, []gomatrixserverlib.Event, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
}

2
vendor/manifest vendored
View file

@ -135,7 +135,7 @@
{
"importpath": "github.com/matrix-org/gomatrixserverlib",
"repository": "https://github.com/matrix-org/gomatrixserverlib",
"revision": "4c45af876608399b13cf891e2d9f89f01c67137a",
"revision": "929828872b51e6733166553d6b1a20155b6ab829",
"branch": "master"
},
{

View file

@ -15,27 +15,8 @@
package gomatrixserverlib
// ApplicationServiceUnsigned is the contents of the unsigned field of an
// ApplicationServiceEvent.
type ApplicationServiceUnsigned struct {
Age int64 `json:"age,omitempty"`
}
// ApplicationServiceEvent is an event format that is sent off to an
// application service as part of a transaction.
type ApplicationServiceEvent struct {
Unsigned ApplicationServiceUnsigned `json:"unsigned,omitempty"`
Content RawJSON `json:"content,omitempty"`
EventID string `json:"event_id,omitempty"`
OriginServerTimestamp int64 `json:"origin_server_ts,omitempty"`
RoomID string `json:"room_id,omitempty"`
Sender string `json:"sender,omitempty"`
Type string `json:"type,omitempty"`
UserID string `json:"user_id,omitempty"`
}
// ApplicationServiceTransaction is the transaction that is sent off to an
// application service.
type ApplicationServiceTransaction struct {
Events []ApplicationServiceEvent `json:"events"`
Events []Event `json:"events"`
}