From 2b5c0714d7848e21cad7e6ecb380ceed19d09a07 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 6 Apr 2017 14:18:13 +0100 Subject: [PATCH] Return the pkey id of the inserted event --- .../clientapi/storage/output_room_events_table.go | 10 +++++----- .../dendrite/clientapi/storage/syncserver.go | 14 +++++++++----- .../dendrite/clientapi/sync/requestpool.go | 5 +++-- .../dendrite/clientapi/sync/syncserver.go | 6 ++++-- .../matrix-org/dendrite/common/consumers.go | 2 +- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go index 9c75940cf..2837f6884 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go @@ -31,7 +31,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS event_id_idx ON output_room_events(event_id); ` const insertEventSQL = "" + - "INSERT INTO output_room_events (room_id, event_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4, $5)" + "INSERT INTO output_room_events (room_id, event_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4, $5) RETURNING id" const selectEventsSQL = "" + "SELECT event_json FROM output_room_events WHERE event_id = ANY($1)" @@ -56,11 +56,11 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { } // InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. -func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) error { - _, err := txn.Stmt(s.insertEventStmt).Exec( +func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) { + err = txn.Stmt(s.insertEventStmt).QueryRow( event.RoomID(), event.EventID(), event.JSON(), pq.StringArray(addState), pq.StringArray(removeState), - ) - return err + ).Scan(&streamPos) + return } // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go index 39df640b1..9b74ae033 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go @@ -39,10 +39,13 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { } // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races -// when generating the stream position for this event. Returns an error if there was a problem inserting this event. -func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) error { - return runTransaction(d.db, func(txn *sql.Tx) error { - if err := d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs); err != nil { +// when generating the stream position for this event. Returns the sync stream position for the inserted event. +// Returns an error if there was a problem inserting this event. +func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos int64, returnErr error) { + returnErr = runTransaction(d.db, func(txn *sql.Tx) error { + var err error + streamPos, err = d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) + if err != nil { return err } @@ -56,7 +59,7 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve // However, conflict resolution may result in there being different events being added, or even some removed. if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() { // common case - if err := d.roomstate.UpdateRoomState(txn, []gomatrixserverlib.Event{*ev}, nil); err != nil { + if err = d.roomstate.UpdateRoomState(txn, []gomatrixserverlib.Event{*ev}, nil); err != nil { return err } return nil @@ -69,6 +72,7 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve } return d.roomstate.UpdateRoomState(txn, added, removeStateEventIDs) }) + return } // PartitionOffsets implements common.PartitionStorer diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go index 62b16a1f8..aefa179a3 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go @@ -1,6 +1,7 @@ package sync import ( + "fmt" "net/http" "strconv" "time" @@ -65,8 +66,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } // OnNewEvent is called when a new event is received from the room server -func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event) { - +func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, syncStreamPos int64) { + fmt.Println("OnNewEvent =>", ev.EventID(), syncStreamPos) } func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) { diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go index 5454c6985..359b0a50a 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -65,7 +65,9 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { "room_id": ev.RoomID(), }).Info("received event from roomserver") - if err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs); err != nil { + syncStreamPos, err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs) + + if err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ "event": string(ev.JSON()), @@ -75,7 +77,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } - s.rp.OnNewEvent(&ev) + s.rp.OnNewEvent(&ev, syncStreamPos) return nil } diff --git a/src/github.com/matrix-org/dendrite/common/consumers.go b/src/github.com/matrix-org/dendrite/common/consumers.go index 891e080b2..9d16fe47d 100644 --- a/src/github.com/matrix-org/dendrite/common/consumers.go +++ b/src/github.com/matrix-org/dendrite/common/consumers.go @@ -67,7 +67,7 @@ func (c *ContinualConsumer) Start() error { } for _, offset := range storedOffsets { // We've already processed events from this partition so advance the offset to where we got to. - offsets[offset.Partition] = offset.Offset + offsets[offset.Partition] = 1 + offset.Offset } var partitionConsumers []sarama.PartitionConsumer