diff --git a/build/docker/Dockerfile.component b/build/docker/Dockerfile.component index 13634391a..1acf510fb 100644 --- a/build/docker/Dockerfile.component +++ b/build/docker/Dockerfile.component @@ -6,6 +6,7 @@ ARG component=monolith ENV entrypoint=${component} COPY --from=base /build/bin/${component} /usr/bin +COPY --from=base /build/bin/goose /usr/bin VOLUME /etc/dendrite WORKDIR /etc/dendrite diff --git a/build/docker/docker-compose.monolith.yml b/build/docker/docker-compose.monolith.yml index 336a43984..7d63e1c65 100644 --- a/build/docker/docker-compose.monolith.yml +++ b/build/docker/docker-compose.monolith.yml @@ -2,7 +2,7 @@ version: "3.4" services: monolith: hostname: monolith - image: matrixdotorg/dendrite:monolith + image: matrixdotorg/dendrite-monolith:latest command: [ "--config=dendrite.yaml", "--tls-cert=server.crt", diff --git a/build/docker/docker-compose.polylith.yml b/build/docker/docker-compose.polylith.yml index 8a4c50e06..e8da9c24a 100644 --- a/build/docker/docker-compose.polylith.yml +++ b/build/docker/docker-compose.polylith.yml @@ -1,28 +1,8 @@ version: "3.4" services: - client_api_proxy: - hostname: client_api_proxy - image: matrixdotorg/dendrite:clientproxy - command: [ - "--bind-address=:8008", - "--client-api-server-url=http://client_api:8071", - "--sync-api-server-url=http://sync_api:8073", - "--media-api-server-url=http://media_api:8074" - ] - volumes: - - ./config:/etc/dendrite - networks: - - internal - depends_on: - - sync_api - - client_api - - media_api - ports: - - "8008:8008" - client_api: hostname: client_api - image: matrixdotorg/dendrite:clientapi + image: matrixdotorg/dendrite-clientapi:latest command: [ "--config=dendrite.yaml" ] @@ -34,7 +14,7 @@ services: media_api: hostname: media_api - image: matrixdotorg/dendrite:mediaapi + image: matrixdotorg/dendrite-mediaapi:latest command: [ "--config=dendrite.yaml" ] @@ -45,7 +25,7 @@ services: sync_api: hostname: sync_api - image: matrixdotorg/dendrite:syncapi + image: matrixdotorg/dendrite-syncapi:latest command: [ "--config=dendrite.yaml" ] @@ -56,7 +36,7 @@ services: room_server: hostname: room_server - image: matrixdotorg/dendrite:roomserver + image: matrixdotorg/dendrite-roomserver:latest command: [ "--config=dendrite.yaml" ] @@ -67,7 +47,7 @@ services: edu_server: hostname: edu_server - image: matrixdotorg/dendrite:eduserver + image: matrixdotorg/dendrite-eduserver:latest command: [ "--config=dendrite.yaml" ] @@ -76,28 +56,9 @@ services: networks: - internal - federation_api_proxy: - hostname: federation_api_proxy - image: matrixdotorg/dendrite:federationproxy - command: [ - "--bind-address=:8448", - "--federation-api-url=http://federation_api:8072", - "--media-api-server-url=http://media_api:8074" - ] - volumes: - - ./config:/etc/dendrite - depends_on: - - federation_api - - federation_sender - - media_api - networks: - - internal - ports: - - "8448:8448" - federation_api: hostname: federation_api - image: matrixdotorg/dendrite:federationapi + image: matrixdotorg/dendrite-federationapi:latest command: [ "--config=dendrite.yaml" ] @@ -108,7 +69,7 @@ services: federation_sender: hostname: federation_sender - image: matrixdotorg/dendrite:federationsender + image: matrixdotorg/dendrite-federationsender:latest command: [ "--config=dendrite.yaml" ] @@ -119,7 +80,7 @@ services: key_server: hostname: key_server - image: matrixdotorg/dendrite:keyserver + image: matrixdotorg/dendrite-keyserver:latest command: [ "--config=dendrite.yaml" ] @@ -130,7 +91,7 @@ services: signing_key_server: hostname: signing_key_server - image: matrixdotorg/dendrite:signingkeyserver + image: matrixdotorg/dendrite-signingkeyserver:latest command: [ "--config=dendrite.yaml" ] @@ -141,7 +102,7 @@ services: user_api: hostname: user_api - image: matrixdotorg/dendrite:userapi + image: matrixdotorg/dendrite-userapi:latest command: [ "--config=dendrite.yaml" ] @@ -152,7 +113,7 @@ services: appservice_api: hostname: appservice_api - image: matrixdotorg/dendrite:appservice + image: matrixdotorg/dendrite-appservice:latest command: [ "--config=dendrite.yaml" ] diff --git a/build/docker/images-build.sh b/build/docker/images-build.sh index d72bac214..daad63be0 100755 --- a/build/docker/images-build.sh +++ b/build/docker/images-build.sh @@ -2,20 +2,22 @@ cd $(git rev-parse --show-toplevel) -docker build -f build/docker/Dockerfile -t matrixdotorg/dendrite:latest . +TAG=${1:-latest} -docker build -t matrixdotorg/dendrite:monolith --build-arg component=dendrite-monolith-server -f build/docker/Dockerfile.component . +echo "Building tag '${TAG}'" -docker build -t matrixdotorg/dendrite:appservice --build-arg component=dendrite-appservice-server -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:clientapi --build-arg component=dendrite-client-api-server -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:clientproxy --build-arg component=client-api-proxy -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:eduserver --build-arg component=dendrite-edu-server -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:federationapi --build-arg component=dendrite-federation-api-server -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:federationsender --build-arg component=dendrite-federation-sender-server -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:federationproxy --build-arg component=federation-api-proxy -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:keyserver --build-arg component=dendrite-key-server -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:mediaapi --build-arg component=dendrite-media-api-server -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:roomserver --build-arg component=dendrite-room-server -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:syncapi --build-arg component=dendrite-sync-api-server -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:signingkeyserver --build-arg component=dendrite-signing-key-server -f build/docker/Dockerfile.component . -docker build -t matrixdotorg/dendrite:userapi --build-arg component=dendrite-user-api-server -f build/docker/Dockerfile.component . +docker build -f build/docker/Dockerfile -t matrixdotorg/dendrite:${TAG} . + +docker build -t matrixdotorg/dendrite-monolith:${TAG} --build-arg component=dendrite-monolith-server -f build/docker/Dockerfile.component . + +docker build -t matrixdotorg/dendrite-appservice:${TAG} --build-arg component=dendrite-appservice-server -f build/docker/Dockerfile.component . +docker build -t matrixdotorg/dendrite-clientapi:${TAG} --build-arg component=dendrite-client-api-server -f build/docker/Dockerfile.component . +docker build -t matrixdotorg/dendrite-eduserver:${TAG} --build-arg component=dendrite-edu-server -f build/docker/Dockerfile.component . +docker build -t matrixdotorg/dendrite-federationapi:${TAG} --build-arg component=dendrite-federation-api-server -f build/docker/Dockerfile.component . +docker build -t matrixdotorg/dendrite-federationsender:${TAG} --build-arg component=dendrite-federation-sender-server -f build/docker/Dockerfile.component . +docker build -t matrixdotorg/dendrite-keyserver:${TAG} --build-arg component=dendrite-key-server -f build/docker/Dockerfile.component . +docker build -t matrixdotorg/dendrite-mediaapi:${TAG} --build-arg component=dendrite-media-api-server -f build/docker/Dockerfile.component . +docker build -t matrixdotorg/dendrite-roomserver:${TAG} --build-arg component=dendrite-room-server -f build/docker/Dockerfile.component . +docker build -t matrixdotorg/dendrite-syncapi:${TAG} --build-arg component=dendrite-sync-api-server -f build/docker/Dockerfile.component . +docker build -t matrixdotorg/dendrite-signingkeyserver:${TAG} --build-arg component=dendrite-signing-key-server -f build/docker/Dockerfile.component . +docker build -t matrixdotorg/dendrite-userapi:${TAG} --build-arg component=dendrite-user-api-server -f build/docker/Dockerfile.component . diff --git a/build/docker/images-pull.sh b/build/docker/images-pull.sh index be9185464..e3284a2a6 100755 --- a/build/docker/images-pull.sh +++ b/build/docker/images-pull.sh @@ -1,17 +1,19 @@ #!/bin/bash -docker pull matrixdotorg/dendrite:monolith +TAG=${1:-latest} -docker pull matrixdotorg/dendrite:appservice -docker pull matrixdotorg/dendrite:clientapi -docker pull matrixdotorg/dendrite:clientproxy -docker pull matrixdotorg/dendrite:eduserver -docker pull matrixdotorg/dendrite:federationapi -docker pull matrixdotorg/dendrite:federationsender -docker pull matrixdotorg/dendrite:federationproxy -docker pull matrixdotorg/dendrite:keyserver -docker pull matrixdotorg/dendrite:mediaapi -docker pull matrixdotorg/dendrite:roomserver -docker pull matrixdotorg/dendrite:syncapi -docker pull matrixdotorg/dendrite:signingkeyserver -docker pull matrixdotorg/dendrite:userapi +echo "Pulling tag '${TAG}'" + +docker pull matrixdotorg/dendrite-monolith:${TAG} + +docker pull matrixdotorg/dendrite-appservice:${TAG} +docker pull matrixdotorg/dendrite-clientapi:${TAG} +docker pull matrixdotorg/dendrite-eduserver:${TAG} +docker pull matrixdotorg/dendrite-federationapi:${TAG} +docker pull matrixdotorg/dendrite-federationsender:${TAG} +docker pull matrixdotorg/dendrite-keyserver:${TAG} +docker pull matrixdotorg/dendrite-mediaapi:${TAG} +docker pull matrixdotorg/dendrite-roomserver:${TAG} +docker pull matrixdotorg/dendrite-syncapi:${TAG} +docker pull matrixdotorg/dendrite-signingkeyserver:${TAG} +docker pull matrixdotorg/dendrite-userapi:${TAG} diff --git a/build/docker/images-push.sh b/build/docker/images-push.sh index 64920171b..e4eb773ac 100755 --- a/build/docker/images-push.sh +++ b/build/docker/images-push.sh @@ -1,17 +1,19 @@ #!/bin/bash -docker push matrixdotorg/dendrite:monolith +TAG=${1:-latest} -docker push matrixdotorg/dendrite:appservice -docker push matrixdotorg/dendrite:clientapi -docker push matrixdotorg/dendrite:clientproxy -docker push matrixdotorg/dendrite:eduserver -docker push matrixdotorg/dendrite:federationapi -docker push matrixdotorg/dendrite:federationsender -docker push matrixdotorg/dendrite:federationproxy -docker push matrixdotorg/dendrite:keyserver -docker push matrixdotorg/dendrite:mediaapi -docker push matrixdotorg/dendrite:roomserver -docker push matrixdotorg/dendrite:syncapi -docker push matrixdotorg/dendrite:signingkeyserver -docker push matrixdotorg/dendrite:userapi +echo "Pushing tag '${TAG}'" + +docker push matrixdotorg/dendrite-monolith:${TAG} + +docker push matrixdotorg/dendrite-appservice:${TAG} +docker push matrixdotorg/dendrite-clientapi:${TAG} +docker push matrixdotorg/dendrite-eduserver:${TAG} +docker push matrixdotorg/dendrite-federationapi:${TAG} +docker push matrixdotorg/dendrite-federationsender:${TAG} +docker push matrixdotorg/dendrite-keyserver:${TAG} +docker push matrixdotorg/dendrite-mediaapi:${TAG} +docker push matrixdotorg/dendrite-roomserver:${TAG} +docker push matrixdotorg/dendrite-syncapi:${TAG} +docker push matrixdotorg/dendrite-signingkeyserver:${TAG} +docker push matrixdotorg/dendrite-userapi:${TAG} diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 9655339cd..cff3c9813 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -344,6 +344,7 @@ func createRoom( if err = roomserverAPI.SendEventWithState( req.Context(), rsAPI, + roomserverAPI.KindNew, &gomatrixserverlib.RespState{ StateEvents: accumulated, AuthEvents: accumulated, diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index 88cb23647..fe0795577 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -76,6 +76,7 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us if err = roomserverAPI.SendEvents( ctx, rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)}, cfg.Matrix.ServerName, nil, diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index 60669a0c8..bbe35facd 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -170,7 +170,7 @@ func SetAvatarURL( return jsonerror.InternalServerError() } - if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -288,7 +288,7 @@ func SetDisplayName( return jsonerror.InternalServerError() } - if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/redaction.go b/clientapi/routing/redaction.go index 9701685e0..266c0aff2 100644 --- a/clientapi/routing/redaction.go +++ b/clientapi/routing/redaction.go @@ -121,7 +121,7 @@ func SendRedaction( JSON: jsonerror.NotFound("Room does not exist"), } } - if err = roomserverAPI.SendEvents(context.Background(), rsAPI, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil); err != nil { + if err = roomserverAPI.SendEvents(context.Background(), rsAPI, api.KindNew, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil); err != nil { util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index 9744a5640..1303663ff 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -92,6 +92,7 @@ func SendEvent( // event ID in case of duplicate transaction is discarded if err := api.SendEvents( req.Context(), rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ e.Headered(verRes.RoomVersion), }, diff --git a/clientapi/routing/state.go b/clientapi/routing/state.go index 2a424cbe8..f69b54bbc 100644 --- a/clientapi/routing/state.go +++ b/clientapi/routing/state.go @@ -141,7 +141,7 @@ func OnIncomingStateRequest(ctx context.Context, device *userapi.Device, rsAPI a util.GetLogger(ctx).WithError(err).Error("Failed to QueryMembershipForUser") return jsonerror.InternalServerError() } - for _, ev := range stateRes.StateEvents { + for _, ev := range stateAfterRes.StateEvents { stateEvents = append( stateEvents, gomatrixserverlib.HeaderedToClientEvent(ev, gomatrixserverlib.FormatAll), diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go index b9575a284..272d3407d 100644 --- a/clientapi/threepid/invites.go +++ b/clientapi/threepid/invites.go @@ -361,6 +361,7 @@ func emit3PIDInviteEvent( return api.SendEvents( ctx, rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ (*event).Headered(queryRes.RoomVersion), }, diff --git a/docs/INSTALL.md b/docs/INSTALL.md index 42c3425ba..a10b0daad 100644 --- a/docs/INSTALL.md +++ b/docs/INSTALL.md @@ -96,7 +96,7 @@ brew services start kafka ### SQLite database setup Dendrite can use the built-in SQLite database engine for small setups. -The SQLite databases do not need to be preconfigured - Dendrite will +The SQLite databases do not need to be pre-built - Dendrite will create them automatically at startup. ### Postgres database setup @@ -112,7 +112,7 @@ Assuming that Postgres 9.5 (or later) is installed: * Create the component databases: ```bash - for i in userapi_accounts userapi_devices mediaapi syncapi roomserver signingkeyserver federationsender keyserver appservice e2ekey naffka ; do + for i in mediaapi syncapi roomserver signingkeyserver federationsender appservice keyserver userapi_account userapi_device naffka; do sudo -u postgres createdb -O dendrite dendrite_$i done ``` @@ -147,8 +147,8 @@ Create config file, based on `dendrite-config.yaml`. Call it `dendrite.yaml`. Th * The `server_name` entry to reflect the hostname of your Dendrite server * The `database` lines with an updated connection string based on your - desired setup, e.g. replacing `component` with the name of the component: - * For Postgres: `postgres://dendrite:password@localhost/component` + desired setup, e.g. replacing `database` with the name of the database: + * For Postgres: `postgres://dendrite:password@localhost/database` * For SQLite on disk: `file:component.db` or `file:///path/to/component.db` * Postgres and SQLite can be mixed and matched. * The `use_naffka` option if using Naffka in a monolith deployment @@ -159,6 +159,10 @@ then configuring `key_perspectives` (like `matrix.org` in the sample) can help to improve reliability considerably by allowing your homeserver to fetch public keys for dead homeservers from somewhere else. +**WARNING:** Dendrite supports running all components from the same database in +Postgres mode, but this is **NOT** a supported configuration with SQLite. When +using SQLite, all components **MUST** use their own database file. + ## Starting a monolith server It is possible to use Naffka as an in-process replacement to Kafka when using @@ -179,30 +183,17 @@ as shown below, it will also listen for HTTPS connections on port 8448. The following contains scripts which will run all the required processes in order to point a Matrix client at Dendrite. -### Client proxy +### nginx (or other reverse proxy) -This is what Matrix clients will talk to. If you use the script below, point -your client at `http://localhost:8008`. +This is what your clients and federated hosts will talk to. It must forward +requests onto the correct API server based on URL: -```bash -./bin/client-api-proxy \ ---bind-address ":8008" \ ---client-api-server-url "http://localhost:7771" \ ---sync-api-server-url "http://localhost:7773" \ ---media-api-server-url "http://localhost:7774" \ -``` +* `/_matrix/client` to the client API server +* `/_matrix/federation` to the federation API server +* `/_matrix/key` to the federation API server +* `/_matrix/media` to the media API server -### Federation proxy - -This is what Matrix servers will talk to. This is only required if you want -to support federation. - -```bash -./bin/federation-api-proxy \ ---bind-address ":8448" \ ---federation-api-url "http://localhost:7772" \ ---media-api-server-url "http://localhost:7774" \ -``` +See `docs/nginx/polylith-sample.conf` for a sample configuration. ### Client API server @@ -210,7 +201,7 @@ This is what implements CS API endpoints. Clients talk to this via the proxy in order to send messages, create and join rooms, etc. ```bash -./bin/dendrite-client-api-server --config=dendrite.yaml +./bin/dendrite-client-api-server --config dendrite.yaml ``` ### Sync server @@ -251,7 +242,7 @@ contacted by other components. This includes the following components. This is what implements the room DAG. Clients do not talk to this. ```bash -./bin/dendrite-room-server --config=dendrite.yaml +./bin/dendrite-room-server --config dendrite.yaml ``` #### Federation sender diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index c637116f7..12f205366 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -290,6 +290,7 @@ func SendJoin( if !alreadyJoined { if err = api.SendEvents( httpReq.Context(), rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ event.Headered(stateAndAuthChainResponse.RoomVersion), }, diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go index e16dfcc2e..fb81d9319 100644 --- a/federationapi/routing/leave.go +++ b/federationapi/routing/leave.go @@ -256,6 +256,7 @@ func SendLeave( // the room, so set SendAsServer to cfg.Matrix.ServerName if err = api.SendEvents( httpReq.Context(), rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ event.Headered(verRes.RoomVersion), }, diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 783fdc3b8..76dc3a2ee 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -403,6 +403,7 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) er return api.SendEvents( context.Background(), t.rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ e.Headered(stateResp.RoomVersion), }, @@ -586,6 +587,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser err = api.SendEventWithState( context.Background(), t.rsAPI, + api.KindOld, resolvedState, backwardsExtremity.Headered(roomVersion), t.haveEventIDs(), @@ -605,6 +607,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser if err = api.SendEvents( context.Background(), t.rsAPI, + api.KindOld, append(headeredNewEvents, e.Headered(roomVersion)), api.DoNotSendToOtherServers, nil, diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go index ec6cc1488..4db5273af 100644 --- a/federationapi/routing/threepid.go +++ b/federationapi/routing/threepid.go @@ -89,7 +89,7 @@ func CreateInvitesFrom3PIDInvites( } // Send all the events - if err := api.SendEvents(req.Context(), rsAPI, evs, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -174,6 +174,7 @@ func ExchangeThirdPartyInvite( // Send the event to the roomserver if err = api.SendEvents( httpReq.Context(), rsAPI, + api.KindNew, []gomatrixserverlib.HeaderedEvent{ signedEvent.Event.Headered(verRes.RoomVersion), }, diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index 254883e63..3904ab856 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -248,6 +248,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer( // returned state to the roomserver to update our local view. if err = roomserverAPI.SendEventWithState( ctx, r.rsAPI, + roomserverAPI.KindNew, respState, event.Headered(respMakeJoin.RoomVersion), nil, diff --git a/roomserver/api/input.go b/roomserver/api/input.go index dd693203b..e1a8afa00 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -21,17 +21,25 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +type Kind int + const ( // KindOutlier event fall outside the contiguous event graph. // We do not have the state for these events. // These events are state events used to authenticate other events. // They can become part of the contiguous event graph via backfill. - KindOutlier = 1 + KindOutlier Kind = iota + 1 // KindNew event extend the contiguous graph going forwards. // They usually don't need state, but may include state if the // there was a new event that references an event that we don't - // have a copy of. - KindNew = 2 + // have a copy of. New events will influence the fwd extremities + // of the room and output events will be generated as a result. + KindNew + // KindOld event extend the graph backwards, or fill gaps in + // history. They may or may not include state. They will not be + // considered for forward extremities, and output events will NOT + // be generated for them. + KindOld ) // DoNotSendToOtherServers tells us not to send the event to other matrix @@ -43,7 +51,7 @@ const DoNotSendToOtherServers = "" type InputRoomEvent struct { // Whether this event is new, backfilled or an outlier. // This controls how the event is processed. - Kind int `json:"kind"` + Kind Kind `json:"kind"` // The event JSON for the event to add. Event gomatrixserverlib.HeaderedEvent `json:"event"` // List of state event IDs that authenticate this event. diff --git a/roomserver/api/output.go b/roomserver/api/output.go index d57f3b04c..9cb814a47 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -24,6 +24,8 @@ type OutputType string const ( // OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent OutputTypeNewRoomEvent OutputType = "new_room_event" + // OutputTypeOldRoomEvent indicates that the event is an OutputOldRoomEvent + OutputTypeOldRoomEvent OutputType = "old_room_event" // OutputTypeNewInviteEvent indicates that the event is an OutputNewInviteEvent OutputTypeNewInviteEvent OutputType = "new_invite_event" // OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent @@ -58,6 +60,8 @@ type OutputEvent struct { Type OutputType `json:"type"` // The content of event with type OutputTypeNewRoomEvent NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"` + // The content of event with type OutputTypeOldRoomEvent + OldRoomEvent *OutputOldRoomEvent `json:"old_room_event,omitempty"` // The content of event with type OutputTypeNewInviteEvent NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"` // The content of event with type OutputTypeRetireInviteEvent @@ -178,6 +182,20 @@ func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent { return append(ore.AddStateEvents, ore.Event) } +// An OutputOldRoomEvent is written when the roomserver receives an old event. +// This will typically happen as a result of getting either missing events +// or backfilling. Downstream components may wish to send these events to +// clients when it is advantageous to do so, but with the consideration that +// the event is likely a historic event. +// +// Old events do not update forward extremities or the current room state, +// therefore they must not be treated as if they do. Downstream components +// should build their current room state up from OutputNewRoomEvents only. +type OutputOldRoomEvent struct { + // The Event. + Event gomatrixserverlib.HeaderedEvent `json:"event"` +} + // An OutputNewInviteEvent is written whenever an invite becomes active. // Invite events can be received outside of an existing room so have to be // tracked separately from the room events themselves. diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index a38c00df7..9e8219103 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -24,13 +24,14 @@ import ( // SendEvents to the roomserver The events are written with KindNew. func SendEvents( - ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent, + ctx context.Context, rsAPI RoomserverInternalAPI, + kind Kind, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, ) error { ires := make([]InputRoomEvent, len(events)) for i, event := range events { ires[i] = InputRoomEvent{ - Kind: KindNew, + Kind: kind, Event: event, AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), @@ -40,12 +41,13 @@ func SendEvents( return SendInputRoomEvents(ctx, rsAPI, ires) } -// SendEventWithState writes an event with KindNew to the roomserver +// SendEventWithState writes an event with the specified kind to the roomserver // with the state at the event as KindOutlier before it. Will not send any event that is // marked as `true` in haveEventIDs func SendEventWithState( - ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState, - event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, + ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, + state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, + haveEventIDs map[string]bool, ) error { outliers, err := state.Events() if err != nil { @@ -70,7 +72,7 @@ func SendEventWithState( } ires = append(ires, InputRoomEvent{ - Kind: KindNew, + Kind: kind, Event: event, AuthEventIDs: event.AuthEventIDs(), HasState: true, diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 113341591..6a5d9d264 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -17,6 +17,7 @@ package input import ( + "bytes" "context" "fmt" @@ -26,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" "github.com/sirupsen/logrus" ) @@ -44,6 +46,28 @@ func (r *Inputer) processRoomEvent( headered := input.Event event := headered.Unwrap() + // if we have already got this event then do not process it again, if the input kind is an outlier. + // Outliers contain no extra information which may warrant a re-processing. + if input.Kind == api.KindOutlier { + evs, err := r.DB.EventsFromIDs(ctx, []string{event.EventID()}) + if err == nil && len(evs) == 1 { + // check hash matches if we're on early room versions where the event ID was a random string + idFormat, err := headered.RoomVersion.EventIDFormat() + if err == nil { + switch idFormat { + case gomatrixserverlib.EventIDFormatV1: + if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) { + util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring") + return event.EventID(), nil + } + default: + util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring") + return event.EventID(), nil + } + } + } + } + // Check that the event passes authentication checks and work out // the numeric IDs for the auth events. isRejected := false @@ -119,7 +143,7 @@ func (r *Inputer) processRoomEvent( // We haven't calculated a state for this event yet. // Lets calculate one. err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected) - if err != nil { + if err != nil && input.Kind != api.KindOld { return "", fmt.Errorf("r.calculateAndSetState: %w", err) } } @@ -136,16 +160,31 @@ func (r *Inputer) processRoomEvent( return event.EventID(), rejectionErr } - if err = r.updateLatestEvents( - ctx, // context - roomInfo, // room info for the room being updated - stateAtEvent, // state at event (below) - event, // event - input.SendAsServer, // send as server - input.TransactionID, // transaction ID - input.HasState, // rewrites state? - ); err != nil { - return "", fmt.Errorf("r.updateLatestEvents: %w", err) + switch input.Kind { + case api.KindNew: + if err = r.updateLatestEvents( + ctx, // context + roomInfo, // room info for the room being updated + stateAtEvent, // state at event (below) + event, // event + input.SendAsServer, // send as server + input.TransactionID, // transaction ID + input.HasState, // rewrites state? + ); err != nil { + return "", fmt.Errorf("r.updateLatestEvents: %w", err) + } + case api.KindOld: + err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ + { + Type: api.OutputTypeOldRoomEvent, + OldRoomEvent: &api.OutputOldRoomEvent{ + Event: headered, + }, + }, + }) + if err != nil { + return "", fmt.Errorf("r.WriteOutputEvents (old): %w", err) + } } // processing this event resulted in an event (which may not be the one we're processing) @@ -163,7 +202,7 @@ func (r *Inputer) processRoomEvent( }, }) if err != nil { - return "", fmt.Errorf("r.WriteOutputEvents: %w", err) + return "", fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) } } diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index ca5d214d7..f76b0a0b4 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -164,8 +164,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { return fmt.Errorf("u.api.updateMemberships: %w", err) } - var update *api.OutputEvent - update, err = u.makeOutputNewRoomEvent() + update, err := u.makeOutputNewRoomEvent() if err != nil { return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) } @@ -234,7 +233,7 @@ func (u *latestEventsUpdater) latestState() error { if err != nil { return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err) } - if len(u.removed) > len(u.added) { + if !u.stateAtEvent.Overwrite && len(u.removed) > len(u.added) { // This really shouldn't happen. // TODO: What is ultimately the best way to handle this situation? logrus.Errorf( @@ -259,6 +258,8 @@ func (u *latestEventsUpdater) latestState() error { return nil } +// calculateLatest works out the new set of forward extremities. Returns +// true if the new event is included in those extremites, false otherwise. func (u *latestEventsUpdater) calculateLatest( oldLatest []types.StateAtEventAndReference, newEvent types.StateAtEventAndReference, @@ -326,7 +327,6 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) if err != nil { return nil, err } - for _, entry := range u.added { ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID]) } @@ -339,13 +339,14 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) for _, entry := range u.stateBeforeEventAdds { ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID]) } + ore.SendAsServer = u.sendAsServer // include extra state events if they were added as nearly every downstream component will care about it // and we'd rather not have them all hit QueryEventsByID at the same time! if len(ore.AddsStateEventIDs) > 0 { - ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs) - if err != nil { + var err error + if ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs); err != nil { return nil, fmt.Errorf("failed to load add_state_events from db: %w", err) } } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 1b692a098..c8e60efa3 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -191,7 +191,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js t.Helper() rsAPI, dp := mustCreateRoomserverAPI(t) hevents := mustLoadRawEvents(t, ver, events) - if err := api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil); err != nil { + if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil); err != nil { t.Errorf("failed to SendEvents: %s", err) } return rsAPI, dp, hevents @@ -337,7 +337,7 @@ func TestOutputRewritesState(t *testing.T) { deleteDatabase() rsAPI, producer := mustCreateRoomserverAPI(t) defer deleteDatabase() - err := api.SendEvents(context.Background(), rsAPI, originalEvents, testOrigin, nil) + err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil) if err != nil { t.Fatalf("failed to send original events: %s", err) } diff --git a/roomserver/storage/shared/latest_events_updater.go b/roomserver/storage/shared/latest_events_updater.go index b316f639d..8825dc464 100644 --- a/roomserver/storage/shared/latest_events_updater.go +++ b/roomserver/storage/shared/latest_events_updater.go @@ -70,16 +70,14 @@ func (u *LatestEventsUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID { return u.currentStateSnapshotNID } -// StorePreviousEvents implements types.RoomRecentEventsUpdater +// StorePreviousEvents implements types.RoomRecentEventsUpdater - This must be called from a Writer func (u *LatestEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error { - return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error { - for _, ref := range previousEventReferences { - if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, txn, ref.EventID, ref.EventSHA256, eventNID); err != nil { - return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err) - } + for _, ref := range previousEventReferences { + if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil { + return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err) } - return nil - }) + } + return nil } // IsReferenced implements types.RoomRecentEventsUpdater diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index f2be8b3cf..51dcb8887 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -492,15 +492,32 @@ func (d *Database) StoreEvent( if roomInfo == nil && len(prevEvents) > 0 { return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID()) } + // Create an updater - NB: on sqlite this WILL create a txn as we are directly calling the shared DB form of + // GetLatestEventsForUpdate - not via the SQLiteDatabase form which has `nil` txns. This + // function only does SELECTs though so the created txn (at this point) is just a read txn like + // any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater + // to do writes however then this will need to go inside `Writer.Do`. updater, err = d.GetLatestEventsForUpdate(ctx, *roomInfo) if err != nil { return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err) } - if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil { - return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err) + // Ensure that we atomically store prev events AND commit them. If we don't wrap StorePreviousEvents + // and EndTransaction in a writer then it's possible for a new write txn to be made between the two + // function calls which will then fail with 'database is locked'. This new write txn would HAVE to be + // something like SetRoomAlias/RemoveRoomAlias as normal input events are already done sequentially due to + // SupportsConcurrentRoomInputs() == false on sqlite, though this does not apply to setting room aliases + // as they don't go via InputRoomEvents + err = d.Writer.Do(d.DB, updater.txn, func(txn *sql.Tx) error { + if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil { + return fmt.Errorf("updater.StorePreviousEvents: %w", err) + } + succeeded := true + err = sqlutil.EndTransaction(updater, &succeeded) + return err + }) + if err != nil { + return 0, types.StateAtEvent{}, nil, "", err } - succeeded := true - err = sqlutil.EndTransaction(updater, &succeeded) } return roomNID, types.StateAtEvent{ diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index ca48c8300..373baea54 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -97,6 +97,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } } return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) + case api.OutputTypeOldRoomEvent: + return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent) case api.OutputTypeNewInviteEvent: return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) case api.OutputTypeRetireInviteEvent: @@ -168,7 +170,40 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( log.ErrorKey: err, "add": msg.AddsStateEventIDs, "del": msg.RemovesStateEventIDs, - }).Panicf("roomserver output log: write event failure") + }).Panicf("roomserver output log: write new event failure") + return nil + } + + if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil { + logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) + return err + } + + s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) + + return nil +} + +func (s *OutputRoomEventConsumer) onOldRoomEvent( + ctx context.Context, msg api.OutputOldRoomEvent, +) error { + ev := msg.Event + + pduPos, err := s.db.WriteEvent( + ctx, + &ev, + []gomatrixserverlib.HeaderedEvent{}, + []string{}, // adds no state + []string{}, // removes no state + nil, // no transaction + false, // not excluded from sync + ) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + }).Panicf("roomserver output log: write old event failure") return nil } diff --git a/sytest-blacklist b/sytest-blacklist index ff7fdf7e0..f493f94fe 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -57,4 +57,7 @@ The only membership state included in a gapped incremental sync is for senders i # Blacklisted out of flakiness after #1479 Invited user can reject local invite after originator leaves Invited user can reject invite for empty room -If user leaves room, remote user changes device and rejoins we see update in /sync and /keys/changes \ No newline at end of file +If user leaves room, remote user changes device and rejoins we see update in /sync and /keys/changes + +# Blacklisted due to flakiness +A prev_batch token from incremental sync can be used in the v1 messages API \ No newline at end of file diff --git a/sytest-whitelist b/sytest-whitelist index 2ba0a88b2..cecf24f75 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -481,6 +481,7 @@ m.room.history_visibility == "joined" allows/forbids appropriately for Guest use m.room.history_visibility == "joined" allows/forbids appropriately for Real users POST rejects invalid utf-8 in JSON Users cannot kick users who have already left a room -A prev_batch token from incremental sync can be used in the v1 messages API Event with an invalid signature in the send_join response should not cause room join to fail Inbound federation rejects typing notifications from wrong remote +Should not be able to take over the room by pretending there is no PL event +Can get rooms/{roomId}/state for a departed room (SPEC-216)