Make SendEvents return the correct event ID

This commit is contained in:
Anant Prakash 2018-05-24 19:52:48 +05:30
parent 2d56033725
commit d72a805b3f
No known key found for this signature in database
GPG key ID: C5D399F626523045
5 changed files with 35 additions and 27 deletions

View file

@ -37,7 +37,7 @@ func NewRoomserverProducer(inputAPI api.RoomserverInputAPI) *RoomserverProducer
func (c *RoomserverProducer) SendEvents( func (c *RoomserverProducer) SendEvents(
ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName, ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName,
txnID *api.TransactionID, txnID *api.TransactionID,
) error { ) (string, error) {
ires := make([]api.InputRoomEvent, len(events)) ires := make([]api.InputRoomEvent, len(events))
for i, event := range events { for i, event := range events {
ires[i] = api.InputRoomEvent{ ires[i] = api.InputRoomEvent{
@ -55,10 +55,10 @@ func (c *RoomserverProducer) SendEvents(
// with the state at the event as KindOutlier before it. // with the state at the event as KindOutlier before it.
func (c *RoomserverProducer) SendEventWithState( func (c *RoomserverProducer) SendEventWithState(
ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.Event, ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.Event,
) error { ) (string, error) {
outliers, err := state.Events() outliers, err := state.Events()
if err != nil { if err != nil {
return err return "", err
} }
ires := make([]api.InputRoomEvent, len(outliers)+1) ires := make([]api.InputRoomEvent, len(outliers)+1)
@ -87,16 +87,22 @@ func (c *RoomserverProducer) SendEventWithState(
} }
// SendInputRoomEvents writes the given input room events to the roomserver input API. // SendInputRoomEvents writes the given input room events to the roomserver input API.
func (c *RoomserverProducer) SendInputRoomEvents(ctx context.Context, ires []api.InputRoomEvent) error { func (c *RoomserverProducer) SendInputRoomEvents(
ctx context.Context, ires []api.InputRoomEvent,
) (eventID string, err error) {
request := api.InputRoomEventsRequest{InputRoomEvents: ires} request := api.InputRoomEventsRequest{InputRoomEvents: ires}
var response api.InputRoomEventsResponse var response api.InputRoomEventsResponse
return c.InputAPI.InputRoomEvents(ctx, &request, &response) err = c.InputAPI.InputRoomEvents(ctx, &request, &response)
eventID = response.EventID
return
} }
// SendInvite writes the invite event to the roomserver input API. // SendInvite writes the invite event to the roomserver input API.
// This should only be needed for invite events that occur outside of a known room. // This should only be needed for invite events that occur outside of a known room.
// If we are in the room then the event should be sent using the SendEvents method. // If we are in the room then the event should be sent using the SendEvents method.
func (c *RoomserverProducer) SendInvite(ctx context.Context, inviteEvent gomatrixserverlib.Event) error { func (c *RoomserverProducer) SendInvite(
ctx context.Context, inviteEvent gomatrixserverlib.Event,
) error {
request := api.InputRoomEventsRequest{ request := api.InputRoomEventsRequest{
InputInviteEvents: []api.InputInviteEvent{{Event: inviteEvent}}, InputInviteEvents: []api.InputInviteEvent{{Event: inviteEvent}},
} }

View file

@ -107,16 +107,18 @@ func SendEvent(
} }
} }
// pass the new event to the roomserver // pass the new event to the roomserver and receive the correct event ID
if err := producer.SendEvents( // event ID in case of duplicate transaction is discarded
eventID, err := producer.SendEvents(
req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName, txnAndDeviceID, req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName, txnAndDeviceID,
); err != nil { )
if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
res := util.JSONResponse{ res := util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: sendEventResponse{e.EventID()}, JSON: sendEventResponse{eventID},
} }
// Add response to transactionsCache // Add response to transactionsCache
if txnID != nil { if txnID != nil {

View file

@ -94,7 +94,9 @@ type InputRoomEventsRequest struct {
} }
// InputRoomEventsResponse is a response to InputRoomEvents // InputRoomEventsResponse is a response to InputRoomEvents
type InputRoomEventsResponse struct{} type InputRoomEventsResponse struct {
EventID string `json:"event_id"`
}
// RoomserverInputAPI is used to write events to the room server. // RoomserverInputAPI is used to write events to the room server.
type RoomserverInputAPI interface { type RoomserverInputAPI interface {

View file

@ -87,42 +87,38 @@ func processRoomEvent(
db RoomEventDatabase, db RoomEventDatabase,
ow OutputRoomEventWriter, ow OutputRoomEventWriter,
input api.InputRoomEvent, input api.InputRoomEvent,
) error { ) (eventID string, err error) {
// Parse and validate the event JSON // Parse and validate the event JSON
event := input.Event event := input.Event
// Check that the event passes authentication checks and work out the numeric IDs for the auth events. // Check that the event passes authentication checks and work out the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, event, input.AuthEventIDs) authEventNIDs, err := checkAuthEvents(ctx, db, event, input.AuthEventIDs)
if err != nil { if err != nil {
return err return
} }
if input.TransactionID != nil { if input.TransactionID != nil {
var eventID string
tdID := input.TransactionID tdID := input.TransactionID
eventID, err = db.GetTransactionEventID( eventID, err = db.GetTransactionEventID(
ctx, tdID.TransactionID, tdID.DeviceID, input.Event.Sender(), ctx, tdID.TransactionID, tdID.DeviceID, input.Event.Sender(),
) )
if err != nil { // On error OR event with the transaction already processed/processesing
return err if err != nil || eventID != "" {
} return
if eventID != "" {
return nil
} }
} }
// Store the event // Store the event
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
if err != nil { if err != nil {
return err return
} }
if input.Kind == api.KindOutlier { if input.Kind == api.KindOutlier {
// For outliers we can stop after we've stored the event itself as it // For outliers we can stop after we've stored the event itself as it
// doesn't have any associated state to store and we don't need to // doesn't have any associated state to store and we don't need to
// notify anyone about it. // notify anyone about it.
return nil return event.EventID(), nil
} }
if stateAtEvent.BeforeStateSnapshotNID == 0 { if stateAtEvent.BeforeStateSnapshotNID == 0 {
@ -130,7 +126,7 @@ func processRoomEvent(
// Lets calculate one. // Lets calculate one.
err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event) err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event)
if err != nil { if err != nil {
return err return
} }
} }
@ -140,7 +136,9 @@ func processRoomEvent(
} }
// Update the extremities of the event graph for the room // Update the extremities of the event graph for the room
return updateLatestEvents(ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID) return event.EventID(), updateLatestEvents(
ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID,
)
} }
func calculateAndSetState( func calculateAndSetState(

View file

@ -60,17 +60,17 @@ func (r *RoomserverInputAPI) InputRoomEvents(
ctx context.Context, ctx context.Context,
request *api.InputRoomEventsRequest, request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse, response *api.InputRoomEventsResponse,
) error { ) (err error) {
// We lock as processRoomEvent can only be called once at a time // We lock as processRoomEvent can only be called once at a time
r.mutex.Lock() r.mutex.Lock()
defer r.mutex.Unlock() defer r.mutex.Unlock()
for i := range request.InputRoomEvents { for i := range request.InputRoomEvents {
if err := processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil {
return err return err
} }
} }
for i := range request.InputInviteEvents { for i := range request.InputInviteEvents {
if err := processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil { if err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil {
return err return err
} }
} }