Use HeaderedEvent in syncapi

This commit is contained in:
Neil Alexander 2020-03-18 15:52:46 +00:00
parent ec38783192
commit 6ecff990f0
23 changed files with 210 additions and 147 deletions

4
go.mod
View file

@ -11,12 +11,10 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200310180544-7f3fad43b51c github.com/matrix-org/go-http-js-libp2p v0.0.0-20200310180544-7f3fad43b51c
github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658 github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200317140257-ddc7feaaf2fd github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible github.com/mattn/go-sqlite3 v2.0.2+incompatible
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5
github.com/opentracing/opentracing-go v1.1.0 github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.8.1 github.com/pkg/errors v0.8.1

6
go.sum
View file

@ -268,6 +268,12 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200317114945-9a368ea4620d h1:0G
github.com/matrix-org/gomatrixserverlib v0.0.0-20200317114945-9a368ea4620d/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200317114945-9a368ea4620d/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200317140257-ddc7feaaf2fd h1:n95A8YyiCZ8Nu2beqw4akCaPIRrZr/nesHYDZV8WkXI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200317140257-ddc7feaaf2fd h1:n95A8YyiCZ8Nu2beqw4akCaPIRrZr/nesHYDZV8WkXI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200317140257-ddc7feaaf2fd/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200317140257-ddc7feaaf2fd/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318141718-859b9f256ffd h1:381JBgdNYOQwWelV/LOPqDoUkdSBUCkVZY8Of+n5jAM=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318141718-859b9f256ffd/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318145320-bc896516d72a h1:7+e7ManmyiGNqo06sQIRIoUtoV92XWzSbc0o3e4aTGY=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318145320-bc896516d72a/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203 h1:7HkL6bF7/M2cYteNFVtvGW5qjD4wHIiR0HsdCm2Rqao=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8= github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8=

View file

