mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-16 19:33:09 -06:00
Resolve merge conflicts
This commit is contained in:
commit
183fb3973d
2
go.mod
2
go.mod
|
|
@ -18,8 +18,6 @@ require (
|
||||||
github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0
|
github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0
|
||||||
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5
|
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5
|
||||||
github.com/miekg/dns v1.1.12 // indirect
|
github.com/miekg/dns v1.1.12 // indirect
|
||||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
|
||||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
|
||||||
github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5
|
github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5
|
||||||
github.com/opentracing/opentracing-go v1.0.2
|
github.com/opentracing/opentracing-go v1.0.2
|
||||||
github.com/pierrec/lz4 v0.0.0-20161206202305-5c9560bfa9ac // indirect
|
github.com/pierrec/lz4 v0.0.0-20161206202305-5c9560bfa9ac // indirect
|
||||||
|
|
|
||||||
1
go.sum
1
go.sum
|
|
@ -183,6 +183,7 @@ gopkg.in/h2non/bimg.v1 v1.0.18 h1:qn6/RpBHt+7WQqoBcK+aF2puc6nC78eZj5LexxoalT4=
|
||||||
gopkg.in/h2non/bimg.v1 v1.0.18/go.mod h1:PgsZL7dLwUbsGm1NYps320GxGgvQNTnecMCZqxV11So=
|
gopkg.in/h2non/bimg.v1 v1.0.18/go.mod h1:PgsZL7dLwUbsGm1NYps320GxGgvQNTnecMCZqxV11So=
|
||||||
gopkg.in/h2non/gock.v1 v1.0.14 h1:fTeu9fcUvSnLNacYvYI54h+1/XEteDyHvrVCZEEEYNM=
|
gopkg.in/h2non/gock.v1 v1.0.14 h1:fTeu9fcUvSnLNacYvYI54h+1/XEteDyHvrVCZEEEYNM=
|
||||||
gopkg.in/h2non/gock.v1 v1.0.14/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE=
|
gopkg.in/h2non/gock.v1 v1.0.14/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE=
|
||||||
|
gopkg.in/macaroon.v2 v2.1.0 h1:HZcsjBCzq9t0eBPMKqTN/uSN6JOm78ZJ2INbqcBQOUI=
|
||||||
gopkg.in/macaroon.v2 v2.1.0/go.mod h1:OUb+TQP/OP0WOerC2Jp/3CwhIKyIa9kQjuc7H24e6/o=
|
gopkg.in/macaroon.v2 v2.1.0/go.mod h1:OUb+TQP/OP0WOerC2Jp/3CwhIKyIa9kQjuc7H24e6/o=
|
||||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||||
|
|
|
||||||
123
syncapi/api/query.go
Normal file
123
syncapi/api/query.go
Normal file
|
|
@ -0,0 +1,123 @@
|
||||||
|
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
commonHTTP "github.com/matrix-org/dendrite/common/http"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
opentracing "github.com/opentracing/opentracing-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
SyncAPIQuerySyncPath = "/api/syncapi/querySync"
|
||||||
|
SyncAPIQueryStatePath = "/api/syncapi/queryState"
|
||||||
|
SyncAPIQueryStateTypePath = "/api/syncapi/queryStateType"
|
||||||
|
SyncAPIQueryMessagesPath = "/api/syncapi/queryMessages"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewSyncQueryAPIHTTP(syncapiURL string, httpClient *http.Client) SyncQueryAPI {
|
||||||
|
if httpClient == nil {
|
||||||
|
httpClient = http.DefaultClient
|
||||||
|
}
|
||||||
|
return &httpSyncQueryAPI{syncapiURL, httpClient}
|
||||||
|
}
|
||||||
|
|
||||||
|
type httpSyncQueryAPI struct {
|
||||||
|
syncapiURL string
|
||||||
|
httpClient *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
type SyncQueryAPI interface {
|
||||||
|
QuerySync(ctx context.Context, request *QuerySyncRequest, response *QuerySyncResponse) error
|
||||||
|
QueryState(ctx context.Context, request *QueryStateRequest, response *QueryStateResponse) error
|
||||||
|
QueryStateType(ctx context.Context, request *QueryStateTypeRequest, response *QueryStateTypeResponse) error
|
||||||
|
QueryMessages(ctx context.Context, request *QueryMessagesRequest, response *QueryMessagesResponse) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type QuerySyncRequest struct{}
|
||||||
|
|
||||||
|
type QueryStateRequest struct {
|
||||||
|
RoomID string
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueryStateTypeRequest struct {
|
||||||
|
RoomID string
|
||||||
|
EventType string
|
||||||
|
StateKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueryMessagesRequest struct {
|
||||||
|
RoomID string
|
||||||
|
}
|
||||||
|
|
||||||
|
type QuerySyncResponse util.JSONResponse
|
||||||
|
type QueryStateResponse util.JSONResponse
|
||||||
|
type QueryStateTypeResponse util.JSONResponse
|
||||||
|
type QueryMessagesResponse util.JSONResponse
|
||||||
|
|
||||||
|
// QueryLatestEventsAndState implements SyncQueryAPI
|
||||||
|
func (h *httpSyncQueryAPI) QuerySync(
|
||||||
|
ctx context.Context,
|
||||||
|
request *QuerySyncRequest,
|
||||||
|
response *QuerySyncResponse,
|
||||||
|
) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "QuerySync")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.syncapiURL + SyncAPIQuerySyncPath
|
||||||
|
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryStateAfterEvents implements SyncQueryAPI
|
||||||
|
func (h *httpSyncQueryAPI) QueryState(
|
||||||
|
ctx context.Context,
|
||||||
|
request *QueryStateRequest,
|
||||||
|
response *QueryStateResponse,
|
||||||
|
) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryState")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.syncapiURL + SyncAPIQueryStatePath
|
||||||
|
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryEventsByID implements SyncQueryAPI
|
||||||
|
func (h *httpSyncQueryAPI) QueryStateType(
|
||||||
|
ctx context.Context,
|
||||||
|
request *QueryStateTypeRequest,
|
||||||
|
response *QueryStateTypeResponse,
|
||||||
|
) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryStateType")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.syncapiURL + SyncAPIQueryStateTypePath
|
||||||
|
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryMembershipForUser implements SyncQueryAPI
|
||||||
|
func (h *httpSyncQueryAPI) QueryMessages(
|
||||||
|
ctx context.Context,
|
||||||
|
request *QueryMessagesRequest,
|
||||||
|
response *QueryMessagesResponse,
|
||||||
|
) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryMessages")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.syncapiURL + SyncAPIQueryMessagesPath
|
||||||
|
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
|
}
|
||||||
44
syncapi/query/query.go
Normal file
44
syncapi/query/query.go
Normal file
|
|
@ -0,0 +1,44 @@
|
||||||
|
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package query
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SyncQueryAPI struct {
|
||||||
|
requestPool sync.RequestPool
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetupHTTP adds the RoomserverQueryAPI handlers to the http.ServeMux.
|
||||||
|
// nolint: gocyclo
|
||||||
|
func (s *SyncQueryAPI) SetupHTTP(servMux *http.ServeMux) {
|
||||||
|
servMux.Handle(
|
||||||
|
api.RoomserverQueryLatestEventsAndStatePath,
|
||||||
|
common.MakeInternalAPI("querySync", func(req *http.Request) util.JSONResponse {
|
||||||
|
var request api.QuerySyncRequest
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
return s.OnIncomingSyncRequest()
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
@ -22,7 +22,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrix"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
@ -45,10 +44,10 @@ func OnIncomingStateRequest(req *http.Request, db storage.Database, roomID strin
|
||||||
// TODO(#287): Auth request and handle the case where the user has left (where
|
// TODO(#287): Auth request and handle the case where the user has left (where
|
||||||
// we should return the state at the poin they left)
|
// we should return the state at the poin they left)
|
||||||
|
|
||||||
stateFilterPart := gomatrix.DefaultFilterPart()
|
stateFilter := gomatrixserverlib.DefaultStateFilter()
|
||||||
// TODO: stateFilterPart should not limit the number of state events (or only limits abusive number of events)
|
// TODO: stateFilter should not limit the number of state events (or only limits abusive number of events)
|
||||||
|
|
||||||
stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID, &stateFilterPart)
|
stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID, &stateFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ import (
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrix"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
const accountDataSchema = `
|
const accountDataSchema = `
|
||||||
|
|
@ -99,7 +99,7 @@ func (s *accountDataStatements) selectAccountDataInRange(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
userID string,
|
userID string,
|
||||||
oldPos, newPos types.StreamPosition,
|
oldPos, newPos types.StreamPosition,
|
||||||
accountDataFilterPart *gomatrix.FilterPart,
|
accountDataEventFilter *gomatrixserverlib.EventFilter,
|
||||||
) (data map[string][]string, err error) {
|
) (data map[string][]string, err error) {
|
||||||
data = make(map[string][]string)
|
data = make(map[string][]string)
|
||||||
|
|
||||||
|
|
@ -111,9 +111,9 @@ func (s *accountDataStatements) selectAccountDataInRange(
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos,
|
rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos,
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.Types)),
|
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.Types)),
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.NotTypes)),
|
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.NotTypes)),
|
||||||
accountDataFilterPart.Limit,
|
accountDataEventFilter.Limit,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ import (
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrix"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -185,16 +184,16 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
|
||||||
// CurrentState returns all the current state events for the given room.
|
// CurrentState returns all the current state events for the given room.
|
||||||
func (s *currentRoomStateStatements) selectCurrentState(
|
func (s *currentRoomStateStatements) selectCurrentState(
|
||||||
ctx context.Context, txn *sql.Tx, roomID string,
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
stateFilterPart *gomatrix.FilterPart,
|
stateFilter *gomatrixserverlib.StateFilter,
|
||||||
) ([]gomatrixserverlib.Event, error) {
|
) ([]gomatrixserverlib.Event, error) {
|
||||||
stmt := common.TxStmt(txn, s.selectCurrentStateStmt)
|
stmt := common.TxStmt(txn, s.selectCurrentStateStmt)
|
||||||
rows, err := stmt.QueryContext(ctx, roomID,
|
rows, err := stmt.QueryContext(ctx, roomID,
|
||||||
pq.StringArray(stateFilterPart.Senders),
|
pq.StringArray(stateFilter.Senders),
|
||||||
pq.StringArray(stateFilterPart.NotSenders),
|
pq.StringArray(stateFilter.NotSenders),
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
|
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
|
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)),
|
||||||
stateFilterPart.ContainsURL,
|
stateFilter.ContainsURL,
|
||||||
stateFilterPart.Limit,
|
stateFilter.Limit,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrix"
|
|
||||||
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
|
@ -154,18 +153,18 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
||||||
// two positions, only the most recent state is returned.
|
// two positions, only the most recent state is returned.
|
||||||
func (s *outputRoomEventsStatements) selectStateInRange(
|
func (s *outputRoomEventsStatements) selectStateInRange(
|
||||||
ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
|
ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
|
||||||
stateFilterPart *gomatrix.FilterPart,
|
stateFilter *gomatrixserverlib.StateFilter,
|
||||||
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
|
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
|
||||||
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
|
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
|
||||||
|
|
||||||
rows, err := stmt.QueryContext(
|
rows, err := stmt.QueryContext(
|
||||||
ctx, oldPos, newPos,
|
ctx, oldPos, newPos,
|
||||||
pq.StringArray(stateFilterPart.Senders),
|
pq.StringArray(stateFilter.Senders),
|
||||||
pq.StringArray(stateFilterPart.NotSenders),
|
pq.StringArray(stateFilter.NotSenders),
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
|
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
|
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)),
|
||||||
stateFilterPart.ContainsURL,
|
stateFilter.ContainsURL,
|
||||||
stateFilterPart.Limit,
|
stateFilter.Limit,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,6 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrix"
|
|
||||||
|
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
|
|
@ -237,10 +236,10 @@ func (d *SyncServerDatasource) GetStateEvent(
|
||||||
// Returns an empty slice if no state events could be found for this room.
|
// Returns an empty slice if no state events could be found for this room.
|
||||||
// Returns an error if there was an issue with the retrieval.
|
// Returns an error if there was an issue with the retrieval.
|
||||||
func (d *SyncServerDatasource) GetStateEventsForRoom(
|
func (d *SyncServerDatasource) GetStateEventsForRoom(
|
||||||
ctx context.Context, roomID string, stateFilterPart *gomatrix.FilterPart,
|
ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter,
|
||||||
) (stateEvents []gomatrixserverlib.Event, err error) {
|
) (stateEvents []gomatrixserverlib.Event, err error) {
|
||||||
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart)
|
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
|
|
@ -422,7 +421,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
|
||||||
var succeeded bool
|
var succeeded bool
|
||||||
defer common.EndTransaction(txn, &succeeded)
|
defer common.EndTransaction(txn, &succeeded)
|
||||||
|
|
||||||
stateFilterPart := gomatrix.DefaultFilterPart() // TODO: use filter provided in request
|
stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
|
||||||
|
|
||||||
// Work out which rooms to return in the response. This is done by getting not only the currently
|
// Work out which rooms to return in the response. This is done by getting not only the currently
|
||||||
// joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
|
// joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
|
||||||
|
|
@ -432,11 +431,11 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
|
||||||
var joinedRoomIDs []string
|
var joinedRoomIDs []string
|
||||||
if !wantFullState {
|
if !wantFullState {
|
||||||
deltas, joinedRoomIDs, err = d.getStateDeltas(
|
deltas, joinedRoomIDs, err = d.getStateDeltas(
|
||||||
ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart,
|
ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
|
deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
|
||||||
ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart,
|
ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -587,12 +586,12 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
stateFilterPart := gomatrix.DefaultFilterPart() // TODO: use filter provided in request
|
stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
|
||||||
|
|
||||||
// Build up a /sync response. Add joined rooms.
|
// Build up a /sync response. Add joined rooms.
|
||||||
for _, roomID := range joinedRoomIDs {
|
for _, roomID := range joinedRoomIDs {
|
||||||
var stateEvents []gomatrixserverlib.Event
|
var stateEvents []gomatrixserverlib.Event
|
||||||
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart)
|
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -681,7 +680,7 @@ var txReadOnlySnapshot = sql.TxOptions{
|
||||||
// If there was an issue with the retrieval, returns an error
|
// If there was an issue with the retrieval, returns an error
|
||||||
func (d *SyncServerDatasource) GetAccountDataInRange(
|
func (d *SyncServerDatasource) GetAccountDataInRange(
|
||||||
ctx context.Context, userID string, oldPos, newPos types.StreamPosition,
|
ctx context.Context, userID string, oldPos, newPos types.StreamPosition,
|
||||||
accountDataFilterPart *gomatrix.FilterPart,
|
accountDataFilterPart *gomatrixserverlib.EventFilter,
|
||||||
) (map[string][]string, error) {
|
) (map[string][]string, error) {
|
||||||
return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart)
|
return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart)
|
||||||
}
|
}
|
||||||
|
|
@ -931,7 +930,7 @@ func (d *SyncServerDatasource) fetchMissingStateEvents(
|
||||||
func (d *SyncServerDatasource) getStateDeltas(
|
func (d *SyncServerDatasource) getStateDeltas(
|
||||||
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
||||||
fromPos, toPos types.StreamPosition, userID string,
|
fromPos, toPos types.StreamPosition, userID string,
|
||||||
stateFilterPart *gomatrix.FilterPart,
|
stateFilter *gomatrixserverlib.StateFilter,
|
||||||
) ([]stateDelta, []string, error) {
|
) ([]stateDelta, []string, error) {
|
||||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||||
// - Get membership list changes for this user in this sync response
|
// - Get membership list changes for this user in this sync response
|
||||||
|
|
@ -944,7 +943,7 @@ func (d *SyncServerDatasource) getStateDeltas(
|
||||||
var deltas []stateDelta
|
var deltas []stateDelta
|
||||||
|
|
||||||
// get all the state events ever between these two positions
|
// get all the state events ever between these two positions
|
||||||
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
|
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -964,7 +963,7 @@ func (d *SyncServerDatasource) getStateDeltas(
|
||||||
if membership == gomatrixserverlib.Join {
|
if membership == gomatrixserverlib.Join {
|
||||||
// send full room state down instead of a delta
|
// send full room state down instead of a delta
|
||||||
var s []types.StreamEvent
|
var s []types.StreamEvent
|
||||||
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilterPart)
|
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -1006,7 +1005,7 @@ func (d *SyncServerDatasource) getStateDeltas(
|
||||||
func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
||||||
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
||||||
fromPos, toPos types.StreamPosition, userID string,
|
fromPos, toPos types.StreamPosition, userID string,
|
||||||
stateFilterPart *gomatrix.FilterPart,
|
stateFilter *gomatrixserverlib.StateFilter,
|
||||||
) ([]stateDelta, []string, error) {
|
) ([]stateDelta, []string, error) {
|
||||||
joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
|
joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -1018,7 +1017,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
||||||
|
|
||||||
// Add full states for all joined rooms
|
// Add full states for all joined rooms
|
||||||
for _, joinedRoomID := range joinedRoomIDs {
|
for _, joinedRoomID := range joinedRoomIDs {
|
||||||
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilterPart)
|
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilter)
|
||||||
if stateErr != nil {
|
if stateErr != nil {
|
||||||
return nil, nil, stateErr
|
return nil, nil, stateErr
|
||||||
}
|
}
|
||||||
|
|
@ -1030,7 +1029,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get all the state events ever between these two positions
|
// Get all the state events ever between these two positions
|
||||||
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
|
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -1061,9 +1060,9 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
||||||
|
|
||||||
func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
|
func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
|
||||||
ctx context.Context, txn *sql.Tx, roomID string,
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
stateFilterPart *gomatrix.FilterPart,
|
stateFilter *gomatrixserverlib.StateFilter,
|
||||||
) ([]types.StreamEvent, error) {
|
) ([]types.StreamEvent, error) {
|
||||||
allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart)
|
allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
|
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/dendrite/typingserver/cache"
|
"github.com/matrix-org/dendrite/typingserver/cache"
|
||||||
"github.com/matrix-org/gomatrix"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -35,11 +34,11 @@ type Database interface {
|
||||||
Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error)
|
Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error)
|
||||||
WriteEvent(context.Context, *gomatrixserverlib.Event, []gomatrixserverlib.Event, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error)
|
WriteEvent(context.Context, *gomatrixserverlib.Event, []gomatrixserverlib.Event, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error)
|
||||||
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.Event, error)
|
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.Event, error)
|
||||||
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrix.FilterPart) (stateEvents []gomatrixserverlib.Event, err error)
|
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.Event, err error)
|
||||||
SyncPosition(ctx context.Context) (types.PaginationToken, error)
|
SyncPosition(ctx context.Context) (types.PaginationToken, error)
|
||||||
IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.PaginationToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error)
|
IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.PaginationToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error)
|
||||||
CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error)
|
CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error)
|
||||||
GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrix.FilterPart) (map[string][]string, error)
|
GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error)
|
||||||
UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (types.StreamPosition, error)
|
UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (types.StreamPosition, error)
|
||||||
AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.Event) (types.StreamPosition, error)
|
AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.Event) (types.StreamPosition, error)
|
||||||
RetireInviteEvent(ctx context.Context, inviteEventID string) error
|
RetireInviteEvent(ctx context.Context, inviteEventID string) error
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrix"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
@ -142,14 +141,14 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Pagin
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
accountDataFilter := gomatrix.DefaultFilterPart() // TODO: use filter provided in req instead
|
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
|
||||||
res, err = rp.appendAccountData(res, req.device.UserID, req, int64(latestPos.PDUPosition), &accountDataFilter)
|
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition, &accountDataFilter)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) appendAccountData(
|
func (rp *RequestPool) appendAccountData(
|
||||||
data *types.Response, userID string, req syncRequest, currentPos int64,
|
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
|
||||||
accountDataFilter *gomatrix.FilterPart,
|
accountDataFilter *gomatrixserverlib.EventFilter,
|
||||||
) (*types.Response, error) {
|
) (*types.Response, error) {
|
||||||
// TODO: Account data doesn't have a sync position of its own, meaning that
|
// TODO: Account data doesn't have a sync position of its own, meaning that
|
||||||
// account data might be sent multiple time to the client if multiple account
|
// account data might be sent multiple time to the client if multiple account
|
||||||
|
|
|
||||||
|
|
@ -81,6 +81,7 @@ Can't forget room you're still in
|
||||||
Can get rooms/{roomId}/members
|
Can get rooms/{roomId}/members
|
||||||
Can create filter
|
Can create filter
|
||||||
Can download filter
|
Can download filter
|
||||||
|
Lazy loading parameters in the filter are strictly boolean
|
||||||
Can sync
|
Can sync
|
||||||
Can sync a joined room
|
Can sync a joined room
|
||||||
Newly joined room is included in an incremental sync
|
Newly joined room is included in an incremental sync
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue