Return the pkey id of the inserted event

This commit is contained in:
Kegan Dougal 2017-04-06 14:18:13 +01:00
parent de9e3e5417
commit 2b5c0714d7
5 changed files with 22 additions and 15 deletions

View file

@ -31,7 +31,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS event_id_idx ON output_room_events(event_id);
` `
const insertEventSQL = "" + const insertEventSQL = "" +
"INSERT INTO output_room_events (room_id, event_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4, $5)" "INSERT INTO output_room_events (room_id, event_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4, $5) RETURNING id"
const selectEventsSQL = "" + const selectEventsSQL = "" +
"SELECT event_json FROM output_room_events WHERE event_id = ANY($1)" "SELECT event_json FROM output_room_events WHERE event_id = ANY($1)"
@ -56,11 +56,11 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
} }
// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. // InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs.
func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) error { func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) {
_, err := txn.Stmt(s.insertEventStmt).Exec( err = txn.Stmt(s.insertEventStmt).QueryRow(
event.RoomID(), event.EventID(), event.JSON(), pq.StringArray(addState), pq.StringArray(removeState), event.RoomID(), event.EventID(), event.JSON(), pq.StringArray(addState), pq.StringArray(removeState),
) ).Scan(&streamPos)
return err return
} }
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing

View file

@ -39,10 +39,13 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
} }
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
// when generating the stream position for this event. Returns an error if there was a problem inserting this event. // when generating the stream position for this event. Returns the sync stream position for the inserted event.
func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) error { // Returns an error if there was a problem inserting this event.
return runTransaction(d.db, func(txn *sql.Tx) error { func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos int64, returnErr error) {
if err := d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs); err != nil { returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
var err error
streamPos, err = d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs)
if err != nil {
return err return err
} }
@ -56,7 +59,7 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve
// However, conflict resolution may result in there being different events being added, or even some removed. // However, conflict resolution may result in there being different events being added, or even some removed.
if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() { if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() {
// common case // common case
if err := d.roomstate.UpdateRoomState(txn, []gomatrixserverlib.Event{*ev}, nil); err != nil { if err = d.roomstate.UpdateRoomState(txn, []gomatrixserverlib.Event{*ev}, nil); err != nil {
return err return err
} }
return nil return nil
@ -69,6 +72,7 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve
} }
return d.roomstate.UpdateRoomState(txn, added, removeStateEventIDs) return d.roomstate.UpdateRoomState(txn, added, removeStateEventIDs)
}) })
return
} }
// PartitionOffsets implements common.PartitionStorer // PartitionOffsets implements common.PartitionStorer

View file

@ -1,6 +1,7 @@
package sync package sync
import ( import (
"fmt"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
@ -65,8 +66,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
} }
// OnNewEvent is called when a new event is received from the room server // OnNewEvent is called when a new event is received from the room server
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event) { func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, syncStreamPos int64) {
fmt.Println("OnNewEvent =>", ev.EventID(), syncStreamPos)
} }
func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) { func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) {

View file

@ -65,7 +65,9 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
"room_id": ev.RoomID(), "room_id": ev.RoomID(),
}).Info("received event from roomserver") }).Info("received event from roomserver")
if err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs); err != nil { syncStreamPos, err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs)
if err != nil {
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event": string(ev.JSON()), "event": string(ev.JSON()),
@ -75,7 +77,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
}).Panicf("roomserver output log: write event failure") }).Panicf("roomserver output log: write event failure")
return nil return nil
} }
s.rp.OnNewEvent(&ev) s.rp.OnNewEvent(&ev, syncStreamPos)
return nil return nil
} }

View file

@ -67,7 +67,7 @@ func (c *ContinualConsumer) Start() error {
} }
for _, offset := range storedOffsets { for _, offset := range storedOffsets {
// We've already processed events from this partition so advance the offset to where we got to. // We've already processed events from this partition so advance the offset to where we got to.
offsets[offset.Partition] = offset.Offset offsets[offset.Partition] = 1 + offset.Offset
} }
var partitionConsumers []sarama.PartitionConsumer var partitionConsumers []sarama.PartitionConsumer