[roomserver] Store transaction ID and prevent reprocessing of events (#446)

This commit is contained in:
Anant Prakash 2018-05-26 16:33:35 +05:30 committed by Erik Johnston
parent ed388a32b7
commit 60e77959ee
16 changed files with 206 additions and 49 deletions

View file

@ -37,7 +37,7 @@ func NewRoomserverProducer(inputAPI api.RoomserverInputAPI) *RoomserverProducer
func (c *RoomserverProducer) SendEvents(
ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName,
txnID *api.TransactionID,
) error {
) (string, error) {
ires := make([]api.InputRoomEvent, len(events))
for i, event := range events {
ires[i] = api.InputRoomEvent{
@ -83,20 +83,27 @@ func (c *RoomserverProducer) SendEventWithState(
StateEventIDs: stateEventIDs,
}
return c.SendInputRoomEvents(ctx, ires)
_, err = c.SendInputRoomEvents(ctx, ires)
return err
}
// SendInputRoomEvents writes the given input room events to the roomserver input API.
func (c *RoomserverProducer) SendInputRoomEvents(ctx context.Context, ires []api.InputRoomEvent) error {
func (c *RoomserverProducer) SendInputRoomEvents(
ctx context.Context, ires []api.InputRoomEvent,
) (eventID string, err error) {
request := api.InputRoomEventsRequest{InputRoomEvents: ires}
var response api.InputRoomEventsResponse
return c.InputAPI.InputRoomEvents(ctx, &request, &response)
err = c.InputAPI.InputRoomEvents(ctx, &request, &response)
eventID = response.EventID
return
}
// SendInvite writes the invite event to the roomserver input API.
// This should only be needed for invite events that occur outside of a known room.
// If we are in the room then the event should be sent using the SendEvents method.
func (c *RoomserverProducer) SendInvite(ctx context.Context, inviteEvent gomatrixserverlib.Event) error {
func (c *RoomserverProducer) SendInvite(
ctx context.Context, inviteEvent gomatrixserverlib.Event,
) error {
request := api.InputRoomEventsRequest{
InputInviteEvents: []api.InputInviteEvent{{Event: inviteEvent}},
}

View file

@ -261,7 +261,7 @@ func createRoom(req *http.Request, device *authtypes.Device,
}
// send events to the room server
err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil)
_, err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil)
if err != nil {
return httputil.LogThenError(req, err)
}

View file

@ -217,7 +217,7 @@ func (r joinRoomReq) joinRoomUsingServers(
var queryRes api.QueryLatestEventsAndStateResponse
event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.queryAPI, &queryRes)
if err == nil {
if err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName, nil); err != nil {
if _, err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName, nil); err != nil {
return httputil.LogThenError(r.req, err)
}
return util.JSONResponse{

View file

@ -97,7 +97,7 @@ func SendMembership(
return httputil.LogThenError(req, err)
}
if err := producer.SendEvents(
if _, err := producer.SendEvents(
req.Context(), []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil,
); err != nil {
return httputil.LogThenError(req, err)

View file

@ -138,7 +138,7 @@ func SetAvatarURL(
return httputil.LogThenError(req, err)
}
if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
return httputil.LogThenError(req, err)
}
@ -230,7 +230,7 @@ func SetDisplayName(
return httputil.LogThenError(req, err)
}
if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
return httputil.LogThenError(req, err)
}

View file

@ -107,16 +107,18 @@ func SendEvent(
}
}
// pass the new event to the roomserver
if err := producer.SendEvents(
// pass the new event to the roomserver and receive the correct event ID
// event ID in case of duplicate transaction is discarded
eventID, err := producer.SendEvents(
req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName, txnAndDeviceID,
); err != nil {
)
if err != nil {
return httputil.LogThenError(req, err)
}
res := util.JSONResponse{
Code: http.StatusOK,
JSON: sendEventResponse{e.EventID()},
JSON: sendEventResponse{eventID},
}
// Add response to transactionsCache
if txnID != nil {

View file

@ -355,5 +355,6 @@ func emit3PIDInviteEvent(
return err
}
return producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil)
_, err = producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil)
return err
}

View file

@ -169,7 +169,7 @@ func SendJoin(
// Send the events to the room server.
// We are responsible for notifying other servers that the user has joined
// the room, so set SendAsServer to cfg.Matrix.ServerName
err = producer.SendEvents(ctx, []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil)
_, err = producer.SendEvents(ctx, []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil)
if err != nil {
return httputil.LogThenError(httpReq, err)
}

View file

@ -170,7 +170,8 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
// TODO: Check that the event is allowed by its auth_events.
// pass the event to the roomserver
return t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers, nil)
_, err := t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers, nil)
return err
}
func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {

View file

@ -81,7 +81,7 @@ func CreateInvitesFrom3PIDInvites(
}
// Send all the events
if err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName, nil); err != nil {
if _, err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName, nil); err != nil {
return httputil.LogThenError(req, err)
}
@ -153,7 +153,7 @@ func ExchangeThirdPartyInvite(
}
// Send the event to the roomserver
if err = producer.SendEvents(
if _, err = producer.SendEvents(
httpReq.Context(), []gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName, nil,
); err != nil {
return httputil.LogThenError(httpReq, err)

View file

@ -94,7 +94,9 @@ type InputRoomEventsRequest struct {
}
// InputRoomEventsResponse is a response to InputRoomEvents
type InputRoomEventsResponse struct{}
type InputRoomEventsResponse struct {
EventID string `json:"event_id"`
}
// RoomserverInputAPI is used to write events to the room server.
type RoomserverInputAPI interface {

View file

@ -32,6 +32,7 @@ type RoomEventDatabase interface {
StoreEvent(
ctx context.Context,
event gomatrixserverlib.Event,
txnAndDeviceID *api.TransactionID,
authEventNIDs []types.EventNID,
) (types.RoomNID, types.StateAtEvent, error)
// Look up the state entries for a list of string event IDs
@ -61,6 +62,13 @@ type RoomEventDatabase interface {
MembershipUpdater(
ctx context.Context, roomID, targerUserID string,
) (types.MembershipUpdater, error)
// Look up event ID by transaction's info.
// This is used to determine if the room event is processed/processing already.
// Returns an empty string if no such event exists.
GetTransactionEventID(
ctx context.Context, transactionID string,
deviceID string, userID string,
) (string, error)
}
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
@ -79,52 +87,46 @@ func processRoomEvent(
db RoomEventDatabase,
ow OutputRoomEventWriter,
input api.InputRoomEvent,
) error {
) (eventID string, err error) {
// Parse and validate the event JSON
event := input.Event
// Check that the event passes authentication checks and work out the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, event, input.AuthEventIDs)
if err != nil {
return err
return
}
if input.TransactionID != nil {
tdID := input.TransactionID
eventID, err = db.GetTransactionEventID(
ctx, tdID.TransactionID, tdID.DeviceID, input.Event.Sender(),
)
// On error OR event with the transaction already processed/processesing
if err != nil || eventID != "" {
return
}
}
// Store the event
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, authEventNIDs)
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
if err != nil {
return err
return
}
if input.Kind == api.KindOutlier {
// For outliers we can stop after we've stored the event itself as it
// doesn't have any associated state to store and we don't need to
// notify anyone about it.
return nil
return event.EventID(), nil
}
if stateAtEvent.BeforeStateSnapshotNID == 0 {
// We haven't calculated a state for this event yet.
// Lets calculate one.
if input.HasState {
// We've been told what the state at the event is so we don't need to calculate it.
// Check that those state events are in the database and store the state.
var entries []types.StateEntry
if entries, err = db.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
return err
}
if stateAtEvent.BeforeStateSnapshotNID, err = db.AddState(ctx, roomNID, nil, entries); err != nil {
return err
}
} else {
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
if stateAtEvent.BeforeStateSnapshotNID, err = state.CalculateAndStoreStateBeforeEvent(ctx, db, event, roomNID); err != nil {
return err
}
}
err = db.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event)
if err != nil {
return err
return
}
}
@ -134,7 +136,38 @@ func processRoomEvent(
}
// Update the extremities of the event graph for the room
return updateLatestEvents(ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID)
return event.EventID(), updateLatestEvents(
ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID,
)
}
func calculateAndSetState(
ctx context.Context,
db RoomEventDatabase,
input api.InputRoomEvent,
roomNID types.RoomNID,
stateAtEvent *types.StateAtEvent,
event gomatrixserverlib.Event,
) error {
var err error
if input.HasState {
// We've been told what the state at the event is so we don't need to calculate it.
// Check that those state events are in the database and store the state.
var entries []types.StateEntry
if entries, err = db.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
return err
}
if stateAtEvent.BeforeStateSnapshotNID, err = db.AddState(ctx, roomNID, nil, entries); err != nil {
return err
}
} else {
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
if stateAtEvent.BeforeStateSnapshotNID, err = state.CalculateAndStoreStateBeforeEvent(ctx, db, event, roomNID); err != nil {
return err
}
}
return db.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
}
func processInviteEvent(

View file

@ -60,17 +60,17 @@ func (r *RoomserverInputAPI) InputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
) error {
) (err error) {
// We lock as processRoomEvent can only be called once at a time
r.mutex.Lock()
defer r.mutex.Unlock()
for i := range request.InputRoomEvents {
if err := processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil {
if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil {
return err
}
}
for i := range request.InputInviteEvents {
if err := processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil {
if err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil {
return err
}
}

View file

@ -30,6 +30,7 @@ type statements struct {
roomAliasesStatements
inviteStatements
membershipStatements
transactionStatements
}
func (s *statements) prepare(db *sql.DB) error {
@ -47,6 +48,7 @@ func (s *statements) prepare(db *sql.DB) error {
s.roomAliasesStatements.prepare,
s.inviteStatements.prepare,
s.membershipStatements.prepare,
s.transactionStatements.prepare,
} {
if err = prepare(db); err != nil {
return err

View file

@ -20,6 +20,7 @@ import (
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@ -45,7 +46,8 @@ func Open(dataSourceName string) (*Database, error) {
// StoreEvent implements input.EventDatabase
func (d *Database) StoreEvent(
ctx context.Context, event gomatrixserverlib.Event, authEventNIDs []types.EventNID,
ctx context.Context, event gomatrixserverlib.Event,
txnAndDeviceID *api.TransactionID, authEventNIDs []types.EventNID,
) (types.RoomNID, types.StateAtEvent, error) {
var (
roomNID types.RoomNID
@ -56,6 +58,15 @@ func (d *Database) StoreEvent(
err error
)
if txnAndDeviceID != nil {
if err = d.statements.insertTransaction(
ctx, txnAndDeviceID.TransactionID,
txnAndDeviceID.DeviceID, event.Sender(), event.EventID(),
); err != nil {
return 0, types.StateAtEvent{}, err
}
}
if roomNID, err = d.assignRoomNID(ctx, nil, event.RoomID()); err != nil {
return 0, types.StateAtEvent{}, err
}
@ -308,6 +319,18 @@ func (d *Database) GetLatestEventsForUpdate(
}, nil
}
// GetTransactionEventID implements input.EventDatabase
func (d *Database) GetTransactionEventID(
ctx context.Context, transactionID string,
deviceID string, userID string,
) (string, error) {
eventID, err := d.statements.selectTransactionEventID(ctx, transactionID, deviceID, userID)
if err == sql.ErrNoRows {
return "", nil
}
return eventID, err
}
type roomRecentEventsUpdater struct {
transaction
d *Database

View file

@ -0,0 +1,86 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
)
const transactionsSchema = `
-- The transactions table holds transaction IDs with sender's info and event ID it belongs to.
-- This table is used by roomserver to prevent reprocessing of events.
CREATE TABLE IF NOT EXISTS roomserver_transactions (
-- The transaction ID of the event.
transaction_id TEXT NOT NULL,
-- The device ID of the originating transaction.
device_id TEXT NOT NULL,
-- User ID of the sender who authored the event
user_id TEXT NOT NULL,
-- Event ID corresponding to the transaction
-- Required to return event ID to client on a duplicate request.
event_id TEXT NOT NULL,
-- A transaction ID is unique for a user and device
-- This automatically creates an index.
PRIMARY KEY (transaction_id, device_id, user_id)
);
`
const insertTransactionSQL = "" +
"INSERT INTO roomserver_transactions (transaction_id, device_id, user_id, event_id)" +
" VALUES ($1, $2, $3, $4)"
const selectTransactionEventIDSQL = "" +
"SELECT event_id FROM roomserver_transactions" +
" WHERE transaction_id = $1 AND device_id = $2 AND user_id = $3"
type transactionStatements struct {
insertTransactionStmt *sql.Stmt
selectTransactionEventIDStmt *sql.Stmt
}
func (s *transactionStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(transactionsSchema)
if err != nil {
return
}
return statementList{
{&s.insertTransactionStmt, insertTransactionSQL},
{&s.selectTransactionEventIDStmt, selectTransactionEventIDSQL},
}.prepare(db)
}
func (s *transactionStatements) insertTransaction(
ctx context.Context,
transactionID string,
deviceID string,
userID string,
eventID string,
) (err error) {
_, err = s.insertTransactionStmt.ExecContext(
ctx, transactionID, deviceID, userID, eventID,
)
return
}
func (s *transactionStatements) selectTransactionEventID(
ctx context.Context,
transactionID string,
deviceID string,
userID string,
) (eventID string, err error) {
err = s.selectTransactionEventIDStmt.QueryRowContext(
ctx, transactionID, deviceID, userID,
).Scan(&eventID)
return
}