Remove QueryBulkStateContent from current state server

Expected fail due to db impl not existing
This commit is contained in:
Kegan Dougal 2020-09-07 10:23:57 +01:00
parent b9caccbce8
commit 82cf4614d0
14 changed files with 34 additions and 235 deletions

View file

@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/api" "github.com/matrix-org/dendrite/clientapi/api"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -51,7 +50,7 @@ type filter struct {
// GetPostPublicRooms implements GET and POST /publicRooms // GetPostPublicRooms implements GET and POST /publicRooms
func GetPostPublicRooms( func GetPostPublicRooms(
req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider, extRoomsProvider api.ExtraPublicRoomsProvider,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
cfg *config.ClientAPI, cfg *config.ClientAPI,
@ -75,7 +74,7 @@ func GetPostPublicRooms(
} }
} }
response, err := publicRooms(req.Context(), request, rsAPI, stateAPI, extRoomsProvider) response, err := publicRooms(req.Context(), request, rsAPI, extRoomsProvider)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Errorf("failed to work out public rooms") util.GetLogger(req.Context()).WithError(err).Errorf("failed to work out public rooms")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
@ -86,8 +85,8 @@ func GetPostPublicRooms(
} }
} }
func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, func publicRooms(
stateAPI currentstateAPI.CurrentStateInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
) (*gomatrixserverlib.RespPublicRooms, error) { ) (*gomatrixserverlib.RespPublicRooms, error) {
response := gomatrixserverlib.RespPublicRooms{ response := gomatrixserverlib.RespPublicRooms{
@ -110,7 +109,7 @@ func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI
var rooms []gomatrixserverlib.PublicRoom var rooms []gomatrixserverlib.PublicRoom
if request.Since == "" { if request.Since == "" {
rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider, stateAPI) rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider)
} else { } else {
rooms = getPublicRoomsFromCache() rooms = getPublicRoomsFromCache()
} }
@ -226,7 +225,6 @@ func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (
func refreshPublicRoomCache( func refreshPublicRoomCache(
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
stateAPI currentstateAPI.CurrentStateInternalAPI,
) []gomatrixserverlib.PublicRoom { ) []gomatrixserverlib.PublicRoom {
cacheMu.Lock() cacheMu.Lock()
defer cacheMu.Unlock() defer cacheMu.Unlock()
@ -241,7 +239,7 @@ func refreshPublicRoomCache(
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
return publicRoomsCache return publicRoomsCache
} }
pubRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, stateAPI) pubRooms, err := roomserverAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, rsAPI)
if err != nil { if err != nil {
util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed") util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed")
return publicRoomsCache return publicRoomsCache

View file

@ -336,7 +336,7 @@ func Setup(
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/publicRooms", r0mux.Handle("/publicRooms",
httputil.MakeExternalAPI("public_rooms", func(req *http.Request) util.JSONResponse { httputil.MakeExternalAPI("public_rooms", func(req *http.Request) util.JSONResponse {
return GetPostPublicRooms(req, rsAPI, stateAPI, extRoomsProvider, federation, cfg) return GetPostPublicRooms(req, rsAPI, extRoomsProvider, federation, cfg)
}), }),
).Methods(http.MethodGet, http.MethodPost, http.MethodOptions) ).Methods(http.MethodGet, http.MethodPost, http.MethodOptions)

View file

@ -106,7 +106,7 @@ func (p *publicRoomsProvider) AdvertiseRooms() error {
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
return err return err
} }
ourRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.stateAPI) ourRooms, err := roomserverAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.rsAPI)
if err != nil { if err != nil {
util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed") util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed")
return err return err

View file

@ -14,37 +14,5 @@
package api package api
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
)
type CurrentStateInternalAPI interface { type CurrentStateInternalAPI interface {
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error
}
type QueryBulkStateContentRequest struct {
// Returns state events in these rooms
RoomIDs []string
// If true, treats the '*' StateKey as "all state events of this type" rather than a literal value of '*'
AllowWildcards bool
// The state events to return. Only a small subset of tuples are allowed in this request as only certain events
// have their content fields extracted. Specifically, the tuple Type must be one of:
// m.room.avatar
// m.room.create
// m.room.canonical_alias
// m.room.guest_access
// m.room.history_visibility
// m.room.join_rules
// m.room.member
// m.room.name
// m.room.topic
// Any other tuple type will result in the query failing.
StateTuples []gomatrixserverlib.StateKeyTuple
}
type QueryBulkStateContentResponse struct {
// map of room ID -> tuple -> content_value
Rooms map[string]map[gomatrixserverlib.StateKeyTuple]string
} }

