diff --git a/federationapi/routing/peek.go b/federationapi/routing/peek.go index 65df54079..f3b3fd382 100644 --- a/federationapi/routing/peek.go +++ b/federationapi/routing/peek.go @@ -71,9 +71,9 @@ func Peek( err := rsAPI.PerformInboundPeek( httpReq.Context(), &api.PerformInboundPeekRequest{ - RoomID: roomID, - PeekID: peekID, - ServerName: request.Origin(), + RoomID: roomID, + PeekID: peekID, + ServerName: request.Origin(), RenewalInterval: renewalInterval, }, &response, @@ -88,10 +88,10 @@ func Peek( } respPeek := gomatrixserverlib.RespPeek{ - StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), - AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), - RoomVersion: response.RoomVersion, - LatestEvent: response.LatestEvent.Unwrap(), + StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), + AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), + RoomVersion: response.RoomVersion, + LatestEvent: response.LatestEvent.Unwrap(), RenewalInterval: renewalInterval, } @@ -100,4 +100,3 @@ func Peek( JSON: respPeek, } } - diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index fef706865..5663cc473 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -119,7 +119,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // causing the federationsender to start sending messages to the peeking server func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error { // FIXME: do something with orp.LatestEventID to prevent races - return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval) + return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval) } // processMessage updates the list of currently joined hosts in the room diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index ec2fbe1ec..3a0c97826 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -304,7 +304,7 @@ func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer( renewing := false if outboundPeek != nil { nowMilli := time.Now().UnixNano() / int64(time.Millisecond) - if nowMilli > outboundPeek.RenewedTimestamp + outboundPeek.RenewalInterval { + if nowMilli > outboundPeek.RenewedTimestamp+outboundPeek.RenewalInterval { logrus.Infof("stale outbound peek to %s for %s already exists; renewing", serverName, roomID) renewing = true } else { diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go index 0623fd9aa..2adad520d 100644 --- a/federationsender/inthttp/client.go +++ b/federationsender/inthttp/client.go @@ -20,7 +20,7 @@ const ( FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest" FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest" FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest" - FederationSenderPerformOutboundPeekRequestPath = "/federationsender/performOutboundPeekRequest" + FederationSenderPerformOutboundPeekRequestPath = "/federationsender/performOutboundPeekRequest" FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU" diff --git a/federationsender/storage/sqlite3/inbound_peeks_table.go b/federationsender/storage/sqlite3/inbound_peeks_table.go index 2f1269ac0..d5eacf9e4 100644 --- a/federationsender/storage/sqlite3/inbound_peeks_table.go +++ b/federationsender/storage/sqlite3/inbound_peeks_table.go @@ -56,7 +56,7 @@ const deleteInboundPeeksSQL = "" + "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1" type inboundPeeksStatements struct { - db *sql.DB + db *sql.DB insertInboundPeekStmt *sql.Stmt selectInboundPeekStmt *sql.Stmt selectInboundPeeksStmt *sql.Stmt diff --git a/federationsender/storage/sqlite3/outbound_peeks_table.go b/federationsender/storage/sqlite3/outbound_peeks_table.go index 67edda4d3..02aefce79 100644 --- a/federationsender/storage/sqlite3/outbound_peeks_table.go +++ b/federationsender/storage/sqlite3/outbound_peeks_table.go @@ -56,7 +56,7 @@ const deleteOutboundPeeksSQL = "" + "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1" type outboundPeeksStatements struct { - db *sql.DB + db *sql.DB insertOutboundPeekStmt *sql.Stmt selectOutboundPeekStmt *sql.Stmt selectOutboundPeeksStmt *sql.Stmt diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index a2069f078..225082395 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -78,10 +78,10 @@ type FederationSenderOutboundPeeks interface { } type FederationSenderInboundPeeks interface { - InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) - RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) - SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error) - SelectInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (inboundPeeks []types.InboundPeek, err error) - DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error) - DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) + InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) + RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) + SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error) + SelectInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (inboundPeeks []types.InboundPeek, err error) + DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error) + DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) } diff --git a/roomserver/api/api.go b/roomserver/api/api.go index 5374dcdc8..ebfe4622c 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -187,4 +187,4 @@ type RoomserverInternalAPI interface { req *RemoveRoomAliasRequest, response *RemoveRoomAliasResponse, ) error -} \ No newline at end of file +} diff --git a/roomserver/api/output.go b/roomserver/api/output.go index bc1a55d04..a3f6d150f 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -230,13 +230,13 @@ type OutputNewPeek struct { // An OutputNewInboundPeek is written whenever a server starts peeking into a room type OutputNewInboundPeek struct { - RoomID string - PeekID string + RoomID string + PeekID string // the event ID at which the peek begins (so we can avoid // a race between tracking the state returned by /peek and emitting subsequent // peeked events) LatestEventID string - ServerName gomatrixserverlib.ServerName + ServerName gomatrixserverlib.ServerName // how often we told the peeking server to renew the peek RenewalInterval int64 } diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index f2556d7b3..07614cdf8 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -161,11 +161,11 @@ type PerformPublishResponse struct { } type PerformInboundPeekRequest struct { - UserID string `json:"user_id"` - RoomID string `json:"room_id"` - PeekID string `json:"peek_id"` - ServerName gomatrixserverlib.ServerName `json:"server_name"` - RenewalInterval int64 `json:"renewal_interval"` + UserID string `json:"user_id"` + RoomID string `json:"room_id"` + PeekID string `json:"peek_id"` + ServerName gomatrixserverlib.ServerName `json:"server_name"` + RenewalInterval int64 `json:"renewal_interval"` } type PerformInboundPeekResponse struct { @@ -180,4 +180,4 @@ type PerformInboundPeekResponse struct { AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"` // The event at which this state was captured LatestEvent gomatrixserverlib.HeaderedEvent `json:"latest_event"` -} \ No newline at end of file +} diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index c1414c09d..c104fc720 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -93,8 +93,8 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen Inputer: r.Inputer, } r.InboundPeeker = &perform.InboundPeeker{ - DB: r.DB, - Inputer: r.Inputer, + DB: r.DB, + Inputer: r.Inputer, } r.Leaver = &perform.Leaver{ Cfg: r.Cfg, @@ -142,4 +142,3 @@ func (r *RoomserverInternalAPI) PerformLeave( } return r.WriteOutputEvents(req.RoomID, outputEvents) } - diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go index 1cb0181ad..99342a90f 100644 --- a/roomserver/internal/perform/perform_inbound_peek.go +++ b/roomserver/internal/perform/perform_inbound_peek.go @@ -18,18 +18,18 @@ import ( "context" "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/roomserver/internal/helpers" - "github.com/matrix-org/dendrite/roomserver/internal/query" + "github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/internal/input" - "github.com/matrix-org/dendrite/roomserver/state" - "github.com/matrix-org/dendrite/roomserver/storage" - "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/dendrite/roomserver/internal/query" + "github.com/matrix-org/dendrite/roomserver/state" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) type InboundPeeker struct { - DB storage.Database + DB storage.Database Inputer *input.Inputer } @@ -42,86 +42,85 @@ type InboundPeeker struct { // fed sender can start sending peeked events without a race between the state // snapshot and the stream of peeked events. func (r *InboundPeeker) PerformInboundPeek( - ctx context.Context, - request *api.PerformInboundPeekRequest, - response *api.PerformInboundPeekResponse, + ctx context.Context, + request *api.PerformInboundPeekRequest, + response *api.PerformInboundPeekResponse, ) error { - info, err := r.DB.RoomInfo(ctx, request.RoomID) - if err != nil { - return err - } - if info == nil || info.IsStub { - return nil - } - response.RoomExists = true - response.RoomVersion = info.RoomVersion + info, err := r.DB.RoomInfo(ctx, request.RoomID) + if err != nil { + return err + } + if info == nil || info.IsStub { + return nil + } + response.RoomExists = true + response.RoomVersion = info.RoomVersion - var stateEvents []gomatrixserverlib.Event + var stateEvents []gomatrixserverlib.Event - var currentStateSnapshotNID types.StateSnapshotNID - latestEventRefs, currentStateSnapshotNID, _, err := - r.DB.LatestEventIDs(ctx, info.RoomNID) - if err != nil { - return err - } - // XXX: is this actually the latest of the latest events? - latestEvents, err := r.DB.EventsFromIDs(ctx, []string{ latestEventRefs[0].EventID }) - if err != nil { - return err - } - response.LatestEvent = latestEvents[0].Headered(info.RoomVersion) + var currentStateSnapshotNID types.StateSnapshotNID + latestEventRefs, currentStateSnapshotNID, _, err := + r.DB.LatestEventIDs(ctx, info.RoomNID) + if err != nil { + return err + } + // XXX: is this actually the latest of the latest events? + latestEvents, err := r.DB.EventsFromIDs(ctx, []string{latestEventRefs[0].EventID}) + if err != nil { + return err + } + response.LatestEvent = latestEvents[0].Headered(info.RoomVersion) - // XXX: do we actually need to do a state resolution here? - roomState := state.NewStateResolution(r.DB, *info) + // XXX: do we actually need to do a state resolution here? + roomState := state.NewStateResolution(r.DB, *info) - var stateEntries []types.StateEntry - stateEntries, err = roomState.LoadStateAtSnapshot( - ctx, currentStateSnapshotNID, - ) - if err != nil { - return err - } - stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries) - if err != nil { - return err - } + var stateEntries []types.StateEntry + stateEntries, err = roomState.LoadStateAtSnapshot( + ctx, currentStateSnapshotNID, + ) + if err != nil { + return err + } + stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries) + if err != nil { + return err + } - // get the auth event IDs for the current state events - var authEventIDs []string - for _, se := range stateEvents { - authEventIDs = append(authEventIDs, se.AuthEventIDs()...) - } - authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe + // get the auth event IDs for the current state events + var authEventIDs []string + for _, se := range stateEvents { + authEventIDs = append(authEventIDs, se.AuthEventIDs()...) + } + authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe - authEvents, err := query.GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs) - if err != nil { - return err - } + authEvents, err := query.GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs) + if err != nil { + return err + } - for _, event := range stateEvents { - response.StateEvents = append(response.StateEvents, event.Headered(info.RoomVersion)) - } + for _, event := range stateEvents { + response.StateEvents = append(response.StateEvents, event.Headered(info.RoomVersion)) + } - for _, event := range authEvents { - response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion)) - } + for _, event := range authEvents { + response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion)) + } - // FIXME: there's a race here - we really should be atomically telling the - // federationsender to start sending peek events alongside having captured - // the current state, but it's unclear if/how we can do that. + // FIXME: there's a race here - we really should be atomically telling the + // federationsender to start sending peek events alongside having captured + // the current state, but it's unclear if/how we can do that. err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{ { Type: api.OutputTypeNewInboundPeek, NewInboundPeek: &api.OutputNewInboundPeek{ - RoomID: request.RoomID, - PeekID: request.PeekID, - LatestEventID: latestEvents[0].EventID(), - ServerName: request.ServerName, - RenewalInterval: request.RenewalInterval, + RoomID: request.RoomID, + PeekID: request.PeekID, + LatestEventID: latestEvents[0].EventID(), + ServerName: request.ServerName, + RenewalInterval: request.RenewalInterval, }, }, }) - return err + return err } - diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index e3ce44a9a..193050d40 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -25,13 +25,13 @@ const ( RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents" // Perform operations - RoomserverPerformInvitePath = "/roomserver/performInvite" - RoomserverPerformPeekPath = "/roomserver/performPeek" - RoomserverPerformJoinPath = "/roomserver/performJoin" - RoomserverPerformLeavePath = "/roomserver/performLeave" - RoomserverPerformBackfillPath = "/roomserver/performBackfill" - RoomserverPerformPublishPath = "/roomserver/performPublish" - RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek" + RoomserverPerformInvitePath = "/roomserver/performInvite" + RoomserverPerformPeekPath = "/roomserver/performPeek" + RoomserverPerformJoinPath = "/roomserver/performJoin" + RoomserverPerformLeavePath = "/roomserver/performLeave" + RoomserverPerformBackfillPath = "/roomserver/performBackfill" + RoomserverPerformPublishPath = "/roomserver/performPublish" + RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek" // Query operations RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"