@ -43,6 +43,8 @@ type QueryLatestEventsAndStateResponse struct {
// Does the room exist? // Does the room exist?
// If the room doesn't exist this will be false and LatestEvents will be empty. // If the room doesn't exist this will be false and LatestEvents will be empty.
RoomExists bool `json:"room_exists"` RoomExists bool `json:"room_exists"`
// The room version of the room.
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
// The latest events in the room. // The latest events in the room.
// These are used to set the prev_events when sending an event. // These are used to set the prev_events when sending an event.
LatestEvents []gomatrixserverlib.EventReference `json:"latest_events"` LatestEvents []gomatrixserverlib.EventReference `json:"latest_events"`
@ -74,6 +76,8 @@ type QueryStateAfterEventsResponse struct {
// Does the room exist on this roomserver? // Does the room exist on this roomserver?
// If the room doesn't exist this will be false and StateEvents will be empty. // If the room doesn't exist this will be false and StateEvents will be empty.
RoomExists bool `json:"room_exists"` RoomExists bool `json:"room_exists"`
// The room version of the room.
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
// Do all the previous events exist on this roomserver? // Do all the previous events exist on this roomserver?
// If some of previous events do not exist this will be false and StateEvents will be empty. // If some of previous events do not exist this will be false and StateEvents will be empty.
PrevEventsExist bool `json:"prev_events_exist"` PrevEventsExist bool `json:"prev_events_exist"`
@ -207,6 +211,8 @@ type QueryStateAndAuthChainResponse struct {
// Does the room exist on this roomserver? // Does the room exist on this roomserver?
// If the room doesn't exist this will be false and StateEvents will be empty. // If the room doesn't exist this will be false and StateEvents will be empty.
RoomExists bool `json:"room_exists"` RoomExists bool `json:"room_exists"`
// The room version of the room.
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
// Do all the previous events exist on this roomserver? // Do all the previous events exist on this roomserver?
// If some of previous events do not exist this will be false and StateEvents will be empty. // If some of previous events do not exist this will be false and StateEvents will be empty.
PrevEventsExist bool `json:"prev_events_exist"` PrevEventsExist bool `json:"prev_events_exist"`
@ -251,8 +257,18 @@ type QueryRoomVersionCapabilitiesRequest struct{}
// QueryRoomVersionCapabilitiesResponse is a response to QueryServersInRoomAtEventResponse // QueryRoomVersionCapabilitiesResponse is a response to QueryServersInRoomAtEventResponse
type QueryRoomVersionCapabilitiesResponse struct { type QueryRoomVersionCapabilitiesResponse struct {
DefaultRoomVersion string `json:"default"` DefaultRoomVersion gomatrixserverlib.RoomVersion `json:"default"`
AvailableRoomVersions map[string]string `json:"available"` AvailableRoomVersions map[gomatrixserverlib.RoomVersion]string `json:"available"`
}
// QueryRoomVersionForRoom asks for the room version for a given room.
type QueryRoomVersionForRoomRequest struct {
RoomID string `json:"room_id"`
}
// QueryRoomVersionCapabilitiesResponse is a response to QueryServersInRoomAtEventResponse
type QueryRoomVersionForRoomResponse struct {
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
} }
// RoomserverQueryAPI is used to query information from the room server. // RoomserverQueryAPI is used to query information from the room server.
@ -341,6 +357,15 @@ type RoomserverQueryAPI interface {
request *QueryRoomVersionCapabilitiesRequest, request *QueryRoomVersionCapabilitiesRequest,
response *QueryRoomVersionCapabilitiesResponse, response *QueryRoomVersionCapabilitiesResponse,
) error ) error
// Asks for the room version for a given room.
/*
QueryRoomVersionForRoom(
ctx context.Context,
request *QueryRoomVersionForRoomRequest,
response *QueryRoomVersionForRoomResponse,
) error
*/
} }
// RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API. // RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API.
@ -379,6 +404,9 @@ const RoomserverQueryServersInRoomAtEventPath = "/api/roomserver/queryServersInR
// RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API // RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API
const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVersionCapabilities" const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVersionCapabilities"
// RoomserverQueryRoomVersionCapabilitiesPath is the HTTP path for the QueryRoomVersionCapabilities API
const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionForRoom"
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
// If httpClient is nil then it uses the http.DefaultClient // If httpClient is nil then it uses the http.DefaultClient
func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI { func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI {
@ -536,7 +564,7 @@ func (h *httpRoomserverQueryAPI) QueryServersInRoomAtEvent(
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// QueryServersInRoomAtEvent implements RoomServerQueryAPI // QueryRoomVersionCapabilities implements RoomServerQueryAPI
func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities( func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities(
ctx context.Context, ctx context.Context,
request *QueryRoomVersionCapabilitiesRequest, request *QueryRoomVersionCapabilitiesRequest,
@ -548,3 +576,16 @@ func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities(
apiURL := h.roomserverURL + RoomserverQueryRoomVersionCapabilitiesPath apiURL := h.roomserverURL + RoomserverQueryRoomVersionCapabilitiesPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// QueryRoomVersionForRoom implements RoomServerQueryAPI
func (h *httpRoomserverQueryAPI) QueryRoomVersionForRoom(
ctx context.Context,
request *QueryRoomVersionForRoomRequest,
response *QueryRoomVersionForRoomResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryRoomVersionForRoom")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryRoomVersionForRoomPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -808,14 +808,13 @@ func (r *RoomserverQueryAPI) QueryRoomVersionCapabilities(
request *api.QueryRoomVersionCapabilitiesRequest, request *api.QueryRoomVersionCapabilitiesRequest,
response *api.QueryRoomVersionCapabilitiesResponse, response *api.QueryRoomVersionCapabilitiesResponse,
) error { ) error {
response.DefaultRoomVersion = string(version.DefaultRoomVersion()) response.DefaultRoomVersion = version.DefaultRoomVersion()
response.AvailableRoomVersions = make(map[string]string) response.AvailableRoomVersions = make(map[gomatrixserverlib.RoomVersion]string)
for v, desc := range version.SupportedRoomVersions() { for v, desc := range version.SupportedRoomVersions() {
sv := string(v)
if desc.Stable { if desc.Stable {
response.AvailableRoomVersions[sv] = "stable" response.AvailableRoomVersions[v] = "stable"
} else { } else {
response.AvailableRoomVersions[sv] = "unstable" response.AvailableRoomVersions[v] = "unstable"
} }
} }
return nil return nil

View file

@ -155,7 +155,7 @@ func extractRoomVersionFromCreateEvent(event gomatrixserverlib.Event) (
} }
// A room version was specified in the event content? // A room version was specified in the event content?
if createContent.RoomVersion != nil { if createContent.RoomVersion != nil {
roomVersion = *createContent.RoomVersion roomVersion = gomatrixserverlib.RoomVersion(*createContent.RoomVersion)
} }
return roomVersion, err return roomVersion, err
} }

View file

@ -184,7 +184,7 @@ func extractRoomVersionFromCreateEvent(event gomatrixserverlib.Event) (
} }
// A room version was specified in the event content? // A room version was specified in the event content?
if createContent.RoomVersion != nil { if createContent.RoomVersion != nil {
roomVersion = *createContent.RoomVersion roomVersion = gomatrixserverlib.RoomVersion(*createContent.RoomVersion)
} }
return roomVersion, err return roomVersion, err
} }

View file

@ -98,7 +98,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputRoomEventConsumer) onNewRoomEvent( func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent, ctx context.Context, msg api.OutputNewRoomEvent,
) error { ) error {
ev := msg.Event.Event ev := msg.Event
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
"room_id": ev.RoomID(), "room_id": ev.RoomID(),
@ -153,7 +153,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
func (s *OutputRoomEventConsumer) onNewInviteEvent( func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent, ctx context.Context, msg api.OutputNewInviteEvent,
) error { ) error {
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event.Event) pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil { if err != nil {
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -163,7 +163,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure") }).Panicf("roomserver output log: write invite failure")
return nil return nil
} }
s.notifier.OnNewEvent(&msg.Event.Event, "", nil, types.PaginationToken{PDUPosition: pduPos}) s.notifier.OnNewEvent(&msg.Event, "", nil, types.PaginationToken{PDUPosition: pduPos})
return nil return nil
} }
@ -186,8 +186,8 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
// lookupStateEvents looks up the state events that are added by a new event. // lookupStateEvents looks up the state events that are added by a new event.
func (s *OutputRoomEventConsumer) lookupStateEvents( func (s *OutputRoomEventConsumer) lookupStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.Event, addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent,
) ([]gomatrixserverlib.Event, error) { ) ([]gomatrixserverlib.HeaderedEvent, error) {
// Fast path if there aren't any new state events. // Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 { if len(addsStateEventIDs) == 0 {
return nil, nil return nil, nil
@ -195,7 +195,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// Fast path if the only state event added is the event itself. // Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
return []gomatrixserverlib.Event{event}, nil return []gomatrixserverlib.HeaderedEvent{event}, nil
} }
// Check if this is re-adding a state events that we previously processed // Check if this is re-adding a state events that we previously processed
@ -229,10 +229,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
return nil, err return nil, err
} }
for _, headeredEvent := range eventResp.Events { result = append(result, eventResp.Events...)
result = append(result, headeredEvent.Event)
}
missing = missingEventsFrom(result, addsStateEventIDs) missing = missingEventsFrom(result, addsStateEventIDs)
if len(missing) != 0 { if len(missing) != 0 {
@ -244,7 +241,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
return result, nil return result, nil
} }
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) { func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
var stateKey string var stateKey string
if event.StateKey() == nil { if event.StateKey() == nil {
stateKey = "" stateKey = ""
@ -269,10 +266,11 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Event
PrevSender: prevEvent.Sender(), PrevSender: prevEvent.Sender(),
} }
return event.SetUnsigned(prev) event.Event, err = event.SetUnsigned(prev)
return event, err
} }
func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string { func missingEventsFrom(events []gomatrixserverlib.HeaderedEvent, required []string) []string {
have := map[string]bool{} have := map[string]bool{}
for _, event := range events { for _, event := range events {
have[event.EventID()] = true have[event.EventID()] = true

View file

@ -181,7 +181,7 @@ func (r *messagesReq) retrieveEvents() (
return return
} }
var events []gomatrixserverlib.Event var events []gomatrixserverlib.HeaderedEvent
// There can be two reasons for streamEvents to be empty: either we've // There can be two reasons for streamEvents to be empty: either we've
// reached the oldest event in the room (or the most recent one, depending // reached the oldest event in the room (or the most recent one, depending
@ -217,7 +217,7 @@ func (r *messagesReq) retrieveEvents() (
} }
// Convert all of the events into client events. // Convert all of the events into client events.
clientEvents = gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll) clientEvents = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll)
// Get the position of the first and the last event in the room's topology. // Get the position of the first and the last event in the room's topology.
// This position is currently determined by the event's depth, so we could // This position is currently determined by the event's depth, so we could
// also use it instead of retrieving from the database. However, if we ever // also use it instead of retrieving from the database. However, if we ever
@ -273,7 +273,7 @@ func (r *messagesReq) retrieveEvents() (
// Returns an error if there was an issue talking with the database or // Returns an error if there was an issue talking with the database or
// backfilling. // backfilling.
func (r *messagesReq) handleEmptyEventsSlice() ( func (r *messagesReq) handleEmptyEventsSlice() (
events []gomatrixserverlib.Event, err error, events []gomatrixserverlib.HeaderedEvent, err error,
) { ) {
backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID) backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID)
@ -287,7 +287,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
} else { } else {
// If not, it means the slice was empty because we reached the room's // If not, it means the slice was empty because we reached the room's
// creation, so return an empty slice. // creation, so return an empty slice.
events = []gomatrixserverlib.Event{} events = []gomatrixserverlib.HeaderedEvent{}
} }
return return
@ -299,7 +299,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
// through backfilling if needed. // through backfilling if needed.
// Returns an error if there was an issue while backfilling. // Returns an error if there was an issue while backfilling.
func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent) ( func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent) (
events []gomatrixserverlib.Event, err error, events []gomatrixserverlib.HeaderedEvent, err error,
) { ) {
// Check if we have enough events. // Check if we have enough events.
isSetLargeEnough := true isSetLargeEnough := true
@ -326,7 +326,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
// Backfill is needed if we've reached a backward extremity and need more // Backfill is needed if we've reached a backward extremity and need more
// events. It's only needed if the direction is backward. // events. It's only needed if the direction is backward.
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering { if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
var pdus []gomatrixserverlib.Event var pdus []gomatrixserverlib.HeaderedEvent
// Only ask the remote server for enough events to reach the limit. // Only ask the remote server for enough events to reach the limit.
pdus, err = r.backfill(backwardExtremities, r.limit-len(streamEvents)) pdus, err = r.backfill(backwardExtremities, r.limit-len(streamEvents))
if err != nil { if err != nil {
@ -400,7 +400,7 @@ func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (boo
// event, or if there is no remote homeserver to contact. // event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in // Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request. // the room or sending the request.
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.Event, error) { func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
// Query the list of servers in the room when one of the backward extremities // Query the list of servers in the room when one of the backward extremities
// was sent. // was sent.
var serversResponse api.QueryServersInRoomAtEventResponse var serversResponse api.QueryServersInRoomAtEventResponse
@ -428,7 +428,7 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
} }
} }
pdus := make([]gomatrixserverlib.Event, 0) pdus := make([]gomatrixserverlib.HeaderedEvent, 0)
// If the roomserver responded with at least one server that isn't us, // If the roomserver responded with at least one server that isn't us,
// send it a request for backfill. // send it a request for backfill.
@ -440,13 +440,20 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
return nil, err return nil, err
} }
pdus = txn.PDUs for _, p := range txn.PDUs {
pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1))
}
// Store the events in the database, while marking them as unfit to show // Store the events in the database, while marking them as unfit to show
// up in responses to sync requests. // up in responses to sync requests.
for _, pdu := range pdus { for _, pdu := range pdus {
headered := pdu.Headered(gomatrixserverlib.RoomVersionV1)
if _, err = r.db.WriteEvent( if _, err = r.db.WriteEvent(
r.ctx, &pdu, []gomatrixserverlib.Event{}, []string{}, []string{}, r.ctx,
&headered,
[]gomatrixserverlib.HeaderedEvent{},
[]string{},
[]string{},
nil, true, nil, true,
); err != nil { ); err != nil {
return nil, err return nil, err
@ -486,7 +493,7 @@ func setToDefault(
// timestamp of two Matrix events. // timestamp of two Matrix events.
// Returns true if the first event happened before the second one, false // Returns true if the first event happened before the second one, false
// otherwise. // otherwise.
func sortEvents(e1 *gomatrixserverlib.Event, e2 *gomatrixserverlib.Event) bool { func sortEvents(e1 *gomatrixserverlib.HeaderedEvent, e2 *gomatrixserverlib.HeaderedEvent) bool {
t := e1.OriginServerTS().Time() t := e1.OriginServerTS().Time()
return e2.OriginServerTS().Time().After(t) return e2.OriginServerTS().Time().After(t)
} }

View file

@ -56,7 +56,9 @@ func OnIncomingStateRequest(req *http.Request, db storage.Database, roomID strin
// Fill the prev_content and replaces_state keys if necessary // Fill the prev_content and replaces_state keys if necessary
for _, event := range stateEvents { for _, event := range stateEvents {
stateEvent := stateEventInStateResp{ stateEvent := stateEventInStateResp{
ClientEvent: gomatrixserverlib.ToClientEvent(event, gomatrixserverlib.FormatAll), ClientEvent: gomatrixserverlib.HeaderedToClientEvents(
[]gomatrixserverlib.HeaderedEvent{event}, gomatrixserverlib.FormatAll,
)[0],
} }
var prevEventRef types.PrevEventRef var prevEventRef types.PrevEventRef
if len(event.Unsigned()) > 0 { if len(event.Unsigned()) > 0 {
@ -113,7 +115,7 @@ func OnIncomingStateTypeRequest(req *http.Request, db storage.Database, roomID s
} }
stateEvent := stateEventInStateResp{ stateEvent := stateEventInStateResp{
ClientEvent: gomatrixserverlib.ToClientEvent(*event, gomatrixserverlib.FormatAll), ClientEvent: gomatrixserverlib.HeaderedToClientEvent(*event, gomatrixserverlib.FormatAll),
} }
return util.JSONResponse{ return util.JSONResponse{

View file

@ -29,16 +29,16 @@ import (
type Database interface { type Database interface {
common.PartitionStorer common.PartitionStorer
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error)
WriteEvent(context.Context, *gomatrixserverlib.Event, []gomatrixserverlib.Event, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error) WriteEvent(context.Context, *gomatrixserverlib.HeaderedEvent, []gomatrixserverlib.HeaderedEvent, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error)
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.Event, error) GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.Event, err error) GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.HeaderedEvent, err error)
SyncPosition(ctx context.Context) (types.PaginationToken, error) SyncPosition(ctx context.Context) (types.PaginationToken, error)
IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.PaginationToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.PaginationToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error)
CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error) CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error)
GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error)
UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (types.StreamPosition, error) UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (types.StreamPosition, error)
AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.Event) (types.StreamPosition, error) AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error)
RetireInviteEvent(ctx context.Context, inviteEventID string) error RetireInviteEvent(ctx context.Context, inviteEventID string) error
SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn)
AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition
@ -48,6 +48,6 @@ type Database interface {
EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error) EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error)
BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error) BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error)
MaxTopologicalPosition(ctx context.Context, roomID string) (types.StreamPosition, error) MaxTopologicalPosition(ctx context.Context, roomID string) (types.StreamPosition, error)
StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.Event StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent
SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error)
} }