View file

@ -1,89 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// PopulatePublicRooms extracts PublicRoom information for all the provided room IDs. The IDs are not checked to see if they are visible in the
// published room directory.
// due to lots of switches
// nolint:gocyclo
func PopulatePublicRooms(ctx context.Context, roomIDs []string, stateAPI CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}
canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""}
topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""}
guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""}
visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}
joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}
var stateRes QueryBulkStateContentResponse
err := stateAPI.QueryBulkStateContent(ctx, &QueryBulkStateContentRequest{
RoomIDs: roomIDs,
AllowWildcards: true,
StateTuples: []gomatrixserverlib.StateKeyTuple{
nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple,
{EventType: gomatrixserverlib.MRoomMember, StateKey: "*"},
},
}, &stateRes)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed")
return nil, err
}
chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs))
i := 0
for roomID, data := range stateRes.Rooms {
pub := gomatrixserverlib.PublicRoom{
RoomID: roomID,
}
joinCount := 0
var joinRule, guestAccess string
for tuple, contentVal := range data {
if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" {
joinCount++
continue
}
switch tuple {
case avatarTuple:
pub.AvatarURL = contentVal
case nameTuple:
pub.Name = contentVal
case topicTuple:
pub.Topic = contentVal
case canonicalTuple:
pub.CanonicalAlias = contentVal
case visibilityTuple:
pub.WorldReadable = contentVal == "world_readable"
// need both of these to determine whether guests can join
case joinRuleTuple:
joinRule = contentVal
case guestTuple:
guestAccess = contentVal
}
}
if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" {
pub.GuestCanJoin = true
}
pub.JoinedMembersCount = joinCount
chunk[i] = pub
i++
}
return chunk, nil
}

View file

