Add Queryer and use embedded structs

This commit is contained in:
Kegan Dougal 2020-09-02 15:13:43 +01:00
parent e473320e73
commit a6a704ee26
8 changed files with 108 additions and 51 deletions

View file

@ -10,12 +10,19 @@ import (
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/perform" "github.com/matrix-org/dendrite/roomserver/internal/perform"
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
// RoomserverInternalAPI is an implementation of api.RoomserverInternalAPI // RoomserverInternalAPI is an implementation of api.RoomserverInternalAPI
type RoomserverInternalAPI struct { type RoomserverInternalAPI struct {
*query.Queryer
*perform.Inviter
*perform.Joiner
*perform.Leaver
*perform.Publisher
*perform.Backfiller
DB storage.Database DB storage.Database
Cfg *config.RoomServer Cfg *config.RoomServer
Producer sarama.SyncProducer Producer sarama.SyncProducer
@ -24,11 +31,7 @@ type RoomserverInternalAPI struct {
KeyRing gomatrixserverlib.JSONVerifier KeyRing gomatrixserverlib.JSONVerifier
FedClient *gomatrixserverlib.FederationClient FedClient *gomatrixserverlib.FederationClient
OutputRoomEventTopic string // Kafka topic for new output room events OutputRoomEventTopic string // Kafka topic for new output room events
Inviter *perform.Inviter
Joiner *perform.Joiner
Leaver *perform.Leaver
Publisher *perform.Publisher
Backfiller *perform.Backfiller
mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent
fsAPI fsAPI.FederationSenderInternalAPI fsAPI fsAPI.FederationSenderInternalAPI
} }
@ -47,6 +50,10 @@ func NewRoomserverAPI(
KeyRing: keyRing, KeyRing: keyRing,
FedClient: fedClient, FedClient: fedClient,
OutputRoomEventTopic: outputRoomEventTopic, OutputRoomEventTopic: outputRoomEventTopic,
Queryer: &query.Queryer{
DB: roomserverDB,
Cache: caches,
},
// perform-er structs get initialised when we have a federation sender to use // perform-er structs get initialised when we have a federation sender to use
} }
return a return a
@ -103,14 +110,6 @@ func (r *RoomserverInternalAPI) PerformInvite(
return r.WriteOutputEvents(req.Event.RoomID(), outputEvents) return r.WriteOutputEvents(req.Event.RoomID(), outputEvents)
} }
func (r *RoomserverInternalAPI) PerformJoin(
ctx context.Context,
req *api.PerformJoinRequest,
res *api.PerformJoinResponse,
) {
r.Joiner.PerformJoin(ctx, req, res)
}
func (r *RoomserverInternalAPI) PerformLeave( func (r *RoomserverInternalAPI) PerformLeave(
ctx context.Context, ctx context.Context,
req *api.PerformLeaveRequest, req *api.PerformLeaveRequest,
@ -125,20 +124,3 @@ func (r *RoomserverInternalAPI) PerformLeave(
} }
return r.WriteOutputEvents(req.RoomID, outputEvents) return r.WriteOutputEvents(req.RoomID, outputEvents)
} }
func (r *RoomserverInternalAPI) PerformPublish(
ctx context.Context,
req *api.PerformPublishRequest,
res *api.PerformPublishResponse,
) {
r.Publisher.PerformPublish(ctx, req, res)
}
// Query a given amount (or less) of events prior to a given set of events.
func (r *RoomserverInternalAPI) PerformBackfill(
ctx context.Context,
request *api.PerformBackfillRequest,
response *api.PerformBackfillResponse,
) error {
return r.Backfiller.PerformBackfill(ctx, request, response)
}

View file

