HeaderedEvents in sync API (#922)

* Use HeaderedEvent in syncapi

* Update notifier test

* Fix persisting headered event

* Clean up unused API function

* Fix overshadowed err from linter

* Write headered JSON to invites table too

* Rename event_json to headered_event_json in syncapi database schemae

* Fix invites_table queries

* Update QueryRoomVersionCapabilitiesResponse comment

* Fix syncapi SQLite
This commit is contained in:
Neil Alexander 2020-03-19 12:07:01 +00:00 committed by GitHub
parent bfbf96eec9
commit ad5849d222
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 260 additions and 189 deletions

2
go.mod
View file

@ -11,7 +11,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f
github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200317140257-ddc7feaaf2fd
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318180716-bc4ff56961e2
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible

6
go.sum
View file

@ -270,6 +270,12 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200317114945-9a368ea4620d h1:0G
github.com/matrix-org/gomatrixserverlib v0.0.0-20200317114945-9a368ea4620d/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200317140257-ddc7feaaf2fd h1:n95A8YyiCZ8Nu2beqw4akCaPIRrZr/nesHYDZV8WkXI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200317140257-ddc7feaaf2fd/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318141718-859b9f256ffd h1:381JBgdNYOQwWelV/LOPqDoUkdSBUCkVZY8Of+n5jAM=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318141718-859b9f256ffd/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318145320-bc896516d72a h1:7+e7ManmyiGNqo06sQIRIoUtoV92XWzSbc0o3e4aTGY=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318145320-bc896516d72a/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203 h1:7HkL6bF7/M2cYteNFVtvGW5qjD4wHIiR0HsdCm2Rqao=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8=

View file

@ -43,6 +43,8 @@ type QueryLatestEventsAndStateResponse struct {
// Does the room exist?
// If the room doesn't exist this will be false and LatestEvents will be empty.
RoomExists bool `json:"room_exists"`
// The room version of the room.
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
// The latest events in the room.
// These are used to set the prev_events when sending an event.
LatestEvents []gomatrixserverlib.EventReference `json:"latest_events"`
@ -74,6 +76,8 @@ type QueryStateAfterEventsResponse struct {
// Does the room exist on this roomserver?
// If the room doesn't exist this will be false and StateEvents will be empty.
RoomExists bool `json:"room_exists"`
// The room version of the room.
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
// Do all the previous events exist on this roomserver?
// If some of previous events do not exist this will be false and StateEvents will be empty.
PrevEventsExist bool `json:"prev_events_exist"`
@ -207,6 +211,8 @@ type QueryStateAndAuthChainResponse struct {
// Does the room exist on this roomserver?
// If the room doesn't exist this will be false and StateEvents will be empty.
RoomExists bool `json:"room_exists"`
// The room version of the room.
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
// Do all the previous events exist on this roomserver?
// If some of previous events do not exist this will be false and StateEvents will be empty.
PrevEventsExist bool `json:"prev_events_exist"`
@ -249,10 +255,10 @@ type QueryServersInRoomAtEventResponse struct {
// QueryRoomVersionCapabilities asks for the default room version
type QueryRoomVersionCapabilitiesRequest struct{}
// QueryRoomVersionCapabilitiesResponse is a response to QueryServersInRoomAtEventResponse
// QueryRoomVersionCapabilitiesResponse is a response to QueryRoomVersionCapabilitiesRequest
type QueryRoomVersionCapabilitiesResponse struct {
DefaultRoomVersion string `json:"default"`
AvailableRoomVersions map[string]string `json:"available"`
DefaultRoomVersion gomatrixserverlib.RoomVersion `json:"default"`
AvailableRoomVersions map[gomatrixserverlib.RoomVersion]string `json:"available"`
}
// RoomserverQueryAPI is used to query information from the room server.
@ -536,7 +542,7 @@ func (h *httpRoomserverQueryAPI) QueryServersInRoomAtEvent(
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryServersInRoomAtEvent implements RoomServerQueryAPI
// QueryRoomVersionCapabilities implements RoomServerQueryAPI
func (h *httpRoomserverQueryAPI) QueryRoomVersionCapabilities(
ctx context.Context,
request *QueryRoomVersionCapabilitiesRequest,

View file

@ -808,14 +808,13 @@ func (r *RoomserverQueryAPI) QueryRoomVersionCapabilities(
request *api.QueryRoomVersionCapabilitiesRequest,
response *api.QueryRoomVersionCapabilitiesResponse,
) error {
response.DefaultRoomVersion = string(version.DefaultRoomVersion())
response.AvailableRoomVersions = make(map[string]string)
response.DefaultRoomVersion = version.DefaultRoomVersion()
response.AvailableRoomVersions = make(map[gomatrixserverlib.RoomVersion]string)
for v, desc := range version.SupportedRoomVersions() {
sv := string(v)
if desc.Stable {
response.AvailableRoomVersions[sv] = "stable"
response.AvailableRoomVersions[v] = "stable"
} else {
response.AvailableRoomVersions[sv] = "unstable"
response.AvailableRoomVersions[v] = "unstable"
}
}
return nil

View file

@ -155,7 +155,7 @@ func extractRoomVersionFromCreateEvent(event gomatrixserverlib.Event) (
}
// A room version was specified in the event content?
if createContent.RoomVersion != nil {
roomVersion = *createContent.RoomVersion
roomVersion = gomatrixserverlib.RoomVersion(*createContent.RoomVersion)
}
return roomVersion, err
}

View file

@ -184,7 +184,7 @@ func extractRoomVersionFromCreateEvent(event gomatrixserverlib.Event) (
}
// A room version was specified in the event content?
if createContent.RoomVersion != nil {
roomVersion = *createContent.RoomVersion
roomVersion = gomatrixserverlib.RoomVersion(*createContent.RoomVersion)
}
return roomVersion, err
}

View file

@ -98,10 +98,11 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent,
) error {
ev := msg.Event.Event
ev := msg.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"room_version": ev.RoomVersion,
}).Info("received event from roomserver")
addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev)
@ -153,7 +154,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
) error {
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event.Event)
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
@ -163,7 +164,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
s.notifier.OnNewEvent(&msg.Event.Event, "", nil, types.PaginationToken{PDUPosition: pduPos})
s.notifier.OnNewEvent(&msg.Event, "", nil, types.PaginationToken{PDUPosition: pduPos})
return nil
}
@ -186,8 +187,8 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
// lookupStateEvents looks up the state events that are added by a new event.
func (s *OutputRoomEventConsumer) lookupStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.Event,
) ([]gomatrixserverlib.Event, error) {
addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent,
) ([]gomatrixserverlib.HeaderedEvent, error) {
// Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 {
return nil, nil
@ -195,7 +196,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
return []gomatrixserverlib.Event{event}, nil
return []gomatrixserverlib.HeaderedEvent{event}, nil
}
// Check if this is re-adding a state events that we previously processed
@ -229,10 +230,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
return nil, err
}
for _, headeredEvent := range eventResp.Events {
result = append(result, headeredEvent.Event)
}
result = append(result, eventResp.Events...)
missing = missingEventsFrom(result, addsStateEventIDs)
if len(missing) != 0 {
@ -244,7 +242,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
return result, nil
}
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) {
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
var stateKey string
if event.StateKey() == nil {
stateKey = ""
@ -269,10 +267,11 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Event
PrevSender: prevEvent.Sender(),
}
return event.SetUnsigned(prev)
event.Event, err = event.SetUnsigned(prev)
return event, err
}
func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string {
func missingEventsFrom(events []gomatrixserverlib.HeaderedEvent, required []string) []string {
have := map[string]bool{}
for _, event := range events {
have[event.EventID()] = true

View file

@ -181,7 +181,7 @@ func (r *messagesReq) retrieveEvents() (
return
}
var events []gomatrixserverlib.Event
var events []gomatrixserverlib.HeaderedEvent
// There can be two reasons for streamEvents to be empty: either we've
// reached the oldest event in the room (or the most recent one, depending
@ -217,7 +217,7 @@ func (r *messagesReq) retrieveEvents() (
}
// Convert all of the events into client events.
clientEvents = gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll)
clientEvents = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll)
// Get the position of the first and the last event in the room's topology.
// This position is currently determined by the event's depth, so we could
// also use it instead of retrieving from the database. However, if we ever
@ -273,7 +273,7 @@ func (r *messagesReq) retrieveEvents() (
// Returns an error if there was an issue talking with the database or
// backfilling.
func (r *messagesReq) handleEmptyEventsSlice() (
events []gomatrixserverlib.Event, err error,
events []gomatrixserverlib.HeaderedEvent, err error,
) {
backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID)
@ -287,7 +287,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
} else {
// If not, it means the slice was empty because we reached the room's
// creation, so return an empty slice.
events = []gomatrixserverlib.Event{}
events = []gomatrixserverlib.HeaderedEvent{}
}
return
@ -299,7 +299,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
// through backfilling if needed.
// Returns an error if there was an issue while backfilling.
func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent) (
events []gomatrixserverlib.Event, err error,
events []gomatrixserverlib.HeaderedEvent, err error,
) {
// Check if we have enough events.
isSetLargeEnough := true
@ -326,7 +326,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
// Backfill is needed if we've reached a backward extremity and need more
// events. It's only needed if the direction is backward.
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
var pdus []gomatrixserverlib.Event
var pdus []gomatrixserverlib.HeaderedEvent
// Only ask the remote server for enough events to reach the limit.
pdus, err = r.backfill(backwardExtremities, r.limit-len(streamEvents))
if err != nil {
@ -400,7 +400,7 @@ func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (boo
// event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.Event, error) {
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
// Query the list of servers in the room when one of the backward extremities
// was sent.
var serversResponse api.QueryServersInRoomAtEventResponse
@ -428,7 +428,7 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
}
}
pdus := make([]gomatrixserverlib.Event, 0)
pdus := make([]gomatrixserverlib.HeaderedEvent, 0)
// If the roomserver responded with at least one server that isn't us,
// send it a request for backfill.
@ -440,13 +440,20 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
return nil, err
}
pdus = txn.PDUs
for _, p := range txn.PDUs {
pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1))
}
// Store the events in the database, while marking them as unfit to show
// up in responses to sync requests.
for _, pdu := range pdus {
headered := pdu.Headered(gomatrixserverlib.RoomVersionV1)
if _, err = r.db.WriteEvent(
r.ctx, &pdu, []gomatrixserverlib.Event{}, []string{}, []string{},
r.ctx,
&headered,
[]gomatrixserverlib.HeaderedEvent{},
[]string{},
[]string{},
nil, true,
); err != nil {
return nil, err
@ -486,7 +493,7 @@ func setToDefault(
// timestamp of two Matrix events.
// Returns true if the first event happened before the second one, false
// otherwise.
func sortEvents(e1 *gomatrixserverlib.Event, e2 *gomatrixserverlib.Event) bool {
func sortEvents(e1 *gomatrixserverlib.HeaderedEvent, e2 *gomatrixserverlib.HeaderedEvent) bool {
t := e1.OriginServerTS().Time()
return e2.OriginServerTS().Time().After(t)
}

View file

@ -56,7 +56,9 @@ func OnIncomingStateRequest(req *http.Request, db storage.Database, roomID strin
// Fill the prev_content and replaces_state keys if necessary
for _, event := range stateEvents {
stateEvent := stateEventInStateResp{
ClientEvent: gomatrixserverlib.ToClientEvent(event, gomatrixserverlib.FormatAll),
ClientEvent: gomatrixserverlib.HeaderedToClientEvents(
[]gomatrixserverlib.HeaderedEvent{event}, gomatrixserverlib.FormatAll,
)[0],
}
var prevEventRef types.PrevEventRef
if len(event.Unsigned()) > 0 {
@ -113,7 +115,7 @@ func OnIncomingStateTypeRequest(req *http.Request, db storage.Database, roomID s
}
stateEvent := stateEventInStateResp{
ClientEvent: gomatrixserverlib.ToClientEvent(*event, gomatrixserverlib.FormatAll),
ClientEvent: gomatrixserverlib.HeaderedToClientEvent(*event, gomatrixserverlib.FormatAll),
}
return util.JSONResponse{

View file

@ -29,16 +29,16 @@ import (
type Database interface {
common.PartitionStorer
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error)
WriteEvent(context.Context, *gomatrixserverlib.Event, []gomatrixserverlib.Event, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error)
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.Event, error)
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.Event, err error)
Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error)
WriteEvent(context.Context, *gomatrixserverlib.HeaderedEvent, []gomatrixserverlib.HeaderedEvent, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error)
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.HeaderedEvent, err error)
SyncPosition(ctx context.Context) (types.PaginationToken, error)
IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.PaginationToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error)
CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error)
GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error)
UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (types.StreamPosition, error)
AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.Event) (types.StreamPosition, error)
AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error)
RetireInviteEvent(ctx context.Context, inviteEventID string) error
SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn)
AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition
@ -48,6 +48,6 @@ type Database interface {
EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error)
BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error)
MaxTopologicalPosition(ctx context.Context, roomID string) (types.StreamPosition, error)
StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.Event
StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent
SyncStreamPosition(ctx context.Context) (types.StreamPosition, error)
}