@ -15,33 +15,9 @@
package internal package internal
import ( import (
"context"
"github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/currentstateserver/storage" "github.com/matrix-org/dendrite/currentstateserver/storage"
"github.com/matrix-org/gomatrixserverlib"
) )
type CurrentStateInternalAPI struct { type CurrentStateInternalAPI struct {
DB storage.Database DB storage.Database
} }
func (a *CurrentStateInternalAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error {
events, err := a.DB.GetBulkStateContent(ctx, req.RoomIDs, req.StateTuples, req.AllowWildcards)
if err != nil {
return err
}
res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
for _, ev := range events {
if res.Rooms[ev.RoomID] == nil {
res.Rooms[ev.RoomID] = make(map[gomatrixserverlib.StateKeyTuple]string)
}
room := res.Rooms[ev.RoomID]
room[gomatrixserverlib.StateKeyTuple{
EventType: ev.EventType,
StateKey: ev.StateKey,
}] = ev.ContentValue
res.Rooms[ev.RoomID] = room
}
return nil
}

View file

@ -15,13 +15,10 @@
package inthttp package inthttp
import ( import (
"context"
"errors" "errors"
"net/http" "net/http"
"github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/opentracing/opentracing-go"
) )
// HTTP paths for the internal HTTP APIs // HTTP paths for the internal HTTP APIs
@ -49,15 +46,3 @@ type httpCurrentStateInternalAPI struct {
apiURL string apiURL string
httpClient *http.Client httpClient *http.Client
} }
func (h *httpCurrentStateInternalAPI) QueryBulkStateContent(
ctx context.Context,
request *api.QueryBulkStateContentRequest,
response *api.QueryBulkStateContentResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBulkStateContent")
defer span.Finish()
apiURL := h.apiURL + QueryBulkStateContentPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -15,27 +15,10 @@
package inthttp package inthttp
import ( import (
"encoding/json"
"net/http"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/util"
) )
func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) { func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) {
internalAPIMux.Handle(QueryBulkStateContentPath,
httputil.MakeInternalAPI("queryBulkStateContent", func(req *http.Request) util.JSONResponse {
request := api.QueryBulkStateContentRequest{}
response := api.QueryBulkStateContentResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := intAPI.QueryBulkStateContent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
} }

View file

@ -7,7 +7,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -24,7 +23,7 @@ type filter struct {
} }
// GetPostPublicRooms implements GET and POST /publicRooms // GetPostPublicRooms implements GET and POST /publicRooms
func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI) util.JSONResponse { func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI) util.JSONResponse {
var request PublicRoomReq var request PublicRoomReq
if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil { if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil {
return *fillErr return *fillErr
@ -32,7 +31,7 @@ func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInterna
if request.Limit == 0 { if request.Limit == 0 {
request.Limit = 50 request.Limit = 50
} }
response, err := publicRooms(req.Context(), request, rsAPI, stateAPI) response, err := publicRooms(req.Context(), request, rsAPI)
if err != nil { if err != nil {
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
@ -42,8 +41,9 @@ func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInterna
} }
} }
func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, func publicRooms(
stateAPI currentstateAPI.CurrentStateInternalAPI) (*gomatrixserverlib.RespPublicRooms, error) { ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI,
) (*gomatrixserverlib.RespPublicRooms, error) {
var response gomatrixserverlib.RespPublicRooms var response gomatrixserverlib.RespPublicRooms
var limit int16 var limit int16
@ -80,7 +80,7 @@ func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI
nextIndex = len(queryRes.RoomIDs) nextIndex = len(queryRes.RoomIDs)
} }
roomIDs := queryRes.RoomIDs[offset:nextIndex] roomIDs := queryRes.RoomIDs[offset:nextIndex]
response.Chunk, err = fillInRooms(ctx, roomIDs, stateAPI) response.Chunk, err = fillInRooms(ctx, roomIDs, rsAPI)
return &response, err return &response, err
} }
@ -112,7 +112,7 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO
// due to lots of switches // due to lots of switches
// nolint:gocyclo // nolint:gocyclo
func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI.CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { func fillInRooms(ctx context.Context, roomIDs []string, rsAPI roomserverAPI.RoomserverInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""} avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""} nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}
canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""} canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""}
@ -121,8 +121,8 @@ func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI
visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""} visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}
joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""} joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}
var stateRes currentstateAPI.QueryBulkStateContentResponse var stateRes roomserverAPI.QueryBulkStateContentResponse
err := stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{ err := rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{
RoomIDs: roomIDs, RoomIDs: roomIDs,
AllowWildcards: true, AllowWildcards: true,
StateTuples: []gomatrixserverlib.StateKeyTuple{ StateTuples: []gomatrixserverlib.StateKeyTuple{

View file

@ -390,7 +390,7 @@ func Setup(
v1fedmux.Handle("/publicRooms", v1fedmux.Handle("/publicRooms",
httputil.MakeExternalAPI("federation_public_rooms", func(req *http.Request) util.JSONResponse { httputil.MakeExternalAPI("federation_public_rooms", func(req *http.Request) util.JSONResponse {
return GetPostPublicRooms(req, rsAPI, stateAPI) return GetPostPublicRooms(req, rsAPI)
}), }),
).Methods(http.MethodGet) ).Methods(http.MethodGet)

View file

@ -133,7 +133,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are now sharing rooms with which we previously were not and notify them about the joining // work out who we are now sharing rooms with which we previously were not and notify them about the joining
// users keys: // users keys:
changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
if err != nil { if err != nil {
log.WithError(err).Error("OnJoinEvent: failed to work out changed users") log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
return return
@ -146,7 +146,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are no longer sharing any rooms with and notify them about the leaving user // work out who we are no longer sharing any rooms with and notify them about the leaving user
_, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
if err != nil { if err != nil {
log.WithError(err).Error("OnLeaveEvent: failed to work out left users") log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
return return

View file

@ -19,7 +19,6 @@ import (
"strings" "strings"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -50,7 +49,6 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// nolint:gocyclo // nolint:gocyclo
func DeviceListCatchup( func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
userID string, res *types.Response, from, to types.StreamingToken, userID string, res *types.Response, from, to types.StreamingToken,
) (hasNew bool, err error) { ) (hasNew bool, err error) {
@ -58,7 +56,7 @@ func DeviceListCatchup(
newlyJoinedRooms := joinedRooms(res, userID) newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res) newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := TrackChangedUsers(ctx, rsAPI, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms) changed, left, err := TrackChangedUsers(ctx, rsAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -144,7 +142,7 @@ func DeviceListCatchup(
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. // TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
// nolint:gocyclo // nolint:gocyclo
func TrackChangedUsers( func TrackChangedUsers(
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
) (changed, left []string, err error) { ) (changed, left []string, err error) {
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
@ -161,8 +159,8 @@ func TrackChangedUsers(
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
var stateRes currentstateAPI.QueryBulkStateContentResponse var stateRes roomserverAPI.QueryBulkStateContentResponse
err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{ err = rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{
RoomIDs: newlyLeftRooms, RoomIDs: newlyLeftRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{ StateTuples: []gomatrixserverlib.StateKeyTuple{
{ {
@ -202,7 +200,7 @@ func TrackChangedUsers(
if err != nil { if err != nil {
return nil, left, err return nil, left, err
} }
err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{ err = rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{
RoomIDs: newlyJoinedRooms, RoomIDs: newlyJoinedRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{ StateTuples: []gomatrixserverlib.StateKeyTuple{
{ {

View file

@ -7,7 +7,6 @@ import (
"testing" "testing"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
@ -108,25 +107,6 @@ func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.Query
return nil return nil
} }
type mockStateAPI struct {
rsAPI *mockRoomserverAPI
}
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
func (s *mockStateAPI) QueryBulkStateContent(ctx context.Context, req *stateapi.QueryBulkStateContentRequest, res *stateapi.QueryBulkStateContentResponse) error {
var res2 api.QueryBulkStateContentResponse
err := s.rsAPI.QueryBulkStateContent(ctx, &api.QueryBulkStateContentRequest{
RoomIDs: req.RoomIDs,
AllowWildcards: req.AllowWildcards,
StateTuples: req.StateTuples,
}, &res2)
if err != nil {
return err
}
res.Rooms = res2.Rooms
return nil
}
type wantCatchup struct { type wantCatchup struct {
hasNew bool hasNew bool
changed []string changed []string
@ -200,7 +180,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
} }
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil { if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
@ -223,7 +203,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
} }
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil { if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
@ -246,7 +226,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser},
}, },
} }
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("Catchup returned an error: %s", err)
} }
@ -268,7 +248,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser},
}, },
} }
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil { if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
@ -327,7 +307,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
roomID: {syncingUser, existingUser}, roomID: {syncingUser, existingUser},
}, },
} }
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil { if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
@ -355,7 +335,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
} }
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("Catchup returned an error: %s", err)
} }
@ -441,7 +421,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
}, },
} }
hasNew, err := DeviceListCatchup( hasNew, err := DeviceListCatchup(
context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken, context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken,
) )
if err != nil { if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)

View file

@ -267,7 +267,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
func (rp *RequestPool) appendDeviceLists( func (rp *RequestPool) appendDeviceLists(
data *types.Response, userID string, since, to types.StreamingToken, data *types.Response, userID string, since, to types.StreamingToken,
) (*types.Response, error) { ) (*types.Response, error) {
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, rp.stateAPI, userID, data, since, to) _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, userID, data, since, to)
if err != nil { if err != nil {
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err) return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
} }