View file

@ -185,7 +185,7 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
func (s *currentRoomStateStatements) selectCurrentState( func (s *currentRoomStateStatements) selectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *gomatrixserverlib.StateFilter, stateFilter *gomatrixserverlib.StateFilter,
) ([]gomatrixserverlib.Event, error) { ) ([]gomatrixserverlib.HeaderedEvent, error) {
stmt := common.TxStmt(txn, s.selectCurrentStateStmt) stmt := common.TxStmt(txn, s.selectCurrentStateStmt)
rows, err := stmt.QueryContext(ctx, roomID, rows, err := stmt.QueryContext(ctx, roomID,
pq.StringArray(stateFilter.Senders), pq.StringArray(stateFilter.Senders),
@ -213,7 +213,7 @@ func (s *currentRoomStateStatements) deleteRoomStateByEventID(
func (s *currentRoomStateStatements) upsertRoomState( func (s *currentRoomStateStatements) upsertRoomState(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.Event, membership *string, addedAt types.StreamPosition, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error { ) error {
// Parse content as JSON and search for an "url" key // Parse content as JSON and search for an "url" key
containsURL := false containsURL := false
@ -252,16 +252,16 @@ func (s *currentRoomStateStatements) selectEventsWithEventIDs(
return rowsToStreamEvents(rows) return rowsToStreamEvents(rows)
} }
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) {
result := []gomatrixserverlib.Event{} result := []gomatrixserverlib.HeaderedEvent{}
for rows.Next() { for rows.Next() {
var eventBytes []byte var eventBytes []byte
if err := rows.Scan(&eventBytes); err != nil { if err := rows.Scan(&eventBytes); err != nil {
return nil, err return nil, err
} }
// TODO: Handle redacted events // TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) var ev gomatrixserverlib.HeaderedEvent
if err != nil { if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err return nil, err
} }
result = append(result, ev) result = append(result, ev)
@ -271,7 +271,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
func (s *currentRoomStateStatements) selectStateEvent( func (s *currentRoomStateStatements) selectStateEvent(
ctx context.Context, roomID, evType, stateKey string, ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.Event, error) { ) (*gomatrixserverlib.HeaderedEvent, error) {
stmt := s.selectStateEventStmt stmt := s.selectStateEventStmt
var res []byte var res []byte
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res) err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
@ -281,6 +281,9 @@ func (s *currentRoomStateStatements) selectStateEvent(
if err != nil { if err != nil {
return nil, err return nil, err
} }
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false) var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(res, &ev); err != nil {
return nil, err
}
return &ev, err return &ev, err
} }

View file

@ -18,6 +18,7 @@ package postgres
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/json"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
@ -86,7 +87,7 @@ func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) {
} }
func (s *inviteEventsStatements) insertInviteEvent( func (s *inviteEventsStatements) insertInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.Event, ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
) (streamPos types.StreamPosition, err error) { ) (streamPos types.StreamPosition, err error) {
err = s.insertInviteEventStmt.QueryRowContext( err = s.insertInviteEventStmt.QueryRowContext(
ctx, ctx,
@ -109,14 +110,14 @@ func (s *inviteEventsStatements) deleteInviteEvent(
// active invites for the target user ID in the supplied range. // active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) selectInviteEventsInRange( func (s *inviteEventsStatements) selectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition, ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition,
) (map[string]gomatrixserverlib.Event, error) { ) (map[string]gomatrixserverlib.HeaderedEvent, error) {
stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt) stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos) rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer common.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed") defer common.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
result := map[string]gomatrixserverlib.Event{} result := map[string]gomatrixserverlib.HeaderedEvent{}
for rows.Next() { for rows.Next() {
var ( var (
roomID string roomID string
@ -126,8 +127,8 @@ func (s *inviteEventsStatements) selectInviteEventsInRange(
return nil, err return nil, err
} }
event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false) var event gomatrixserverlib.HeaderedEvent
if err != nil { if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, err return nil, err
} }

View file

@ -203,8 +203,8 @@ func (s *outputRoomEventsStatements) selectStateInRange(
} }
// TODO: Handle redacted events // TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) var ev gomatrixserverlib.HeaderedEvent
if err != nil { if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, nil, err return nil, nil, err
} }
needSet := stateNeeded[ev.RoomID()] needSet := stateNeeded[ev.RoomID()]
@ -220,7 +220,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
stateNeeded[ev.RoomID()] = needSet stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = types.StreamEvent{ eventIDToEvent[ev.EventID()] = types.StreamEvent{
Event: ev, HeaderedEvent: ev,
StreamPosition: streamPos, StreamPosition: streamPos,
ExcludeFromSync: excludeFromSync, ExcludeFromSync: excludeFromSync,
} }
@ -248,7 +248,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID(
// of the inserted event. // of the inserted event.
func (s *outputRoomEventsStatements) insertEvent( func (s *outputRoomEventsStatements) insertEvent(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
event *gomatrixserverlib.Event, addState, removeState []string, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
transactionID *api.TransactionID, excludeFromSync bool, transactionID *api.TransactionID, excludeFromSync bool,
) (streamPos types.StreamPosition, err error) { ) (streamPos types.StreamPosition, err error) {
var txnID *string var txnID *string
@ -373,8 +373,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return nil, err return nil, err
} }
// TODO: Handle redacted events // TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) var ev gomatrixserverlib.HeaderedEvent
if err != nil { if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err return nil, err
} }
@ -386,7 +386,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
} }
result = append(result, types.StreamEvent{ result = append(result, types.StreamEvent{
Event: ev, HeaderedEvent: ev,
StreamPosition: streamPos, StreamPosition: streamPos,
TransactionID: transactionID, TransactionID: transactionID,
ExcludeFromSync: excludeFromSync, ExcludeFromSync: excludeFromSync,

View file

@ -104,7 +104,7 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
// insertEventInTopology inserts the given event in the room's topology, based // insertEventInTopology inserts the given event in the room's topology, based
// on the event's depth. // on the event's depth.
func (s *outputRoomEventsTopologyStatements) insertEventInTopology( func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
ctx context.Context, event *gomatrixserverlib.Event, ctx context.Context, event *gomatrixserverlib.HeaderedEvent,
) (err error) { ) (err error) {
_, err = s.insertEventInTopologyStmt.ExecContext( _, err = s.insertEventInTopologyStmt.ExecContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), ctx, event.EventID(), event.Depth(), event.RoomID(),

View file

@ -37,7 +37,7 @@ import (
type stateDelta struct { type stateDelta struct {
roomID string roomID string
stateEvents []gomatrixserverlib.Event stateEvents []gomatrixserverlib.HeaderedEvent
membership string membership string
// The PDU stream position of the latest membership event for this user, if applicable. // The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta. // Can be 0 if there is no membership event in this delta.
@ -100,7 +100,7 @@ func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[s
// If an event is not found in the database then it will be omitted from the list. // If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database. // Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events. // Does not include any transaction IDs in the returned events.
func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) { func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) {
streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs) streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -111,7 +111,7 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
return d.StreamEventsToEvents(nil, streamEvents), nil return d.StreamEventsToEvents(nil, streamEvents), nil
} }
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.Event) error { func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error {
// If the event is already known as a backward extremity, don't consider // If the event is already known as a backward extremity, don't consider
// it as such anymore now that we have it. // it as such anymore now that we have it.
isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID()) isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID())
@ -155,8 +155,8 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev
// Returns an error if there was a problem inserting this event. // Returns an error if there was a problem inserting this event.
func (d *SyncServerDatasource) WriteEvent( func (d *SyncServerDatasource) WriteEvent(
ctx context.Context, ctx context.Context,
ev *gomatrixserverlib.Event, ev *gomatrixserverlib.HeaderedEvent,
addStateEvents []gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.HeaderedEvent,
addStateEventIDs, removeStateEventIDs []string, addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID, excludeFromSync bool, transactionID *api.TransactionID, excludeFromSync bool,
) (pduPosition types.StreamPosition, returnErr error) { ) (pduPosition types.StreamPosition, returnErr error) {
@ -192,7 +192,7 @@ func (d *SyncServerDatasource) WriteEvent(
func (d *SyncServerDatasource) updateRoomState( func (d *SyncServerDatasource) updateRoomState(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
removedEventIDs []string, removedEventIDs []string,
addedEvents []gomatrixserverlib.Event, addedEvents []gomatrixserverlib.HeaderedEvent,
pduPosition types.StreamPosition, pduPosition types.StreamPosition,
) error { ) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
@ -228,7 +228,7 @@ func (d *SyncServerDatasource) updateRoomState(
// If there was an issue during the retrieval, returns an error // If there was an issue during the retrieval, returns an error
func (d *SyncServerDatasource) GetStateEvent( func (d *SyncServerDatasource) GetStateEvent(
ctx context.Context, roomID, evType, stateKey string, ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.Event, error) { ) (*gomatrixserverlib.HeaderedEvent, error) {
return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey) return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
} }
@ -237,7 +237,7 @@ func (d *SyncServerDatasource) GetStateEvent(
// Returns an error if there was an issue with the retrieval. // Returns an error if there was an issue with the retrieval.
func (d *SyncServerDatasource) GetStateEventsForRoom( func (d *SyncServerDatasource) GetStateEventsForRoom(
ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter, ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter,
) (stateEvents []gomatrixserverlib.Event, err error) { ) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error { err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter) stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter)
return err return err
@ -599,7 +599,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// Build up a /sync response. Add joined rooms. // Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs { for _, roomID := range joinedRoomIDs {
var stateEvents []gomatrixserverlib.Event var stateEvents []gomatrixserverlib.HeaderedEvent
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilter) stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilter)
if err != nil { if err != nil {
return return
@ -633,9 +633,9 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0, types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
).String() ).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr res.Rooms.Join[roomID] = *jr
} }
@ -707,7 +707,7 @@ func (d *SyncServerDatasource) UpsertAccountData(
// If the invite was successfully stored this returns the stream ID it was stored at. // If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database. // Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatasource) AddInviteEvent( func (d *SyncServerDatasource) AddInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.Event, ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
) (types.StreamPosition, error) { ) (types.StreamPosition, error) {
return d.invites.insertInviteEvent(ctx, inviteEvent) return d.invites.insertInviteEvent(ctx, inviteEvent)
} }
@ -758,7 +758,7 @@ func (d *SyncServerDatasource) addInvitesToResponse(
for roomID, inviteEvent := range invites { for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse() ir := types.NewInviteResponse()
ir.InviteState.Events = gomatrixserverlib.ToClientEvents( ir.InviteState.Events = gomatrixserverlib.ToClientEvents(
[]gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync, []gomatrixserverlib.Event{inviteEvent.Event}, gomatrixserverlib.FormatSync,
) )
// TODO: add the invite state from the invite event. // TODO: add the invite state from the invite event.
res.Rooms.Invite[roomID] = *ir res.Rooms.Invite[roomID] = *ir
@ -821,9 +821,9 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0, types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
).String() ).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr res.Rooms.Join[delta.roomID] = *jr
case gomatrixserverlib.Leave: case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban fallthrough // transitions to leave are the same as ban
@ -834,9 +834,9 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0, types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
).String() ).String()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr res.Rooms.Leave[delta.roomID] = *lr
} }
@ -1074,7 +1074,7 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
} }
s := make([]types.StreamEvent, len(allState)) s := make([]types.StreamEvent, len(allState))
for i := 0; i < len(s); i++ { for i := 0; i < len(s); i++ {
s[i] = types.StreamEvent{Event: allState[i], StreamPosition: 0} s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0}
} }
return s, nil return s, nil
} }
@ -1082,10 +1082,10 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and // StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets // matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event. // added to the unsigned section of the output event.
func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.Event { func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent {
out := make([]gomatrixserverlib.Event, len(in)) out := make([]gomatrixserverlib.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ { for i := 0; i < len(in); i++ {
out[i] = in[i].Event out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil { if device != nil && in[i].TransactionID != nil {
if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID { if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID {
err := out[i].SetUnsignedField( err := out[i].SetUnsignedField(
@ -1105,7 +1105,7 @@ func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in
// There may be some overlap where events in stateEvents are already in recentEvents, so filter // There may be some overlap where events in stateEvents are already in recentEvents, so filter
// them out so we don't include them twice in the /sync response. They should be in recentEvents // them out so we don't include them twice in the /sync response. They should be in recentEvents
// only, so clients get to the correct state once they have rolled forward. // only, so clients get to the correct state once they have rolled forward.
func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event { func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
for _, recentEv := range recentEvents { for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil { if recentEv.StateKey() == nil {
continue // not a state event continue // not a state event

View file

@ -171,7 +171,7 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
func (s *currentRoomStateStatements) selectCurrentState( func (s *currentRoomStateStatements) selectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
stateFilterPart *gomatrixserverlib.StateFilter, stateFilterPart *gomatrixserverlib.StateFilter,
) ([]gomatrixserverlib.Event, error) { ) ([]gomatrixserverlib.HeaderedEvent, error) {
stmt := common.TxStmt(txn, s.selectCurrentStateStmt) stmt := common.TxStmt(txn, s.selectCurrentStateStmt)
rows, err := stmt.QueryContext(ctx, roomID, rows, err := stmt.QueryContext(ctx, roomID,
nil, // FIXME: pq.StringArray(stateFilterPart.Senders), nil, // FIXME: pq.StringArray(stateFilterPart.Senders),
@ -199,7 +199,7 @@ func (s *currentRoomStateStatements) deleteRoomStateByEventID(
func (s *currentRoomStateStatements) upsertRoomState( func (s *currentRoomStateStatements) upsertRoomState(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.Event, membership *string, addedAt types.StreamPosition, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error { ) error {
// Parse content as JSON and search for an "url" key // Parse content as JSON and search for an "url" key
containsURL := false containsURL := false
@ -242,16 +242,16 @@ func (s *currentRoomStateStatements) selectEventsWithEventIDs(
return rowsToStreamEvents(rows) return rowsToStreamEvents(rows)
} }
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) {
result := []gomatrixserverlib.Event{} result := []gomatrixserverlib.HeaderedEvent{}
for rows.Next() { for rows.Next() {
var eventBytes []byte var eventBytes []byte
if err := rows.Scan(&eventBytes); err != nil { if err := rows.Scan(&eventBytes); err != nil {
return nil, err return nil, err
} }
// TODO: Handle redacted events // TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) var ev gomatrixserverlib.HeaderedEvent
if err != nil { if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err return nil, err
} }
result = append(result, ev) result = append(result, ev)
@ -261,7 +261,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
func (s *currentRoomStateStatements) selectStateEvent( func (s *currentRoomStateStatements) selectStateEvent(
ctx context.Context, roomID, evType, stateKey string, ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.Event, error) { ) (*gomatrixserverlib.HeaderedEvent, error) {
stmt := s.selectStateEventStmt stmt := s.selectStateEventStmt
var res []byte var res []byte
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res) err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
@ -271,6 +271,9 @@ func (s *currentRoomStateStatements) selectStateEvent(
if err != nil { if err != nil {
return nil, err return nil, err
} }
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false) var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(res, &ev); err != nil {
return nil, err
}
return &ev, err return &ev, err
} }

View file

@ -18,6 +18,7 @@ package sqlite3
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/json"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
@ -83,7 +84,7 @@ func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatement
} }
func (s *inviteEventsStatements) insertInviteEvent( func (s *inviteEventsStatements) insertInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.Event, streamPos types.StreamPosition, ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent, streamPos types.StreamPosition,
) (err error) { ) (err error) {
_, err = txn.Stmt(s.insertInviteEventStmt).ExecContext( _, err = txn.Stmt(s.insertInviteEventStmt).ExecContext(
ctx, ctx,
@ -107,14 +108,14 @@ func (s *inviteEventsStatements) deleteInviteEvent(
// active invites for the target user ID in the supplied range. // active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) selectInviteEventsInRange( func (s *inviteEventsStatements) selectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition, ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition,
) (map[string]gomatrixserverlib.Event, error) { ) (map[string]gomatrixserverlib.HeaderedEvent, error) {
stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt) stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos) rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer common.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed") defer common.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
result := map[string]gomatrixserverlib.Event{} result := map[string]gomatrixserverlib.HeaderedEvent{}
for rows.Next() { for rows.Next() {
var ( var (
roomID string roomID string
@ -124,8 +125,8 @@ func (s *inviteEventsStatements) selectInviteEventsInRange(
return nil, err return nil, err
} }
event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false) var event gomatrixserverlib.HeaderedEvent
if err != nil { if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, err return nil, err
} }

View file

@ -200,8 +200,8 @@ func (s *outputRoomEventsStatements) selectStateInRange(
} }
// TODO: Handle redacted events // TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) var ev gomatrixserverlib.HeaderedEvent
if err != nil { if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, nil, err return nil, nil, err
} }
needSet := stateNeeded[ev.RoomID()] needSet := stateNeeded[ev.RoomID()]
@ -217,7 +217,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
stateNeeded[ev.RoomID()] = needSet stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = types.StreamEvent{ eventIDToEvent[ev.EventID()] = types.StreamEvent{
Event: ev, HeaderedEvent: ev,
StreamPosition: streamPos, StreamPosition: streamPos,
ExcludeFromSync: excludeFromSync, ExcludeFromSync: excludeFromSync,
} }
@ -245,7 +245,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID(
// of the inserted event. // of the inserted event.
func (s *outputRoomEventsStatements) insertEvent( func (s *outputRoomEventsStatements) insertEvent(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
event *gomatrixserverlib.Event, addState, removeState []string, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
transactionID *api.TransactionID, excludeFromSync bool, transactionID *api.TransactionID, excludeFromSync bool,
) (streamPos types.StreamPosition, err error) { ) (streamPos types.StreamPosition, err error) {
var txnID *string var txnID *string
@ -392,8 +392,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return nil, err return nil, err
} }
// TODO: Handle redacted events // TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) var ev gomatrixserverlib.HeaderedEvent
if err != nil { if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err return nil, err
} }
@ -405,7 +405,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
} }
result = append(result, types.StreamEvent{ result = append(result, types.StreamEvent{
Event: ev, HeaderedEvent: ev,
StreamPosition: streamPos, StreamPosition: streamPos,
TransactionID: transactionID, TransactionID: transactionID,
ExcludeFromSync: excludeFromSync, ExcludeFromSync: excludeFromSync,

View file

@ -101,7 +101,7 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
// insertEventInTopology inserts the given event in the room's topology, based // insertEventInTopology inserts the given event in the room's topology, based
// on the event's depth. // on the event's depth.
func (s *outputRoomEventsTopologyStatements) insertEventInTopology( func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
) (err error) { ) (err error) {
stmt := common.TxStmt(txn, s.insertEventInTopologyStmt) stmt := common.TxStmt(txn, s.insertEventInTopologyStmt)
_, err = stmt.ExecContext( _, err = stmt.ExecContext(

View file

@ -40,7 +40,7 @@ import (
type stateDelta struct { type stateDelta struct {
roomID string roomID string
stateEvents []gomatrixserverlib.Event stateEvents []gomatrixserverlib.HeaderedEvent
membership string membership string
// The PDU stream position of the latest membership event for this user, if applicable. // The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta. // Can be 0 if there is no membership event in this delta.
@ -126,7 +126,7 @@ func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[s
// If an event is not found in the database then it will be omitted from the list. // If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database. // Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events. // Does not include any transaction IDs in the returned events.
func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) { func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) {
streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs) streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -137,7 +137,7 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
return d.StreamEventsToEvents(nil, streamEvents), nil return d.StreamEventsToEvents(nil, streamEvents), nil
} }
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.Event) error { func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
// If the event is already known as a backward extremity, don't consider // If the event is already known as a backward extremity, don't consider
// it as such anymore now that we have it. // it as such anymore now that we have it.
isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()) isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID())
@ -181,8 +181,8 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx
// Returns an error if there was a problem inserting this event. // Returns an error if there was a problem inserting this event.
func (d *SyncServerDatasource) WriteEvent( func (d *SyncServerDatasource) WriteEvent(
ctx context.Context, ctx context.Context,
ev *gomatrixserverlib.Event, ev *gomatrixserverlib.HeaderedEvent,
addStateEvents []gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.HeaderedEvent,
addStateEventIDs, removeStateEventIDs []string, addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID, excludeFromSync bool, transactionID *api.TransactionID, excludeFromSync bool,
) (pduPosition types.StreamPosition, returnErr error) { ) (pduPosition types.StreamPosition, returnErr error) {
@ -218,7 +218,7 @@ func (d *SyncServerDatasource) WriteEvent(
func (d *SyncServerDatasource) updateRoomState( func (d *SyncServerDatasource) updateRoomState(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
removedEventIDs []string, removedEventIDs []string,
addedEvents []gomatrixserverlib.Event, addedEvents []gomatrixserverlib.HeaderedEvent,
pduPosition types.StreamPosition, pduPosition types.StreamPosition,
) error { ) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
@ -254,7 +254,7 @@ func (d *SyncServerDatasource) updateRoomState(
// If there was an issue during the retrieval, returns an error // If there was an issue during the retrieval, returns an error
func (d *SyncServerDatasource) GetStateEvent( func (d *SyncServerDatasource) GetStateEvent(
ctx context.Context, roomID, evType, stateKey string, ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.Event, error) { ) (*gomatrixserverlib.HeaderedEvent, error) {
return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey) return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
} }
@ -263,7 +263,7 @@ func (d *SyncServerDatasource) GetStateEvent(
// Returns an error if there was an issue with the retrieval. // Returns an error if there was an issue with the retrieval.
func (d *SyncServerDatasource) GetStateEventsForRoom( func (d *SyncServerDatasource) GetStateEventsForRoom(
ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter,
) (stateEvents []gomatrixserverlib.Event, err error) { ) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error { err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart) stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart)
return err return err
@ -633,8 +633,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// Build up a /sync response. Add joined rooms. // Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs { for _, roomID := range joinedRoomIDs {
var stateEvents []gomatrixserverlib.HeaderedEvent
var stateEvents []gomatrixserverlib.Event
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart) stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart)
if err != nil { if err != nil {
return return
@ -665,14 +664,13 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// We don't include a device here as we don't need to send down // We don't include a device here as we don't need to send down
// transaction IDs for complete syncs // transaction IDs for complete syncs
recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents) recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0, types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
).String() ).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr res.Rooms.Join[roomID] = *jr
} }
@ -748,7 +746,7 @@ func (d *SyncServerDatasource) UpsertAccountData(
// If the invite was successfully stored this returns the stream ID it was stored at. // If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database. // Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatasource) AddInviteEvent( func (d *SyncServerDatasource) AddInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.Event, ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
) (streamPos types.StreamPosition, err error) { ) (streamPos types.StreamPosition, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error { err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
streamPos, err = d.streamID.nextStreamID(ctx, txn) streamPos, err = d.streamID.nextStreamID(ctx, txn)
@ -806,7 +804,7 @@ func (d *SyncServerDatasource) addInvitesToResponse(
for roomID, inviteEvent := range invites { for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse() ir := types.NewInviteResponse()
ir.InviteState.Events = gomatrixserverlib.ToClientEvents( ir.InviteState.Events = gomatrixserverlib.ToClientEvents(
[]gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync, []gomatrixserverlib.Event{inviteEvent.Event}, gomatrixserverlib.FormatSync,
) )
// TODO: add the invite state from the invite event. // TODO: add the invite state from the invite event.
res.Rooms.Invite[roomID] = *ir res.Rooms.Invite[roomID] = *ir
@ -858,8 +856,12 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
if err != nil { if err != nil {
return err return err
} }
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents) headeredRecentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back var recentEvents []gomatrixserverlib.Event
for _, event := range d.StreamEventsToEvents(nil, recentStreamEvents) {
recentEvents = append(recentEvents, event.Event)
}
delta.stateEvents = removeDuplicates(delta.stateEvents, headeredRecentEvents)
backwardTopologyPos := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents) backwardTopologyPos := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
switch delta.membership { switch delta.membership {
@ -871,7 +873,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
).String() ).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr res.Rooms.Join[delta.roomID] = *jr
case gomatrixserverlib.Leave: case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban fallthrough // transitions to leave are the same as ban
@ -884,7 +886,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
).String() ).String()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr res.Rooms.Leave[delta.roomID] = *lr
} }
@ -1013,7 +1015,7 @@ func (d *SyncServerDatasource) getStateDeltas(
// dupe join events will result in the entire room state coming down to the client again. This is added in // dupe join events will result in the entire room state coming down to the client again. This is added in
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
// the timeline. // the timeline.
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" {
if membership == gomatrixserverlib.Join { if membership == gomatrixserverlib.Join {
// send full room state down instead of a delta // send full room state down instead of a delta
var s []types.StreamEvent var s []types.StreamEvent
@ -1094,7 +1096,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
for roomID, stateStreamEvents := range state { for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents { for _, ev := range stateStreamEvents {
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" {
if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above. if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
deltas = append(deltas, stateDelta{ deltas = append(deltas, stateDelta{
membership: membership, membership: membership,
@ -1122,7 +1124,7 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
} }
s := make([]types.StreamEvent, len(allState)) s := make([]types.StreamEvent, len(allState))
for i := 0; i < len(s); i++ { for i := 0; i < len(s); i++ {
s[i] = types.StreamEvent{Event: allState[i], StreamPosition: 0} s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0}
} }
return s, nil return s, nil
} }
@ -1130,10 +1132,10 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and // StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets // matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event. // added to the unsigned section of the output event.
func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.Event { func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent {
out := make([]gomatrixserverlib.Event, len(in)) out := make([]gomatrixserverlib.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ { for i := 0; i < len(in); i++ {
out[i] = in[i].Event out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil { if device != nil && in[i].TransactionID != nil {
if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID { if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID {
err := out[i].SetUnsignedField( err := out[i].SetUnsignedField(
@ -1153,7 +1155,7 @@ func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in
// There may be some overlap where events in stateEvents are already in recentEvents, so filter // There may be some overlap where events in stateEvents are already in recentEvents, so filter
// them out so we don't include them twice in the /sync response. They should be in recentEvents // them out so we don't include them twice in the /sync response. They should be in recentEvents
// only, so clients get to the correct state once they have rolled forward. // only, so clients get to the correct state once they have rolled forward.
func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event { func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
for _, recentEv := range recentEvents { for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil { if recentEv.StateKey() == nil {
continue // not a state event continue // not a state event
@ -1177,7 +1179,7 @@ func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gom
// getMembershipFromEvent returns the value of content.membership iff the event is a state event // getMembershipFromEvent returns the value of content.membership iff the event is a state event
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. // with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { func getMembershipFromEvent(ev *gomatrixserverlib.HeaderedEvent, userID string) string {
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
membership, err := ev.Membership() membership, err := ev.Membership()
if err != nil { if err != nil {

View file

@ -67,7 +67,7 @@ func NewNotifier(pos types.PaginationToken) *Notifier {
// Typically a consumer supplies a posUpdate with the latest sync position for the // Typically a consumer supplies a posUpdate with the latest sync position for the
// event type it handles, leaving other fields as 0. // event type it handles, leaving other fields as 0.
func (n *Notifier) OnNewEvent( func (n *Notifier) OnNewEvent(
ev *gomatrixserverlib.Event, roomID string, userIDs []string, ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string,
posUpdate types.PaginationToken, posUpdate types.PaginationToken,
) { ) {
// update the current position then notify relevant /sync streams. // update the current position then notify relevant /sync streams.

View file

@ -16,6 +16,7 @@ package sync
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"sync" "sync"
"testing" "testing"
@ -29,9 +30,9 @@ import (
) )
var ( var (
randomMessageEvent gomatrixserverlib.Event randomMessageEvent gomatrixserverlib.HeaderedEvent
aliceInviteBobEvent gomatrixserverlib.Event aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
bobLeaveEvent gomatrixserverlib.Event bobLeaveEvent gomatrixserverlib.HeaderedEvent
syncPositionVeryOld types.PaginationToken syncPositionVeryOld types.PaginationToken
syncPositionBefore types.PaginationToken syncPositionBefore types.PaginationToken
syncPositionAfter types.PaginationToken syncPositionAfter types.PaginationToken
@ -67,7 +68,8 @@ func init() {
syncPositionAfter2.PDUPosition = 13 syncPositionAfter2.PDUPosition = 13
var err error var err error
randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ err = json.Unmarshal([]byte(`{
"_room_version": "1",
"type": "m.room.message", "type": "m.room.message",
"content": { "content": {
"body": "Hello World", "body": "Hello World",
@ -77,11 +79,11 @@ func init() {
"room_id": "`+roomID+`", "room_id": "`+roomID+`",
"origin_server_ts": 12345, "origin_server_ts": 12345,
"event_id": "$randomMessageEvent:localhost" "event_id": "$randomMessageEvent:localhost"
}`), false) }`), &randomMessageEvent)
if err != nil { if err != nil {
panic(err) panic(err)
} }
aliceInviteBobEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ err = json.Unmarshal([]byte(`{
"type": "m.room.member", "type": "m.room.member",
"state_key": "`+bob+`", "state_key": "`+bob+`",
"content": { "content": {
@ -91,11 +93,11 @@ func init() {
"room_id": "`+roomID+`", "room_id": "`+roomID+`",
"origin_server_ts": 12345, "origin_server_ts": 12345,
"event_id": "$aliceInviteBobEvent:localhost" "event_id": "$aliceInviteBobEvent:localhost"
}`), false) }`), &aliceInviteBobEvent)
if err != nil { if err != nil {
panic(err) panic(err)
} }
bobLeaveEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{ err = json.Unmarshal([]byte(`{
"type": "m.room.member", "type": "m.room.member",
"state_key": "`+bob+`", "state_key": "`+bob+`",
"content": { "content": {
@ -105,7 +107,7 @@ func init() {
"room_id": "`+roomID+`", "room_id": "`+roomID+`",
"origin_server_ts": 12345, "origin_server_ts": 12345,
"event_id": "$bobLeaveEvent:localhost" "event_id": "$bobLeaveEvent:localhost"
}`), false) }`), &bobLeaveEvent)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View file

@ -40,7 +40,7 @@ type StreamPosition int64
// Same as gomatrixserverlib.Event but also has the PDU stream position for this event. // Same as gomatrixserverlib.Event but also has the PDU stream position for this event.
type StreamEvent struct { type StreamEvent struct {
gomatrixserverlib.Event gomatrixserverlib.HeaderedEvent
StreamPosition StreamPosition StreamPosition StreamPosition
TransactionID *api.TransactionID TransactionID *api.TransactionID
ExcludeFromSync bool ExcludeFromSync bool