@ -1,3 +1,17 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package perform package perform
import ( import (

View file

@ -1,3 +1,17 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package perform package perform
import ( import (

View file

@ -1,3 +1,17 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package perform package perform
import ( import (

View file

@ -1,3 +1,17 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package perform package perform
import ( import (

View file

@ -1,3 +1,17 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package perform package perform
import ( import (

View file

@ -1,6 +1,4 @@
// Copyright 2017 Vector Creations Ltd // Copyright 2020 The Matrix.org Foundation C.I.C.
// Copyright 2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -14,15 +12,17 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package internal package query
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/internal/helpers"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -30,8 +30,13 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
type Queryer struct {
DB storage.Database
Cache caching.RoomServerCaches
}
// QueryLatestEventsAndState implements api.RoomserverInternalAPI // QueryLatestEventsAndState implements api.RoomserverInternalAPI
func (r *RoomserverInternalAPI) QueryLatestEventsAndState( func (r *Queryer) QueryLatestEventsAndState(
ctx context.Context, ctx context.Context,
request *api.QueryLatestEventsAndStateRequest, request *api.QueryLatestEventsAndStateRequest,
response *api.QueryLatestEventsAndStateResponse, response *api.QueryLatestEventsAndStateResponse,
@ -85,7 +90,7 @@ func (r *RoomserverInternalAPI) QueryLatestEventsAndState(
} }
// QueryStateAfterEvents implements api.RoomserverInternalAPI // QueryStateAfterEvents implements api.RoomserverInternalAPI
func (r *RoomserverInternalAPI) QueryStateAfterEvents( func (r *Queryer) QueryStateAfterEvents(
ctx context.Context, ctx context.Context,
request *api.QueryStateAfterEventsRequest, request *api.QueryStateAfterEventsRequest,
response *api.QueryStateAfterEventsResponse, response *api.QueryStateAfterEventsResponse,
@ -134,7 +139,7 @@ func (r *RoomserverInternalAPI) QueryStateAfterEvents(
} }
// QueryEventsByID implements api.RoomserverInternalAPI // QueryEventsByID implements api.RoomserverInternalAPI
func (r *RoomserverInternalAPI) QueryEventsByID( func (r *Queryer) QueryEventsByID(
ctx context.Context, ctx context.Context,
request *api.QueryEventsByIDRequest, request *api.QueryEventsByIDRequest,
response *api.QueryEventsByIDResponse, response *api.QueryEventsByIDResponse,
@ -167,7 +172,7 @@ func (r *RoomserverInternalAPI) QueryEventsByID(
} }
// QueryMembershipForUser implements api.RoomserverInternalAPI // QueryMembershipForUser implements api.RoomserverInternalAPI
func (r *RoomserverInternalAPI) QueryMembershipForUser( func (r *Queryer) QueryMembershipForUser(
ctx context.Context, ctx context.Context,
request *api.QueryMembershipForUserRequest, request *api.QueryMembershipForUserRequest,
response *api.QueryMembershipForUserResponse, response *api.QueryMembershipForUserResponse,
@ -204,7 +209,7 @@ func (r *RoomserverInternalAPI) QueryMembershipForUser(
} }
// QueryMembershipsForRoom implements api.RoomserverInternalAPI // QueryMembershipsForRoom implements api.RoomserverInternalAPI
func (r *RoomserverInternalAPI) QueryMembershipsForRoom( func (r *Queryer) QueryMembershipsForRoom(
ctx context.Context, ctx context.Context,
request *api.QueryMembershipsForRoomRequest, request *api.QueryMembershipsForRoomRequest,
response *api.QueryMembershipsForRoomResponse, response *api.QueryMembershipsForRoomResponse,
@ -260,7 +265,7 @@ func (r *RoomserverInternalAPI) QueryMembershipsForRoom(
} }
// QueryServerAllowedToSeeEvent implements api.RoomserverInternalAPI // QueryServerAllowedToSeeEvent implements api.RoomserverInternalAPI
func (r *RoomserverInternalAPI) QueryServerAllowedToSeeEvent( func (r *Queryer) QueryServerAllowedToSeeEvent(
ctx context.Context, ctx context.Context,
request *api.QueryServerAllowedToSeeEventRequest, request *api.QueryServerAllowedToSeeEventRequest,
response *api.QueryServerAllowedToSeeEventResponse, response *api.QueryServerAllowedToSeeEventResponse,
@ -293,7 +298,7 @@ func (r *RoomserverInternalAPI) QueryServerAllowedToSeeEvent(
// QueryMissingEvents implements api.RoomserverInternalAPI // QueryMissingEvents implements api.RoomserverInternalAPI
// nolint:gocyclo // nolint:gocyclo
func (r *RoomserverInternalAPI) QueryMissingEvents( func (r *Queryer) QueryMissingEvents(
ctx context.Context, ctx context.Context,
request *api.QueryMissingEventsRequest, request *api.QueryMissingEventsRequest,
response *api.QueryMissingEventsResponse, response *api.QueryMissingEventsResponse,
@ -352,7 +357,7 @@ func (r *RoomserverInternalAPI) QueryMissingEvents(
} }
// QueryStateAndAuthChain implements api.RoomserverInternalAPI // QueryStateAndAuthChain implements api.RoomserverInternalAPI
func (r *RoomserverInternalAPI) QueryStateAndAuthChain( func (r *Queryer) QueryStateAndAuthChain(
ctx context.Context, ctx context.Context,
request *api.QueryStateAndAuthChainRequest, request *api.QueryStateAndAuthChainRequest,
response *api.QueryStateAndAuthChainResponse, response *api.QueryStateAndAuthChainResponse,
@ -405,7 +410,7 @@ func (r *RoomserverInternalAPI) QueryStateAndAuthChain(
return err return err
} }
func (r *RoomserverInternalAPI) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomInfo, eventIDs []string) ([]gomatrixserverlib.Event, error) { func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomInfo, eventIDs []string) ([]gomatrixserverlib.Event, error) {
roomState := state.NewStateResolution(r.DB, roomInfo) roomState := state.NewStateResolution(r.DB, roomInfo)
prevStates, err := r.DB.StateAtEventIDs(ctx, eventIDs) prevStates, err := r.DB.StateAtEventIDs(ctx, eventIDs)
if err != nil { if err != nil {
@ -482,7 +487,7 @@ func getAuthChain(
} }
// QueryRoomVersionCapabilities implements api.RoomserverInternalAPI // QueryRoomVersionCapabilities implements api.RoomserverInternalAPI
func (r *RoomserverInternalAPI) QueryRoomVersionCapabilities( func (r *Queryer) QueryRoomVersionCapabilities(
ctx context.Context, ctx context.Context,
request *api.QueryRoomVersionCapabilitiesRequest, request *api.QueryRoomVersionCapabilitiesRequest,
response *api.QueryRoomVersionCapabilitiesResponse, response *api.QueryRoomVersionCapabilitiesResponse,
@ -500,7 +505,7 @@ func (r *RoomserverInternalAPI) QueryRoomVersionCapabilities(
} }
// QueryRoomVersionCapabilities implements api.RoomserverInternalAPI // QueryRoomVersionCapabilities implements api.RoomserverInternalAPI
func (r *RoomserverInternalAPI) QueryRoomVersionForRoom( func (r *Queryer) QueryRoomVersionForRoom(
ctx context.Context, ctx context.Context,
request *api.QueryRoomVersionForRoomRequest, request *api.QueryRoomVersionForRoomRequest,
response *api.QueryRoomVersionForRoomResponse, response *api.QueryRoomVersionForRoomResponse,
@ -522,7 +527,7 @@ func (r *RoomserverInternalAPI) QueryRoomVersionForRoom(
return nil return nil
} }
func (r *RoomserverInternalAPI) roomVersion(roomID string) (gomatrixserverlib.RoomVersion, error) { func (r *Queryer) roomVersion(roomID string) (gomatrixserverlib.RoomVersion, error) {
var res api.QueryRoomVersionForRoomResponse var res api.QueryRoomVersionForRoomResponse
err := r.QueryRoomVersionForRoom(context.Background(), &api.QueryRoomVersionForRoomRequest{ err := r.QueryRoomVersionForRoom(context.Background(), &api.QueryRoomVersionForRoomRequest{
RoomID: roomID, RoomID: roomID,
@ -530,7 +535,7 @@ func (r *RoomserverInternalAPI) roomVersion(roomID string) (gomatrixserverlib.Ro
return res.RoomVersion, err return res.RoomVersion, err
} }
func (r *RoomserverInternalAPI) QueryPublishedRooms( func (r *Queryer) QueryPublishedRooms(
ctx context.Context, ctx context.Context,
req *api.QueryPublishedRoomsRequest, req *api.QueryPublishedRoomsRequest,
res *api.QueryPublishedRoomsResponse, res *api.QueryPublishedRoomsResponse,

View file

@ -1,4 +1,4 @@
// Copyright 2017 Vector Creations Ltd // Copyright 2020 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package internal package query
import ( import (
"context" "context"