Merge remote-tracking branch 'origin/master' into dymattic-install

# Conflicts:
#	docs/INSTALL.md
This commit is contained in:
Dmitrij Pastian 2020-10-20 13:04:35 +02:00
commit aa7d1a55ae
30 changed files with 263 additions and 171 deletions

View file

@ -6,6 +6,7 @@ ARG component=monolith
ENV entrypoint=${component}
COPY --from=base /build/bin/${component} /usr/bin
COPY --from=base /build/bin/goose /usr/bin
VOLUME /etc/dendrite
WORKDIR /etc/dendrite

View file

@ -2,7 +2,7 @@ version: "3.4"
services:
monolith:
hostname: monolith
image: matrixdotorg/dendrite:monolith
image: matrixdotorg/dendrite-monolith:latest
command: [
"--config=dendrite.yaml",
"--tls-cert=server.crt",

View file

@ -1,28 +1,8 @@
version: "3.4"
services:
client_api_proxy:
hostname: client_api_proxy
image: matrixdotorg/dendrite:clientproxy
command: [
"--bind-address=:8008",
"--client-api-server-url=http://client_api:8071",
"--sync-api-server-url=http://sync_api:8073",
"--media-api-server-url=http://media_api:8074"
]
volumes:
- ./config:/etc/dendrite
networks:
- internal
depends_on:
- sync_api
- client_api
- media_api
ports:
- "8008:8008"
client_api:
hostname: client_api
image: matrixdotorg/dendrite:clientapi
image: matrixdotorg/dendrite-clientapi:latest
command: [
"--config=dendrite.yaml"
]
@ -34,7 +14,7 @@ services:
media_api:
hostname: media_api
image: matrixdotorg/dendrite:mediaapi
image: matrixdotorg/dendrite-mediaapi:latest
command: [
"--config=dendrite.yaml"
]
@ -45,7 +25,7 @@ services:
sync_api:
hostname: sync_api
image: matrixdotorg/dendrite:syncapi
image: matrixdotorg/dendrite-syncapi:latest
command: [
"--config=dendrite.yaml"
]
@ -56,7 +36,7 @@ services:
room_server:
hostname: room_server
image: matrixdotorg/dendrite:roomserver
image: matrixdotorg/dendrite-roomserver:latest
command: [
"--config=dendrite.yaml"
]
@ -67,7 +47,7 @@ services:
edu_server:
hostname: edu_server
image: matrixdotorg/dendrite:eduserver
image: matrixdotorg/dendrite-eduserver:latest
command: [
"--config=dendrite.yaml"
]
@ -76,28 +56,9 @@ services:
networks:
- internal
federation_api_proxy:
hostname: federation_api_proxy
image: matrixdotorg/dendrite:federationproxy
command: [
"--bind-address=:8448",
"--federation-api-url=http://federation_api:8072",
"--media-api-server-url=http://media_api:8074"
]
volumes:
- ./config:/etc/dendrite
depends_on:
- federation_api
- federation_sender
- media_api
networks:
- internal
ports:
- "8448:8448"
federation_api:
hostname: federation_api
image: matrixdotorg/dendrite:federationapi
image: matrixdotorg/dendrite-federationapi:latest
command: [
"--config=dendrite.yaml"
]
@ -108,7 +69,7 @@ services:
federation_sender:
hostname: federation_sender
image: matrixdotorg/dendrite:federationsender
image: matrixdotorg/dendrite-federationsender:latest
command: [
"--config=dendrite.yaml"
]
@ -119,7 +80,7 @@ services:
key_server:
hostname: key_server
image: matrixdotorg/dendrite:keyserver
image: matrixdotorg/dendrite-keyserver:latest
command: [
"--config=dendrite.yaml"
]
@ -130,7 +91,7 @@ services:
signing_key_server:
hostname: signing_key_server
image: matrixdotorg/dendrite:signingkeyserver
image: matrixdotorg/dendrite-signingkeyserver:latest
command: [
"--config=dendrite.yaml"
]
@ -141,7 +102,7 @@ services:
user_api:
hostname: user_api
image: matrixdotorg/dendrite:userapi
image: matrixdotorg/dendrite-userapi:latest
command: [
"--config=dendrite.yaml"
]
@ -152,7 +113,7 @@ services:
appservice_api:
hostname: appservice_api
image: matrixdotorg/dendrite:appservice
image: matrixdotorg/dendrite-appservice:latest
command: [
"--config=dendrite.yaml"
]

View file

@ -2,20 +2,22 @@
cd $(git rev-parse --show-toplevel)
docker build -f build/docker/Dockerfile -t matrixdotorg/dendrite:latest .
TAG=${1:-latest}
docker build -t matrixdotorg/dendrite:monolith --build-arg component=dendrite-monolith-server -f build/docker/Dockerfile.component .
echo "Building tag '${TAG}'"
docker build -t matrixdotorg/dendrite:appservice --build-arg component=dendrite-appservice-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:clientapi --build-arg component=dendrite-client-api-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:clientproxy --build-arg component=client-api-proxy -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:eduserver --build-arg component=dendrite-edu-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:federationapi --build-arg component=dendrite-federation-api-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:federationsender --build-arg component=dendrite-federation-sender-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:federationproxy --build-arg component=federation-api-proxy -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:keyserver --build-arg component=dendrite-key-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:mediaapi --build-arg component=dendrite-media-api-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:roomserver --build-arg component=dendrite-room-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:syncapi --build-arg component=dendrite-sync-api-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:signingkeyserver --build-arg component=dendrite-signing-key-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:userapi --build-arg component=dendrite-user-api-server -f build/docker/Dockerfile.component .
docker build -f build/docker/Dockerfile -t matrixdotorg/dendrite:${TAG} .
docker build -t matrixdotorg/dendrite-monolith:${TAG} --build-arg component=dendrite-monolith-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite-appservice:${TAG} --build-arg component=dendrite-appservice-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite-clientapi:${TAG} --build-arg component=dendrite-client-api-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite-eduserver:${TAG} --build-arg component=dendrite-edu-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite-federationapi:${TAG} --build-arg component=dendrite-federation-api-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite-federationsender:${TAG} --build-arg component=dendrite-federation-sender-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite-keyserver:${TAG} --build-arg component=dendrite-key-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite-mediaapi:${TAG} --build-arg component=dendrite-media-api-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite-roomserver:${TAG} --build-arg component=dendrite-room-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite-syncapi:${TAG} --build-arg component=dendrite-sync-api-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite-signingkeyserver:${TAG} --build-arg component=dendrite-signing-key-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite-userapi:${TAG} --build-arg component=dendrite-user-api-server -f build/docker/Dockerfile.component .

View file

@ -1,17 +1,19 @@
#!/bin/bash
docker pull matrixdotorg/dendrite:monolith
TAG=${1:-latest}
docker pull matrixdotorg/dendrite:appservice
docker pull matrixdotorg/dendrite:clientapi
docker pull matrixdotorg/dendrite:clientproxy
docker pull matrixdotorg/dendrite:eduserver
docker pull matrixdotorg/dendrite:federationapi
docker pull matrixdotorg/dendrite:federationsender
docker pull matrixdotorg/dendrite:federationproxy
docker pull matrixdotorg/dendrite:keyserver
docker pull matrixdotorg/dendrite:mediaapi
docker pull matrixdotorg/dendrite:roomserver
docker pull matrixdotorg/dendrite:syncapi
docker pull matrixdotorg/dendrite:signingkeyserver
docker pull matrixdotorg/dendrite:userapi
echo "Pulling tag '${TAG}'"
docker pull matrixdotorg/dendrite-monolith:${TAG}
docker pull matrixdotorg/dendrite-appservice:${TAG}
docker pull matrixdotorg/dendrite-clientapi:${TAG}
docker pull matrixdotorg/dendrite-eduserver:${TAG}
docker pull matrixdotorg/dendrite-federationapi:${TAG}
docker pull matrixdotorg/dendrite-federationsender:${TAG}
docker pull matrixdotorg/dendrite-keyserver:${TAG}
docker pull matrixdotorg/dendrite-mediaapi:${TAG}
docker pull matrixdotorg/dendrite-roomserver:${TAG}
docker pull matrixdotorg/dendrite-syncapi:${TAG}
docker pull matrixdotorg/dendrite-signingkeyserver:${TAG}
docker pull matrixdotorg/dendrite-userapi:${TAG}

View file

@ -1,17 +1,19 @@
#!/bin/bash
docker push matrixdotorg/dendrite:monolith
TAG=${1:-latest}
docker push matrixdotorg/dendrite:appservice
docker push matrixdotorg/dendrite:clientapi
docker push matrixdotorg/dendrite:clientproxy
docker push matrixdotorg/dendrite:eduserver
docker push matrixdotorg/dendrite:federationapi
docker push matrixdotorg/dendrite:federationsender
docker push matrixdotorg/dendrite:federationproxy
docker push matrixdotorg/dendrite:keyserver
docker push matrixdotorg/dendrite:mediaapi
docker push matrixdotorg/dendrite:roomserver
docker push matrixdotorg/dendrite:syncapi
docker push matrixdotorg/dendrite:signingkeyserver
docker push matrixdotorg/dendrite:userapi
echo "Pushing tag '${TAG}'"
docker push matrixdotorg/dendrite-monolith:${TAG}
docker push matrixdotorg/dendrite-appservice:${TAG}
docker push matrixdotorg/dendrite-clientapi:${TAG}
docker push matrixdotorg/dendrite-eduserver:${TAG}
docker push matrixdotorg/dendrite-federationapi:${TAG}
docker push matrixdotorg/dendrite-federationsender:${TAG}
docker push matrixdotorg/dendrite-keyserver:${TAG}
docker push matrixdotorg/dendrite-mediaapi:${TAG}
docker push matrixdotorg/dendrite-roomserver:${TAG}
docker push matrixdotorg/dendrite-syncapi:${TAG}
docker push matrixdotorg/dendrite-signingkeyserver:${TAG}
docker push matrixdotorg/dendrite-userapi:${TAG}

View file

@ -344,6 +344,7 @@ func createRoom(
if err = roomserverAPI.SendEventWithState(
req.Context(),
rsAPI,
roomserverAPI.KindNew,
&gomatrixserverlib.RespState{
StateEvents: accumulated,
AuthEvents: accumulated,

View file

@ -76,6 +76,7 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us
if err = roomserverAPI.SendEvents(
ctx, rsAPI,
api.KindNew,
[]gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)},
cfg.Matrix.ServerName,
nil,

View file

@ -170,7 +170,7 @@ func SetAvatarURL(
return jsonerror.InternalServerError()
}
if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@ -288,7 +288,7 @@ func SetDisplayName(
return jsonerror.InternalServerError()
}
if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}

View file

@ -121,7 +121,7 @@ func SendRedaction(
JSON: jsonerror.NotFound("Room does not exist"),
}
}
if err = roomserverAPI.SendEvents(context.Background(), rsAPI, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil); err != nil {
if err = roomserverAPI.SendEvents(context.Background(), rsAPI, api.KindNew, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents")
return jsonerror.InternalServerError()
}

View file

@ -92,6 +92,7 @@ func SendEvent(
// event ID in case of duplicate transaction is discarded
if err := api.SendEvents(
req.Context(), rsAPI,
api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(verRes.RoomVersion),
},

View file

@ -141,7 +141,7 @@ func OnIncomingStateRequest(ctx context.Context, device *userapi.Device, rsAPI a
util.GetLogger(ctx).WithError(err).Error("Failed to QueryMembershipForUser")
return jsonerror.InternalServerError()
}
for _, ev := range stateRes.StateEvents {
for _, ev := range stateAfterRes.StateEvents {
stateEvents = append(
stateEvents,
gomatrixserverlib.HeaderedToClientEvent(ev, gomatrixserverlib.FormatAll),

View file

@ -361,6 +361,7 @@ func emit3PIDInviteEvent(
return api.SendEvents(
ctx, rsAPI,
api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
(*event).Headered(queryRes.RoomVersion),
},

View file

@ -96,7 +96,7 @@ brew services start kafka
### SQLite database setup
Dendrite can use the built-in SQLite database engine for small setups.
The SQLite databases do not need to be preconfigured - Dendrite will
The SQLite databases do not need to be pre-built - Dendrite will
create them automatically at startup.
### Postgres database setup
@ -112,7 +112,7 @@ Assuming that Postgres 9.5 (or later) is installed:
* Create the component databases:
```bash
for i in userapi_accounts userapi_devices mediaapi syncapi roomserver signingkeyserver federationsender keyserver appservice e2ekey naffka ; do
for i in mediaapi syncapi roomserver signingkeyserver federationsender appservice keyserver userapi_account userapi_device naffka; do
sudo -u postgres createdb -O dendrite dendrite_$i
done
```
@ -147,8 +147,8 @@ Create config file, based on `dendrite-config.yaml`. Call it `dendrite.yaml`. Th
* The `server_name` entry to reflect the hostname of your Dendrite server
* The `database` lines with an updated connection string based on your
desired setup, e.g. replacing `component` with the name of the component:
* For Postgres: `postgres://dendrite:password@localhost/component`
desired setup, e.g. replacing `database` with the name of the database:
* For Postgres: `postgres://dendrite:password@localhost/database`
* For SQLite on disk: `file:component.db` or `file:///path/to/component.db`
* Postgres and SQLite can be mixed and matched.
* The `use_naffka` option if using Naffka in a monolith deployment
@ -159,6 +159,10 @@ then configuring `key_perspectives` (like `matrix.org` in the sample) can
help to improve reliability considerably by allowing your homeserver to fetch
public keys for dead homeservers from somewhere else.
**WARNING:** Dendrite supports running all components from the same database in
Postgres mode, but this is **NOT** a supported configuration with SQLite. When
using SQLite, all components **MUST** use their own database file.
## Starting a monolith server
It is possible to use Naffka as an in-process replacement to Kafka when using
@ -179,30 +183,17 @@ as shown below, it will also listen for HTTPS connections on port 8448.
The following contains scripts which will run all the required processes in order to point a Matrix client at Dendrite.
### Client proxy
### nginx (or other reverse proxy)
This is what Matrix clients will talk to. If you use the script below, point
your client at `http://localhost:8008`.
This is what your clients and federated hosts will talk to. It must forward
requests onto the correct API server based on URL:
```bash
./bin/client-api-proxy \
--bind-address ":8008" \
--client-api-server-url "http://localhost:7771" \
--sync-api-server-url "http://localhost:7773" \
--media-api-server-url "http://localhost:7774" \
```
* `/_matrix/client` to the client API server
* `/_matrix/federation` to the federation API server
* `/_matrix/key` to the federation API server
* `/_matrix/media` to the media API server
### Federation proxy
This is what Matrix servers will talk to. This is only required if you want
to support federation.
```bash
./bin/federation-api-proxy \
--bind-address ":8448" \
--federation-api-url "http://localhost:7772" \
--media-api-server-url "http://localhost:7774" \
```
See `docs/nginx/polylith-sample.conf` for a sample configuration.
### Client API server
@ -210,7 +201,7 @@ This is what implements CS API endpoints. Clients talk to this via the proxy in
order to send messages, create and join rooms, etc.
```bash
./bin/dendrite-client-api-server --config=dendrite.yaml
./bin/dendrite-client-api-server --config dendrite.yaml
```
### Sync server
@ -251,7 +242,7 @@ contacted by other components. This includes the following components.
This is what implements the room DAG. Clients do not talk to this.
```bash
./bin/dendrite-room-server --config=dendrite.yaml
./bin/dendrite-room-server --config dendrite.yaml
```
#### Federation sender

View file

@ -290,6 +290,7 @@ func SendJoin(
if !alreadyJoined {
if err = api.SendEvents(
httpReq.Context(), rsAPI,
api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
event.Headered(stateAndAuthChainResponse.RoomVersion),
},

View file

@ -256,6 +256,7 @@ func SendLeave(
// the room, so set SendAsServer to cfg.Matrix.ServerName
if err = api.SendEvents(
httpReq.Context(), rsAPI,
api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
event.Headered(verRes.RoomVersion),
},

View file

@ -403,6 +403,7 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) er
return api.SendEvents(
context.Background(),
t.rsAPI,
api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(stateResp.RoomVersion),
},
@ -586,6 +587,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
err = api.SendEventWithState(
context.Background(),
t.rsAPI,
api.KindOld,
resolvedState,
backwardsExtremity.Headered(roomVersion),
t.haveEventIDs(),
@ -605,6 +607,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
if err = api.SendEvents(
context.Background(),
t.rsAPI,
api.KindOld,
append(headeredNewEvents, e.Headered(roomVersion)),
api.DoNotSendToOtherServers,
nil,

View file

@ -89,7 +89,7 @@ func CreateInvitesFrom3PIDInvites(
}
// Send all the events
if err := api.SendEvents(req.Context(), rsAPI, evs, cfg.Matrix.ServerName, nil); err != nil {
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@ -174,6 +174,7 @@ func ExchangeThirdPartyInvite(
// Send the event to the roomserver
if err = api.SendEvents(
httpReq.Context(), rsAPI,
api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
signedEvent.Event.Headered(verRes.RoomVersion),
},

View file

@ -248,6 +248,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
// returned state to the roomserver to update our local view.
if err = roomserverAPI.SendEventWithState(
ctx, r.rsAPI,
roomserverAPI.KindNew,
respState,
event.Headered(respMakeJoin.RoomVersion),
nil,

View file

@ -21,17 +21,25 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
type Kind int
const (
// KindOutlier event fall outside the contiguous event graph.
// We do not have the state for these events.
// These events are state events used to authenticate other events.
// They can become part of the contiguous event graph via backfill.
KindOutlier = 1
KindOutlier Kind = iota + 1
// KindNew event extend the contiguous graph going forwards.
// They usually don't need state, but may include state if the
// there was a new event that references an event that we don't
// have a copy of.
KindNew = 2
// have a copy of. New events will influence the fwd extremities
// of the room and output events will be generated as a result.
KindNew
// KindOld event extend the graph backwards, or fill gaps in
// history. They may or may not include state. They will not be
// considered for forward extremities, and output events will NOT
// be generated for them.
KindOld
)
// DoNotSendToOtherServers tells us not to send the event to other matrix
@ -43,7 +51,7 @@ const DoNotSendToOtherServers = ""
type InputRoomEvent struct {
// Whether this event is new, backfilled or an outlier.
// This controls how the event is processed.
Kind int `json:"kind"`
Kind Kind `json:"kind"`
// The event JSON for the event to add.
Event gomatrixserverlib.HeaderedEvent `json:"event"`
// List of state event IDs that authenticate this event.

View file

@ -24,6 +24,8 @@ type OutputType string
const (
// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
OutputTypeNewRoomEvent OutputType = "new_room_event"
// OutputTypeOldRoomEvent indicates that the event is an OutputOldRoomEvent
OutputTypeOldRoomEvent OutputType = "old_room_event"
// OutputTypeNewInviteEvent indicates that the event is an OutputNewInviteEvent
OutputTypeNewInviteEvent OutputType = "new_invite_event"
// OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent
@ -58,6 +60,8 @@ type OutputEvent struct {
Type OutputType `json:"type"`
// The content of event with type OutputTypeNewRoomEvent
NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"`
// The content of event with type OutputTypeOldRoomEvent
OldRoomEvent *OutputOldRoomEvent `json:"old_room_event,omitempty"`
// The content of event with type OutputTypeNewInviteEvent
NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"`
// The content of event with type OutputTypeRetireInviteEvent
@ -178,6 +182,20 @@ func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent {
return append(ore.AddStateEvents, ore.Event)
}
// An OutputOldRoomEvent is written when the roomserver receives an old event.
// This will typically happen as a result of getting either missing events
// or backfilling. Downstream components may wish to send these events to
// clients when it is advantageous to do so, but with the consideration that
// the event is likely a historic event.
//
// Old events do not update forward extremities or the current room state,
// therefore they must not be treated as if they do. Downstream components
// should build their current room state up from OutputNewRoomEvents only.
type OutputOldRoomEvent struct {
// The Event.
Event gomatrixserverlib.HeaderedEvent `json:"event"`
}
// An OutputNewInviteEvent is written whenever an invite becomes active.
// Invite events can be received outside of an existing room so have to be
// tracked separately from the room events themselves.

View file

@ -24,13 +24,14 @@ import (
// SendEvents to the roomserver The events are written with KindNew.
func SendEvents(
ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent,
ctx context.Context, rsAPI RoomserverInternalAPI,
kind Kind, events []gomatrixserverlib.HeaderedEvent,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
) error {
ires := make([]InputRoomEvent, len(events))
for i, event := range events {
ires[i] = InputRoomEvent{
Kind: KindNew,
Kind: kind,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
SendAsServer: string(sendAsServer),
@ -40,12 +41,13 @@ func SendEvents(
return SendInputRoomEvents(ctx, rsAPI, ires)
}
// SendEventWithState writes an event with KindNew to the roomserver
// SendEventWithState writes an event with the specified kind to the roomserver
// with the state at the event as KindOutlier before it. Will not send any event that is
// marked as `true` in haveEventIDs
func SendEventWithState(
ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState,
event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool,
ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind,
state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent,
haveEventIDs map[string]bool,
) error {
outliers, err := state.Events()
if err != nil {
@ -70,7 +72,7 @@ func SendEventWithState(
}
ires = append(ires, InputRoomEvent{
Kind: KindNew,
Kind: kind,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
HasState: true,

View file

@ -17,6 +17,7 @@
package input
import (
"bytes"
"context"
"fmt"
@ -26,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
@ -44,6 +46,28 @@ func (r *Inputer) processRoomEvent(
headered := input.Event
event := headered.Unwrap()
// if we have already got this event then do not process it again, if the input kind is an outlier.
// Outliers contain no extra information which may warrant a re-processing.
if input.Kind == api.KindOutlier {
evs, err := r.DB.EventsFromIDs(ctx, []string{event.EventID()})
if err == nil && len(evs) == 1 {
// check hash matches if we're on early room versions where the event ID was a random string
idFormat, err := headered.RoomVersion.EventIDFormat()
if err == nil {
switch idFormat {
case gomatrixserverlib.EventIDFormatV1:
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
return event.EventID(), nil
}
default:
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
return event.EventID(), nil
}
}
}
}
// Check that the event passes authentication checks and work out
// the numeric IDs for the auth events.
isRejected := false
@ -119,7 +143,7 @@ func (r *Inputer) processRoomEvent(
// We haven't calculated a state for this event yet.
// Lets calculate one.
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected)
if err != nil {
if err != nil && input.Kind != api.KindOld {
return "", fmt.Errorf("r.calculateAndSetState: %w", err)
}
}
@ -136,16 +160,31 @@ func (r *Inputer) processRoomEvent(
return event.EventID(), rejectionErr
}
if err = r.updateLatestEvents(
ctx, // context
roomInfo, // room info for the room being updated
stateAtEvent, // state at event (below)
event, // event
input.SendAsServer, // send as server
input.TransactionID, // transaction ID
input.HasState, // rewrites state?
); err != nil {
return "", fmt.Errorf("r.updateLatestEvents: %w", err)
switch input.Kind {
case api.KindNew:
if err = r.updateLatestEvents(
ctx, // context
roomInfo, // room info for the room being updated
stateAtEvent, // state at event (below)
event, // event
input.SendAsServer, // send as server
input.TransactionID, // transaction ID
input.HasState, // rewrites state?
); err != nil {
return "", fmt.Errorf("r.updateLatestEvents: %w", err)
}
case api.KindOld:
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
{
Type: api.OutputTypeOldRoomEvent,
OldRoomEvent: &api.OutputOldRoomEvent{
Event: headered,
},
},
})
if err != nil {
return "", fmt.Errorf("r.WriteOutputEvents (old): %w", err)
}
}
// processing this event resulted in an event (which may not be the one we're processing)
@ -163,7 +202,7 @@ func (r *Inputer) processRoomEvent(
},
})
if err != nil {
return "", fmt.Errorf("r.WriteOutputEvents: %w", err)
return "", fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
}
}

View file

@ -164,8 +164,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
return fmt.Errorf("u.api.updateMemberships: %w", err)
}
var update *api.OutputEvent
update, err = u.makeOutputNewRoomEvent()
update, err := u.makeOutputNewRoomEvent()
if err != nil {
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
}
@ -234,7 +233,7 @@ func (u *latestEventsUpdater) latestState() error {
if err != nil {
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
}
if len(u.removed) > len(u.added) {
if !u.stateAtEvent.Overwrite && len(u.removed) > len(u.added) {
// This really shouldn't happen.
// TODO: What is ultimately the best way to handle this situation?
logrus.Errorf(
@ -259,6 +258,8 @@ func (u *latestEventsUpdater) latestState() error {
return nil
}
// calculateLatest works out the new set of forward extremities. Returns
// true if the new event is included in those extremites, false otherwise.
func (u *latestEventsUpdater) calculateLatest(
oldLatest []types.StateAtEventAndReference,
newEvent types.StateAtEventAndReference,
@ -326,7 +327,6 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
if err != nil {
return nil, err
}
for _, entry := range u.added {
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
}
@ -339,13 +339,14 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
for _, entry := range u.stateBeforeEventAdds {
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
}
ore.SendAsServer = u.sendAsServer
// include extra state events if they were added as nearly every downstream component will care about it
// and we'd rather not have them all hit QueryEventsByID at the same time!
if len(ore.AddsStateEventIDs) > 0 {
ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs)
if err != nil {
var err error
if ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs); err != nil {
return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
}
}

View file

@ -191,7 +191,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js
t.Helper()
rsAPI, dp := mustCreateRoomserverAPI(t)
hevents := mustLoadRawEvents(t, ver, events)
if err := api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil); err != nil {
if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil); err != nil {
t.Errorf("failed to SendEvents: %s", err)
}
return rsAPI, dp, hevents
@ -337,7 +337,7 @@ func TestOutputRewritesState(t *testing.T) {
deleteDatabase()
rsAPI, producer := mustCreateRoomserverAPI(t)
defer deleteDatabase()
err := api.SendEvents(context.Background(), rsAPI, originalEvents, testOrigin, nil)
err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil)
if err != nil {
t.Fatalf("failed to send original events: %s", err)
}

View file

@ -70,16 +70,14 @@ func (u *LatestEventsUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID {
return u.currentStateSnapshotNID
}
// StorePreviousEvents implements types.RoomRecentEventsUpdater
// StorePreviousEvents implements types.RoomRecentEventsUpdater - This must be called from a Writer
func (u *LatestEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error {
return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
for _, ref := range previousEventReferences {
if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, txn, ref.EventID, ref.EventSHA256, eventNID); err != nil {
return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err)
}
for _, ref := range previousEventReferences {
if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil {
return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err)
}
return nil
})
}
return nil
}
// IsReferenced implements types.RoomRecentEventsUpdater

View file

@ -492,15 +492,32 @@ func (d *Database) StoreEvent(
if roomInfo == nil && len(prevEvents) > 0 {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
}
// Create an updater - NB: on sqlite this WILL create a txn as we are directly calling the shared DB form of
// GetLatestEventsForUpdate - not via the SQLiteDatabase form which has `nil` txns. This
// function only does SELECTs though so the created txn (at this point) is just a read txn like
// any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater
// to do writes however then this will need to go inside `Writer.Do`.
updater, err = d.GetLatestEventsForUpdate(ctx, *roomInfo)
if err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err)
}
if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err)
// Ensure that we atomically store prev events AND commit them. If we don't wrap StorePreviousEvents
// and EndTransaction in a writer then it's possible for a new write txn to be made between the two
// function calls which will then fail with 'database is locked'. This new write txn would HAVE to be
// something like SetRoomAlias/RemoveRoomAlias as normal input events are already done sequentially due to
// SupportsConcurrentRoomInputs() == false on sqlite, though this does not apply to setting room aliases
// as they don't go via InputRoomEvents
err = d.Writer.Do(d.DB, updater.txn, func(txn *sql.Tx) error {
if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
return fmt.Errorf("updater.StorePreviousEvents: %w", err)
}
succeeded := true
err = sqlutil.EndTransaction(updater, &succeeded)
return err
})
if err != nil {
return 0, types.StateAtEvent{}, nil, "", err
}
succeeded := true
err = sqlutil.EndTransaction(updater, &succeeded)
}
return roomNID, types.StateAtEvent{

View file

@ -97,6 +97,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
}
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
case api.OutputTypeOldRoomEvent:
return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
@ -168,7 +170,40 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
log.ErrorKey: err,
"add": msg.AddsStateEventIDs,
"del": msg.RemovesStateEventIDs,
}).Panicf("roomserver output log: write event failure")
}).Panicf("roomserver output log: write new event failure")
return nil
}
if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil {
logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
return err
}
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
return nil
}
func (s *OutputRoomEventConsumer) onOldRoomEvent(
ctx context.Context, msg api.OutputOldRoomEvent,
) error {
ev := msg.Event
pduPos, err := s.db.WriteEvent(
ctx,
&ev,
[]gomatrixserverlib.HeaderedEvent{},
[]string{}, // adds no state
[]string{}, // removes no state
nil, // no transaction
false, // not excluded from sync
)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
}).Panicf("roomserver output log: write old event failure")
return nil
}

View file

@ -57,4 +57,7 @@ The only membership state included in a gapped incremental sync is for senders i
# Blacklisted out of flakiness after #1479
Invited user can reject local invite after originator leaves
Invited user can reject invite for empty room
If user leaves room, remote user changes device and rejoins we see update in /sync and /keys/changes
If user leaves room, remote user changes device and rejoins we see update in /sync and /keys/changes
# Blacklisted due to flakiness
A prev_batch token from incremental sync can be used in the v1 messages API

View file

@ -481,6 +481,7 @@ m.room.history_visibility == "joined" allows/forbids appropriately for Guest use
m.room.history_visibility == "joined" allows/forbids appropriately for Real users
POST rejects invalid utf-8 in JSON
Users cannot kick users who have already left a room
A prev_batch token from incremental sync can be used in the v1 messages API
Event with an invalid signature in the send_join response should not cause room join to fail
Inbound federation rejects typing notifications from wrong remote
Should not be able to take over the room by pretending there is no PL event
Can get rooms/{roomId}/state for a departed room (SPEC-216)