diff --git a/keyserver/consumers/roomevent.go b/keyserver/consumers/roomevent.go deleted file mode 100644 index 78fe68783..000000000 --- a/keyserver/consumers/roomevent.go +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "encoding/json" - - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - log "github.com/sirupsen/logrus" - "github.com/tidwall/gjson" - - fedapi "github.com/matrix-org/dendrite/federationapi/api" - "github.com/matrix-org/dendrite/keyserver/producers" - "github.com/matrix-org/dendrite/keyserver/storage" - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" -) - -// OutputRoomEventConsumer consumes events that originated in the room server. -type OutputRoomEventConsumer struct { - ctx context.Context - cfg *config.KeyServer - jetstream nats.JetStreamContext - durable string - db storage.Database - topic string - producer *producers.KeyChange - fedClient fedapi.KeyserverFederationAPI -} - -// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEventConsumer( - process *process.ProcessContext, - cfg *config.KeyServer, - js nats.JetStreamContext, - store storage.Database, - keyChangeProducer *producers.KeyChange, - fedClient fedapi.KeyserverFederationAPI, - -) *OutputRoomEventConsumer { - return &OutputRoomEventConsumer{ - ctx: process.Context(), - cfg: cfg, - jetstream: js, - db: store, - durable: cfg.Matrix.JetStream.Durable("KeyserverAPIRoomServerConsumer"), - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), - producer: keyChangeProducer, - fedClient: fedClient, - } -} - -// Start consuming from room servers -func (s *OutputRoomEventConsumer) Start() error { - return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.durable, 1, - s.onMessage, nats.DeliverAll(), nats.ManualAck(), - ) -} - -// onMessage is called when the federation server receives a new event from the room server output log. -// It is unsafe to call this with messages for the same room in multiple gorountines -// because updates it will likely fail with a types.EventIDMismatchError when it -// realises that it cannot update the room state using the deltas. -func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { - msg := msgs[0] // Guaranteed to exist if onMessage is called - // Parse out the event JSON - var output api.OutputEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return true - } - - switch output.Type { - case api.OutputTypeNewRoomEvent: - log.Debugf("XXX: got room event") - ev := output.NewRoomEvent.Event - // Only handle m.room.member events - if ev.Type() != gomatrixserverlib.MRoomMember || ev.StateKey() == nil { - log.Debugf("XXX: ignoring, not a membership event") - return true - } - - _, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) - if err != nil { - log.WithError(err).Errorf("unable to split statekey") - return true - } - - // ignore membership changes from other servers - if domain != s.cfg.Matrix.ServerName { - log.Debugf("XXX: ignoring event from different server") - return true - } - - curMembership, err := ev.Membership() - // we only care about join events - if err != nil { - log.WithError(err).Errorf("unable to parse membership event") - return true - } - if curMembership != gomatrixserverlib.Join { - log.Debugf("XXX: ignoring event, current membership is %s", curMembership) - return true - } - prevMembership := gjson.GetBytes(ev.Unsigned(), "prev_content.membership").Str - - switch prevMembership { - case gomatrixserverlib.Invite: - fallthrough - case gomatrixserverlib.Knock: - fallthrough - case gomatrixserverlib.Leave: - fallthrough - case "": // there's no prev membership, inform remote - devMessages, err := s.db.DeviceKeysForUser(ctx, *ev.StateKey(), []string{}, true) - if err != nil { - log.WithError(err).Errorf("unable to get device keys for user") - return false // we want to retry this - } - log.Debugf("XXX: producing key changes") - - return s.producer.ProduceKeyChanges(devMessages) == nil - default: - log.Debugf("ignoring unexpected prev membership: '%s'", prevMembership) - return true - } - default: // ignore all other events - } - return true -}