View file

@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
-- The state_key value for this state event e.g ''
state_key TEXT NOT NULL,
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
event_json TEXT NOT NULL,
headered_event_json TEXT NOT NULL,
-- The 'content.membership' value if this event is an m.room.member event. For other
-- events, this will be NULL.
membership TEXT,
@ -59,10 +59,10 @@ CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(
`
const upsertRoomStateSQL = "" +
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at)" +
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" +
" ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" +
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9"
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9"
const deleteRoomStateByEventIDSQL = "" +
"DELETE FROM syncapi_current_room_state WHERE event_id = $1"
@ -71,7 +71,7 @@ const selectRoomIDsWithMembershipSQL = "" +
"SELECT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
const selectCurrentStateSQL = "" +
"SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1" +
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" +
" AND ( $2::text[] IS NULL OR sender = ANY($2) )" +
" AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" +
" AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
@ -83,14 +83,14 @@ const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
const selectStateEventSQL = "" +
"SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
const selectEventsWithEventIDsSQL = "" +
// TODO: The session_id and transaction_id blanks are here because otherwise
// the rowsToStreamEvents expects there to be exactly five columns. We need to
// figure out if these really need to be in the DB, and if so, we need a
// better permanent fix for this. - neilalexander, 2 Jan 2020
"SELECT added_at, event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
"SELECT added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
" FROM syncapi_current_room_state WHERE event_id = ANY($1)"
type currentRoomStateStatements struct {
@ -185,7 +185,7 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
func (s *currentRoomStateStatements) selectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]gomatrixserverlib.Event, error) {
) ([]gomatrixserverlib.HeaderedEvent, error) {
stmt := common.TxStmt(txn, s.selectCurrentStateStmt)
rows, err := stmt.QueryContext(ctx, roomID,
pq.StringArray(stateFilter.Senders),
@ -213,7 +213,7 @@ func (s *currentRoomStateStatements) deleteRoomStateByEventID(
func (s *currentRoomStateStatements) upsertRoomState(
ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.Event, membership *string, addedAt types.StreamPosition,
event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error {
// Parse content as JSON and search for an "url" key
containsURL := false
@ -223,9 +223,14 @@ func (s *currentRoomStateStatements) upsertRoomState(
_, containsURL = content["url"]
}
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
}
// upsert state event
stmt := common.TxStmt(txn, s.upsertRoomStateStmt)
_, err := stmt.ExecContext(
_, err = stmt.ExecContext(
ctx,
event.RoomID(),
event.EventID(),
@ -233,7 +238,7 @@ func (s *currentRoomStateStatements) upsertRoomState(
event.Sender(),
containsURL,
*event.StateKey(),
event.JSON(),
headeredJSON,
membership,
addedAt,
)
@ -252,16 +257,16 @@ func (s *currentRoomStateStatements) selectEventsWithEventIDs(
return rowsToStreamEvents(rows)
}
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
result := []gomatrixserverlib.Event{}
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) {
result := []gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var eventBytes []byte
if err := rows.Scan(&eventBytes); err != nil {
return nil, err
}
// TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
if err != nil {
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result = append(result, ev)
@ -271,7 +276,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
func (s *currentRoomStateStatements) selectStateEvent(
ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.Event, error) {
) (*gomatrixserverlib.HeaderedEvent, error) {
stmt := s.selectStateEventStmt
var res []byte
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
@ -281,6 +286,9 @@ func (s *currentRoomStateStatements) selectStateEvent(
if err != nil {
return nil, err
}
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false)
var ev gomatrixserverlib.HeaderedEvent
if err = json.Unmarshal(res, &ev); err != nil {
return nil, err
}
return &ev, err
}

View file

@ -18,6 +18,7 @@ package postgres
import (
"context"
"database/sql"
"encoding/json"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types"
@ -30,7 +31,7 @@ CREATE TABLE IF NOT EXISTS syncapi_invite_events (
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
target_user_id TEXT NOT NULL,
event_json TEXT NOT NULL
headered_event_json TEXT NOT NULL
);
-- For looking up the invites for a given user.
@ -44,14 +45,14 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx
const insertInviteEventSQL = "" +
"INSERT INTO syncapi_invite_events (" +
" room_id, event_id, target_user_id, event_json" +
" room_id, event_id, target_user_id, headered_event_json" +
") VALUES ($1, $2, $3, $4) RETURNING id"
const deleteInviteEventSQL = "" +
"DELETE FROM syncapi_invite_events WHERE event_id = $1"
const selectInviteEventsInRangeSQL = "" +
"SELECT room_id, event_json FROM syncapi_invite_events" +
"SELECT room_id, headered_event_json FROM syncapi_invite_events" +
" WHERE target_user_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC"
@ -86,14 +87,20 @@ func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) {
}
func (s *inviteEventsStatements) insertInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.Event,
ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
var headeredJSON []byte
headeredJSON, err = json.Marshal(inviteEvent)
if err != nil {
return
}
err = s.insertInviteEventStmt.QueryRowContext(
ctx,
inviteEvent.RoomID(),
inviteEvent.EventID(),
*inviteEvent.StateKey(),
inviteEvent.JSON(),
headeredJSON,
).Scan(&streamPos)
return
}
@ -109,14 +116,14 @@ func (s *inviteEventsStatements) deleteInviteEvent(
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) selectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition,
) (map[string]gomatrixserverlib.Event, error) {
) (map[string]gomatrixserverlib.HeaderedEvent, error) {
stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos)
if err != nil {
return nil, err
}
defer common.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
result := map[string]gomatrixserverlib.Event{}
result := map[string]gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var (
roomID string
@ -126,8 +133,8 @@ func (s *inviteEventsStatements) selectInviteEventsInRange(
return nil, err
}
event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false)
if err != nil {
var event gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, err
}

View file

@ -44,8 +44,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
event_id TEXT NOT NULL CONSTRAINT syncapi_event_id_idx UNIQUE,
-- The 'room_id' key for the event.
room_id TEXT NOT NULL,
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
event_json TEXT NOT NULL,
-- The headered JSON for the event, containing potentially additional metadata such as
-- the room version. Stored as TEXT because this should be valid UTF-8.
headered_event_json TEXT NOT NULL,
-- The event type e.g 'm.room.member'.
type TEXT NOT NULL,
-- The 'sender' property of the event.
@ -70,26 +71,26 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" +
"room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " +
"ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = $11 " +
"RETURNING id"
const selectEventsSQL = "" +
"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
"SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
const selectRecentEventsSQL = "" +
"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
"SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC LIMIT $4"
const selectRecentEventsForSyncSQL = "" +
"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
"SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
" ORDER BY id DESC LIMIT $4"
const selectEarlyEventsSQL = "" +
"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
"SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id ASC LIMIT $4"
@ -98,7 +99,7 @@ const selectMaxEventIDSQL = "" +
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
"SELECT id, event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
" AND ( $3::text[] IS NULL OR sender = ANY($3) )" +
@ -203,8 +204,8 @@ func (s *outputRoomEventsStatements) selectStateInRange(
}
// TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
if err != nil {
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, nil, err
}
needSet := stateNeeded[ev.RoomID()]
@ -220,7 +221,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = types.StreamEvent{
Event: ev,
HeaderedEvent: ev,
StreamPosition: streamPos,
ExcludeFromSync: excludeFromSync,
}
@ -248,7 +249,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID(
// of the inserted event.
func (s *outputRoomEventsStatements) insertEvent(
ctx context.Context, txn *sql.Tx,
event *gomatrixserverlib.Event, addState, removeState []string,
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
transactionID *api.TransactionID, excludeFromSync bool,
) (streamPos types.StreamPosition, err error) {
var txnID *string
@ -266,12 +267,18 @@ func (s *outputRoomEventsStatements) insertEvent(
_, containsURL = content["url"]
}
var headeredJSON []byte
headeredJSON, err = json.Marshal(event)
if err != nil {
return
}
stmt := common.TxStmt(txn, s.insertEventStmt)
err = stmt.QueryRowContext(
ctx,
event.RoomID(),
event.EventID(),
event.JSON(),
headeredJSON,
event.Type(),
event.Sender(),
containsURL,
@ -373,8 +380,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return nil, err
}
// TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
if err != nil {
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@ -386,7 +393,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
result = append(result, types.StreamEvent{
Event: ev,
HeaderedEvent: ev,
StreamPosition: streamPos,
TransactionID: transactionID,
ExcludeFromSync: excludeFromSync,

View file

@ -104,7 +104,7 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
// insertEventInTopology inserts the given event in the room's topology, based
// on the event's depth.
func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
ctx context.Context, event *gomatrixserverlib.Event,
ctx context.Context, event *gomatrixserverlib.HeaderedEvent,
) (err error) {
_, err = s.insertEventInTopologyStmt.ExecContext(
ctx, event.EventID(), event.Depth(), event.RoomID(),

View file

@ -37,7 +37,7 @@ import (
type stateDelta struct {
roomID string
stateEvents []gomatrixserverlib.Event
stateEvents []gomatrixserverlib.HeaderedEvent
membership string
// The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.
@ -100,7 +100,7 @@ func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[s
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) {
func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) {
streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs)
if err != nil {
return nil, err
@ -111,7 +111,7 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
return d.StreamEventsToEvents(nil, streamEvents), nil
}
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.Event) error {
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error {
// If the event is already known as a backward extremity, don't consider
// it as such anymore now that we have it.
isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID())
@ -155,8 +155,8 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev
// Returns an error if there was a problem inserting this event.
func (d *SyncServerDatasource) WriteEvent(
ctx context.Context,
ev *gomatrixserverlib.Event,
addStateEvents []gomatrixserverlib.Event,
ev *gomatrixserverlib.HeaderedEvent,
addStateEvents []gomatrixserverlib.HeaderedEvent,
addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID, excludeFromSync bool,
) (pduPosition types.StreamPosition, returnErr error) {
@ -192,7 +192,7 @@ func (d *SyncServerDatasource) WriteEvent(
func (d *SyncServerDatasource) updateRoomState(
ctx context.Context, txn *sql.Tx,
removedEventIDs []string,
addedEvents []gomatrixserverlib.Event,
addedEvents []gomatrixserverlib.HeaderedEvent,
pduPosition types.StreamPosition,
) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
@ -228,7 +228,7 @@ func (d *SyncServerDatasource) updateRoomState(
// If there was an issue during the retrieval, returns an error
func (d *SyncServerDatasource) GetStateEvent(
ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.Event, error) {
) (*gomatrixserverlib.HeaderedEvent, error) {
return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
}
@ -237,7 +237,7 @@ func (d *SyncServerDatasource) GetStateEvent(
// Returns an error if there was an issue with the retrieval.
func (d *SyncServerDatasource) GetStateEventsForRoom(
ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter,
) (stateEvents []gomatrixserverlib.Event, err error) {
) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter)
return err
@ -599,7 +599,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
var stateEvents []gomatrixserverlib.Event
var stateEvents []gomatrixserverlib.HeaderedEvent
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilter)
if err != nil {
return
@ -633,9 +633,9 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
@ -707,7 +707,7 @@ func (d *SyncServerDatasource) UpsertAccountData(
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatasource) AddInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.Event,
ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
) (types.StreamPosition, error) {
return d.invites.insertInviteEvent(ctx, inviteEvent)
}
@ -758,7 +758,7 @@ func (d *SyncServerDatasource) addInvitesToResponse(
for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse()
ir.InviteState.Events = gomatrixserverlib.ToClientEvents(
[]gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync,
[]gomatrixserverlib.Event{inviteEvent.Event}, gomatrixserverlib.FormatSync,
)
// TODO: add the invite state from the invite event.
res.Rooms.Invite[roomID] = *ir
@ -821,9 +821,9 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
@ -834,9 +834,9 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
).String()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr
}
@ -1074,7 +1074,7 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
}
s := make([]types.StreamEvent, len(allState))
for i := 0; i < len(s); i++ {
s[i] = types.StreamEvent{Event: allState[i], StreamPosition: 0}
s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0}
}
return s, nil
}
@ -1082,10 +1082,10 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.Event {
out := make([]gomatrixserverlib.Event, len(in))
func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent {
out := make([]gomatrixserverlib.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[i].Event
out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil {
if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID {
err := out[i].SetUnsignedField(
@ -1105,7 +1105,7 @@ func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
// them out so we don't include them twice in the /sync response. They should be in recentEvents
// only, so clients get to the correct state once they have rolled forward.
func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event {
func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil {
continue // not a state event

View file

@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
sender TEXT NOT NULL,
contains_url BOOL NOT NULL DEFAULT false,
state_key TEXT NOT NULL,
event_json TEXT NOT NULL,
headered_event_json TEXT NOT NULL,
membership TEXT,
added_at BIGINT,
UNIQUE (room_id, type, state_key)
@ -47,10 +47,10 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_s
`
const upsertRoomStateSQL = "" +
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at)" +
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" +
" ON CONFLICT (event_id, room_id, type, sender, contains_url)" +
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9"
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9"
const deleteRoomStateByEventIDSQL = "" +
"DELETE FROM syncapi_current_room_state WHERE event_id = $1"
@ -59,7 +59,7 @@ const selectRoomIDsWithMembershipSQL = "" +
"SELECT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
const selectCurrentStateSQL = "" +
"SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1" +
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" +
" AND ( $2 IS NULL OR sender IN ($2) )" +
" AND ( $3 IS NULL OR NOT(sender IN ($3)) )" +
" AND ( $4 IS NULL OR type IN ($4) )" +
@ -71,14 +71,14 @@ const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
const selectStateEventSQL = "" +
"SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
const selectEventsWithEventIDsSQL = "" +
// TODO: The session_id and transaction_id blanks are here because otherwise
// the rowsToStreamEvents expects there to be exactly five columns. We need to
// figure out if these really need to be in the DB, and if so, we need a
// better permanent fix for this. - neilalexander, 2 Jan 2020
"SELECT added_at, event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
"SELECT added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
" FROM syncapi_current_room_state WHERE event_id IN ($1)"
type currentRoomStateStatements struct {
@ -171,7 +171,7 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
func (s *currentRoomStateStatements) selectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilterPart *gomatrixserverlib.StateFilter,
) ([]gomatrixserverlib.Event, error) {
) ([]gomatrixserverlib.HeaderedEvent, error) {
stmt := common.TxStmt(txn, s.selectCurrentStateStmt)
rows, err := stmt.QueryContext(ctx, roomID,
nil, // FIXME: pq.StringArray(stateFilterPart.Senders),
@ -199,7 +199,7 @@ func (s *currentRoomStateStatements) deleteRoomStateByEventID(
func (s *currentRoomStateStatements) upsertRoomState(
ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.Event, membership *string, addedAt types.StreamPosition,
event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error {
// Parse content as JSON and search for an "url" key
containsURL := false
@ -209,9 +209,14 @@ func (s *currentRoomStateStatements) upsertRoomState(
_, containsURL = content["url"]
}
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
}
// upsert state event
stmt := common.TxStmt(txn, s.upsertRoomStateStmt)
_, err := stmt.ExecContext(
_, err = stmt.ExecContext(
ctx,
event.RoomID(),
event.EventID(),
@ -219,7 +224,7 @@ func (s *currentRoomStateStatements) upsertRoomState(
event.Sender(),
containsURL,
*event.StateKey(),
event.JSON(),
headeredJSON,
membership,
addedAt,
)
@ -242,16 +247,16 @@ func (s *currentRoomStateStatements) selectEventsWithEventIDs(
return rowsToStreamEvents(rows)
}
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
result := []gomatrixserverlib.Event{}
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) {
result := []gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var eventBytes []byte
if err := rows.Scan(&eventBytes); err != nil {
return nil, err
}
// TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
if err != nil {
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result = append(result, ev)
@ -261,7 +266,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
func (s *currentRoomStateStatements) selectStateEvent(
ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.Event, error) {
) (*gomatrixserverlib.HeaderedEvent, error) {
stmt := s.selectStateEventStmt
var res []byte
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
@ -271,6 +276,9 @@ func (s *currentRoomStateStatements) selectStateEvent(
if err != nil {
return nil, err
}
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false)
var ev gomatrixserverlib.HeaderedEvent
if err = json.Unmarshal(res, &ev); err != nil {
return nil, err
}
return &ev, err
}

View file

@ -18,6 +18,7 @@ package sqlite3
import (
"context"
"database/sql"
"encoding/json"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types"
@ -30,7 +31,7 @@ CREATE TABLE IF NOT EXISTS syncapi_invite_events (
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
target_user_id TEXT NOT NULL,
event_json TEXT NOT NULL
headered_event_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx ON syncapi_invite_events (target_user_id, id);
@ -39,14 +40,14 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx ON syncapi_invite_events
const insertInviteEventSQL = "" +
"INSERT INTO syncapi_invite_events" +
" (id, room_id, event_id, target_user_id, event_json)" +
" (id, room_id, event_id, target_user_id, headered_event_json)" +
" VALUES ($1, $2, $3, $4, $5)"
const deleteInviteEventSQL = "" +
"DELETE FROM syncapi_invite_events WHERE event_id = $1"
const selectInviteEventsInRangeSQL = "" +
"SELECT room_id, event_json FROM syncapi_invite_events" +
"SELECT room_id, headered_event_json FROM syncapi_invite_events" +
" WHERE target_user_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC"
@ -83,15 +84,21 @@ func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatement
}
func (s *inviteEventsStatements) insertInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.Event, streamPos types.StreamPosition,
ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent, streamPos types.StreamPosition,
) (err error) {
var headeredJSON []byte
headeredJSON, err = json.Marshal(inviteEvent)
if err != nil {
return
}
_, err = txn.Stmt(s.insertInviteEventStmt).ExecContext(
ctx,
streamPos,
inviteEvent.RoomID(),
inviteEvent.EventID(),
*inviteEvent.StateKey(),
inviteEvent.JSON(),
headeredJSON,
)
return
}
@ -107,14 +114,14 @@ func (s *inviteEventsStatements) deleteInviteEvent(
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) selectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition,
) (map[string]gomatrixserverlib.Event, error) {
) (map[string]gomatrixserverlib.HeaderedEvent, error) {
stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos)
if err != nil {
return nil, err
}
defer common.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
result := map[string]gomatrixserverlib.Event{}
result := map[string]gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var (
roomID string
@ -124,8 +131,8 @@ func (s *inviteEventsStatements) selectInviteEventsInRange(
return nil, err
}
event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false)
if err != nil {
var event gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, err
}

View file

@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_id TEXT NOT NULL UNIQUE,
room_id TEXT NOT NULL,
event_json TEXT NOT NULL,
headered_event_json TEXT NOT NULL,
type TEXT NOT NULL,
sender TEXT NOT NULL,
contains_url BOOL NOT NULL,
@ -49,25 +49,25 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" +
"id, room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
"id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
"ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = $13"
const selectEventsSQL = "" +
"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = $1"
"SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = $1"
const selectRecentEventsSQL = "" +
"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
"SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC LIMIT $4"
const selectRecentEventsForSyncSQL = "" +
"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
"SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
" ORDER BY id DESC LIMIT $4"
const selectEarlyEventsSQL = "" +
"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
"SELECT id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id ASC LIMIT $4"
@ -86,7 +86,7 @@ const selectMaxEventIDSQL = "" +
$8 = stateFilterPart.Limit,
*/
const selectStateInRangeSQL = "" +
"SELECT id, event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2)" + // old/new pos
" AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
@ -200,8 +200,8 @@ func (s *outputRoomEventsStatements) selectStateInRange(
}
// TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
if err != nil {
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, nil, err
}
needSet := stateNeeded[ev.RoomID()]
@ -217,7 +217,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = types.StreamEvent{
Event: ev,
HeaderedEvent: ev,
StreamPosition: streamPos,
ExcludeFromSync: excludeFromSync,
}
@ -245,7 +245,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID(
// of the inserted event.
func (s *outputRoomEventsStatements) insertEvent(
ctx context.Context, txn *sql.Tx,
event *gomatrixserverlib.Event, addState, removeState []string,
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
transactionID *api.TransactionID, excludeFromSync bool,
) (streamPos types.StreamPosition, err error) {
var txnID *string
@ -263,6 +263,12 @@ func (s *outputRoomEventsStatements) insertEvent(
_, containsURL = content["url"]
}
var headeredJSON []byte
headeredJSON, err = json.Marshal(event)
if err != nil {
return
}
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
if err != nil {
return
@ -283,7 +289,7 @@ func (s *outputRoomEventsStatements) insertEvent(
streamPos,
event.RoomID(),
event.EventID(),
event.JSON(),
headeredJSON,
event.Type(),
event.Sender(),
containsURL,
@ -392,8 +398,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
return nil, err
}
// TODO: Handle redacted events
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
if err != nil {
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
@ -405,7 +411,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
result = append(result, types.StreamEvent{
Event: ev,
HeaderedEvent: ev,
StreamPosition: streamPos,
TransactionID: transactionID,
ExcludeFromSync: excludeFromSync,

View file

@ -101,7 +101,7 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
// insertEventInTopology inserts the given event in the room's topology, based
// on the event's depth.
func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event,
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
) (err error) {
stmt := common.TxStmt(txn, s.insertEventInTopologyStmt)
_, err = stmt.ExecContext(

View file

@ -40,7 +40,7 @@ import (
type stateDelta struct {
roomID string
stateEvents []gomatrixserverlib.Event
stateEvents []gomatrixserverlib.HeaderedEvent
membership string
// The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.
@ -126,7 +126,7 @@ func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[s
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) {
func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) {
streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs)
if err != nil {
return nil, err
@ -137,7 +137,7 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
return d.StreamEventsToEvents(nil, streamEvents), nil
}
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.Event) error {
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
// If the event is already known as a backward extremity, don't consider
// it as such anymore now that we have it.
isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID())
@ -181,8 +181,8 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx
// Returns an error if there was a problem inserting this event.
func (d *SyncServerDatasource) WriteEvent(
ctx context.Context,
ev *gomatrixserverlib.Event,
addStateEvents []gomatrixserverlib.Event,
ev *gomatrixserverlib.HeaderedEvent,
addStateEvents []gomatrixserverlib.HeaderedEvent,
addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID, excludeFromSync bool,
) (pduPosition types.StreamPosition, returnErr error) {
@ -218,7 +218,7 @@ func (d *SyncServerDatasource) WriteEvent(
func (d *SyncServerDatasource) updateRoomState(
ctx context.Context, txn *sql.Tx,
removedEventIDs []string,
addedEvents []gomatrixserverlib.Event,
addedEvents []gomatrixserverlib.HeaderedEvent,
pduPosition types.StreamPosition,
) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
@ -254,7 +254,7 @@ func (d *SyncServerDatasource) updateRoomState(
// If there was an issue during the retrieval, returns an error
func (d *SyncServerDatasource) GetStateEvent(
ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.Event, error) {
) (*gomatrixserverlib.HeaderedEvent, error) {
return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
}
@ -263,7 +263,7 @@ func (d *SyncServerDatasource) GetStateEvent(
// Returns an error if there was an issue with the retrieval.
func (d *SyncServerDatasource) GetStateEventsForRoom(
ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter,
) (stateEvents []gomatrixserverlib.Event, err error) {
) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart)
return err
@ -633,8 +633,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
var stateEvents []gomatrixserverlib.Event
var stateEvents []gomatrixserverlib.HeaderedEvent
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart)
if err != nil {
return
@ -665,14 +664,13 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs
recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
@ -748,7 +746,7 @@ func (d *SyncServerDatasource) UpsertAccountData(
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatasource) AddInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.Event,
ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
streamPos, err = d.streamID.nextStreamID(ctx, txn)
@ -806,7 +804,7 @@ func (d *SyncServerDatasource) addInvitesToResponse(
for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse()
ir.InviteState.Events = gomatrixserverlib.ToClientEvents(
[]gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync,
[]gomatrixserverlib.Event{inviteEvent.Event}, gomatrixserverlib.FormatSync,
)
// TODO: add the invite state from the invite event.
res.Rooms.Invite[roomID] = *ir
@ -858,8 +856,12 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
if err != nil {
return err
}
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
headeredRecentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
var recentEvents []gomatrixserverlib.Event
for _, event := range d.StreamEventsToEvents(nil, recentStreamEvents) {
recentEvents = append(recentEvents, event.Event)
}
delta.stateEvents = removeDuplicates(delta.stateEvents, headeredRecentEvents)
backwardTopologyPos := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
switch delta.membership {
@ -871,7 +873,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
@ -884,7 +886,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
).String()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr
}
@ -1013,7 +1015,7 @@ func (d *SyncServerDatasource) getStateDeltas(
// dupe join events will result in the entire room state coming down to the client again. This is added in
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
// the timeline.
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" {
if membership == gomatrixserverlib.Join {
// send full room state down instead of a delta
var s []types.StreamEvent
@ -1094,7 +1096,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" {
if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
deltas = append(deltas, stateDelta{
membership: membership,
@ -1122,7 +1124,7 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
}
s := make([]types.StreamEvent, len(allState))
for i := 0; i < len(s); i++ {
s[i] = types.StreamEvent{Event: allState[i], StreamPosition: 0}
s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0}
}
return s, nil
}
@ -1130,10 +1132,10 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.Event {
out := make([]gomatrixserverlib.Event, len(in))
func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent {
out := make([]gomatrixserverlib.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[i].Event
out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil {
if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID {
err := out[i].SetUnsignedField(
@ -1153,7 +1155,7 @@ func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
// them out so we don't include them twice in the /sync response. They should be in recentEvents
// only, so clients get to the correct state once they have rolled forward.
func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event {
func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil {
continue // not a state event
@ -1177,7 +1179,7 @@ func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gom
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
func getMembershipFromEvent(ev *gomatrixserverlib.HeaderedEvent, userID string) string {
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
membership, err := ev.Membership()
if err != nil {

View file

@ -67,7 +67,7 @@ func NewNotifier(pos types.PaginationToken) *Notifier {
// Typically a consumer supplies a posUpdate with the latest sync position for the
// event type it handles, leaving other fields as 0.
func (n *Notifier) OnNewEvent(
ev *gomatrixserverlib.Event, roomID string, userIDs []string,
ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string,
posUpdate types.PaginationToken,
) {
// update the current position then notify relevant /sync streams.

View file

@ -16,6 +16,7 @@ package sync
import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"
@ -29,9 +30,9 @@ import (
)
var (
randomMessageEvent gomatrixserverlib.Event
aliceInviteBobEvent gomatrixserverlib.Event
bobLeaveEvent gomatrixserverlib.Event
randomMessageEvent gomatrixserverlib.HeaderedEvent
aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
bobLeaveEvent gomatrixserverlib.HeaderedEvent
syncPositionVeryOld types.PaginationToken
syncPositionBefore types.PaginationToken
syncPositionAfter types.PaginationToken
@ -67,7 +68,8 @@ func init() {
syncPositionAfter2.PDUPosition = 13
var err error
randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
err = json.Unmarshal([]byte(`{
"_room_version": "1",
"type": "m.room.message",
"content": {
"body": "Hello World",
@ -75,13 +77,15 @@ func init() {
},
"sender": "@noone:localhost",
"room_id": "`+roomID+`",
"origin": "localhost",
"origin_server_ts": 12345,
"event_id": "$randomMessageEvent:localhost"
}`), false)
}`), &randomMessageEvent)
if err != nil {
panic(err)
}
aliceInviteBobEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
err = json.Unmarshal([]byte(`{
"_room_version": "1",
"type": "m.room.member",
"state_key": "`+bob+`",
"content": {
@ -89,13 +93,15 @@ func init() {
},
"sender": "`+alice+`",
"room_id": "`+roomID+`",
"origin": "localhost",
"origin_server_ts": 12345,
"event_id": "$aliceInviteBobEvent:localhost"
}`), false)
}`), &aliceInviteBobEvent)
if err != nil {
panic(err)
}
bobLeaveEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
err = json.Unmarshal([]byte(`{
"_room_version": "1",
"type": "m.room.member",
"state_key": "`+bob+`",
"content": {
@ -103,9 +109,10 @@ func init() {
},
"sender": "`+bob+`",
"room_id": "`+roomID+`",
"origin": "localhost",
"origin_server_ts": 12345,
"event_id": "$bobLeaveEvent:localhost"
}`), false)
}`), &bobLeaveEvent)
if err != nil {
panic(err)
}

View file

@ -40,7 +40,7 @@ type StreamPosition int64
// Same as gomatrixserverlib.Event but also has the PDU stream position for this event.
type StreamEvent struct {
gomatrixserverlib.Event
gomatrixserverlib.HeaderedEvent
StreamPosition StreamPosition
TransactionID *api.TransactionID
ExcludeFromSync bool