Merge branch 'main' into neilalexander/vhostinguserapi

This commit is contained in:
Neil Alexander 2022-11-07 09:11:28 +00:00
commit b01510c49a
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
10 changed files with 168 additions and 13 deletions

View file

@ -32,7 +32,7 @@ jobs:
if: github.event_name == 'release' # Only for GitHub releases
run: |
echo "RELEASE_VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
echo "BUILD=$(git rev-parse --short HEAD || "") >> $GITHUB_ENV
echo "BUILD=$(git rev-parse --short HEAD || \"\")" >> $GITHUB_ENV
BRANCH=$(git symbolic-ref --short HEAD | tr -d \/)
[ ${BRANCH} == "main" ] && BRANCH=""
echo "BRANCH=${BRANCH}" >> $GITHUB_ENV
@ -112,7 +112,7 @@ jobs:
if: github.event_name == 'release' # Only for GitHub releases
run: |
echo "RELEASE_VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
echo "BUILD=$(git rev-parse --short HEAD || "") >> $GITHUB_ENV
echo "BUILD=$(git rev-parse --short HEAD || \"\")" >> $GITHUB_ENV
BRANCH=$(git symbolic-ref --short HEAD | tr -d \/)
[ ${BRANCH} == "main" ] && BRANCH=""
echo "BRANCH=${BRANCH}" >> $GITHUB_ENV
@ -191,7 +191,7 @@ jobs:
if: github.event_name == 'release' # Only for GitHub releases
run: |
echo "RELEASE_VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
echo "BUILD=$(git rev-parse --short HEAD || "") >> $GITHUB_ENV
echo "BUILD=$(git rev-parse --short HEAD || \"\")" >> $GITHUB_ENV
BRANCH=$(git symbolic-ref --short HEAD | tr -d \/)
[ ${BRANCH} == "main" ] && BRANCH=""
echo "BRANCH=${BRANCH}" >> $GITHUB_ENV
@ -258,7 +258,7 @@ jobs:
if: github.event_name == 'release' # Only for GitHub releases
run: |
echo "RELEASE_VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
echo "BUILD=$(git rev-parse --short HEAD || "") >> $GITHUB_ENV
echo "BUILD=$(git rev-parse --short HEAD || \"\")" >> $GITHUB_ENV
BRANCH=$(git symbolic-ref --short HEAD | tr -d \/)
[ ${BRANCH} == "main" ] && BRANCH=""
echo "BRANCH=${BRANCH}" >> $GITHUB_ENV

View file

@ -1,5 +1,22 @@
# Changelog
## Dendrite 0.10.7 (2022-11-04)
### Features
* Dendrite will now use a native SQLite port when building with `CGO_ENABLED=0`
* A number of `thirdparty` endpoints have been added, improving support for appservices
### Fixes
* The `"state"` section of the `/sync` response is no longer limited, so state events should not be dropped unexpectedly
* The deduplication of the `"timeline"` and `"state"` sections in `/sync` is now performed after applying history visibility, so state events should not be dropped unexpectedly
* The `prev_batch` token returned by `/sync` is now calculated after applying history visibility, so that the pagination boundaries are correct
* The room summary membership counts in `/sync` should now be calculated properly in more cases
* A false membership leave event should no longer be sent down `/sync` as a result of retiring an accepted invite (contributed by [tak-hntlabs](https://github.com/tak-hntlabs))
* Presence updates are now only sent to other servers for which the user shares rooms
* A bug which could cause a panic when converting events into the `ClientEvent` format has been fixed
## Dendrite 0.10.6 (2022-11-01)
### Features

View file

@ -16,6 +16,7 @@ ARG TARGETARCH
ARG FLAGS
RUN --mount=target=. \
--mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg/mod \
USERARCH=`go env GOARCH` \
GOARCH="$TARGETARCH" \
GOOS="linux" \
@ -23,7 +24,7 @@ RUN --mount=target=. \
go build -v -ldflags="${FLAGS}" -trimpath -o /out/ ./cmd/...
#
# The dendrite base image; mainly creates a user and switches to it
# The dendrite base image
#
FROM alpine:latest AS dendrite-base
LABEL org.opencontainers.image.description="Next-generation Matrix homeserver written in Go"
@ -31,8 +32,6 @@ LABEL org.opencontainers.image.source="https://github.com/matrix-org/dendrite"
LABEL org.opencontainers.image.licenses="Apache-2.0"
LABEL org.opencontainers.image.documentation="https://matrix-org.github.io/dendrite/"
LABEL org.opencontainers.image.vendor="The Matrix.org Foundation C.I.C."
RUN addgroup dendrite && adduser dendrite -G dendrite -u 1337 -D
USER dendrite
#
# Builds the polylith image and only contains the polylith binary

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage"
fedTypes "github.com/matrix-org/dendrite/federationapi/types"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@ -39,6 +40,7 @@ type OutputPresenceConsumer struct {
db storage.Database
queues *queue.OutgoingQueues
isLocalServerName func(gomatrixserverlib.ServerName) bool
rsAPI roomserverAPI.FederationRoomserverAPI
topic string
outboundPresenceEnabled bool
}
@ -50,6 +52,7 @@ func NewOutputPresenceConsumer(
js nats.JetStreamContext,
queues *queue.OutgoingQueues,
store storage.Database,
rsAPI roomserverAPI.FederationRoomserverAPI,
) *OutputPresenceConsumer {
return &OutputPresenceConsumer{
ctx: process.Context(),
@ -60,6 +63,7 @@ func NewOutputPresenceConsumer(
durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound,
rsAPI: rsAPI,
}
}
@ -89,6 +93,16 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
return true
}
var queryRes roomserverAPI.QueryRoomsForUserResponse
err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{
UserID: userID,
WantMembership: "join",
}, &queryRes)
if err != nil {
log.WithError(err).Error("failed to calculate joined rooms for user")
return true
}
presence := msg.Header.Get("presence")
ts, err := strconv.Atoi(msg.Header.Get("last_active_ts"))
@ -96,11 +110,13 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
return true
}
joined, err := t.db.GetAllJoinedHosts(ctx)
// send this presence to all servers who share rooms with this user.
joined, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
if err != nil {
log.WithError(err).Error("failed to get joined hosts")
return true
}
if len(joined) == 0 {
return true
}

View file

@ -164,7 +164,7 @@ func NewInternalAPI(
}
presenceConsumer := consumers.NewOutputPresenceConsumer(
base.ProcessContext, cfg, js, queues, federationDB,
base.ProcessContext, cfg, js, queues, federationDB, rsAPI,
)
if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start presence consumer")

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 10
VersionPatch = 6
VersionPatch = 7
VersionTag = "" // example: "rc1"
)

View file

@ -264,6 +264,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// Work out what the highest stream position is for all of the events in this
// room that were returned.
latestPosition := r.To
if r.Backwards {
latestPosition = r.From
}
updateLatestPosition := func(mostRecentEventID string) {
var pos types.StreamPosition
if _, pos, err = snapshot.PositionInTopology(ctx, mostRecentEventID); err == nil {

View file

@ -46,3 +46,6 @@ If a device list update goes missing, the server resyncs on the next one
# Might be a bug in the test because leaves do appear :-(
Leaves are present in non-gapped incremental syncs
# Below test was passing for the wrong reason, failing correctly since #2858
New federated private chats get full presence information (SYN-115)

View file

@ -682,7 +682,6 @@ Presence changes are reported to local room members
Presence changes are also reported to remote room members
Presence changes to UNAVAILABLE are reported to local room members
Presence changes to UNAVAILABLE are reported to remote room members
New federated private chats get full presence information (SYN-115)
/upgrade copies >100 power levels to the new room
Room state after a rejected message event is the same as before
Room state after a rejected state event is the same as before
@ -760,3 +759,6 @@ Can filter rooms/{roomId}/members
Current state appears in timeline in private history with many messages after
AS can publish rooms in their own list
AS and main public room lists are separate
/upgrade preserves direct room state
local user has tags copied to the new room
remote user has tags copied to the new room

View file

@ -2,12 +2,16 @@ package consumers
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/tidwall/gjson"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
@ -185,13 +189,115 @@ func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventTy
}
}
func (s *OutputRoomEventConsumer) handleRoomUpgrade(ctx context.Context, oldRoomID, newRoomID string, localMembers []*localMembership, roomSize int) error {
for _, membership := range localMembers {
// Copy any existing push rules from old -> new room
if err := s.copyPushrules(ctx, oldRoomID, newRoomID, membership.Localpart); err != nil {
return err
}
// preserve m.direct room state
if err := s.updateMDirect(ctx, oldRoomID, newRoomID, membership.Localpart, roomSize); err != nil {
return err
}
// copy existing m.tag entries, if any
if err := s.copyTags(ctx, oldRoomID, newRoomID, membership.Localpart); err != nil {
return err
}
}
return nil
}
func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID, newRoomID string, localpart string) error {
pushRules, err := s.db.QueryPushRules(ctx, localpart)
if err != nil {
return fmt.Errorf("failed to query pushrules for user: %w", err)
}
if pushRules == nil {
return nil
}
for _, roomRule := range pushRules.Global.Room {
if roomRule.RuleID != oldRoomID {
continue
}
cpRool := *roomRule
cpRool.RuleID = newRoomID
pushRules.Global.Room = append(pushRules.Global.Room, &cpRool)
rules, err := json.Marshal(pushRules)
if err != nil {
return err
}
if err = s.db.SaveAccountData(ctx, localpart, "", "m.push_rules", rules); err != nil {
return fmt.Errorf("failed to update pushrules: %w", err)
}
}
return nil
}
// updateMDirect copies the "is_direct" flag from oldRoomID to newROomID
func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID, newRoomID, localpart string, roomSize int) error {
// this is most likely not a DM, so skip updating m.direct state
if roomSize > 2 {
return nil
}
// Get direct message state
directChatsRaw, err := s.db.GetAccountDataByType(ctx, localpart, "", "m.direct")
if err != nil {
return fmt.Errorf("failed to get m.direct from database: %w", err)
}
directChats := gjson.ParseBytes(directChatsRaw)
newDirectChats := make(map[string][]string)
// iterate over all userID -> roomIDs
directChats.ForEach(func(userID, roomIDs gjson.Result) bool {
var found bool
for _, roomID := range roomIDs.Array() {
newDirectChats[userID.Str] = append(newDirectChats[userID.Str], roomID.Str)
// add the new roomID to m.direct
if roomID.Str == oldRoomID {
found = true
newDirectChats[userID.Str] = append(newDirectChats[userID.Str], newRoomID)
}
}
// Only hit the database if we found the old room as a DM for this user
if found {
var data []byte
data, err = json.Marshal(newDirectChats)
if err != nil {
return true
}
if err = s.db.SaveAccountData(ctx, localpart, "", "m.direct", data); err != nil {
return true
}
}
return true
})
if err != nil {
return fmt.Errorf("failed to update m.direct state")
}
return nil
}
func (s *OutputRoomEventConsumer) copyTags(ctx context.Context, oldRoomID, newRoomID, localpart string) error {
tag, err := s.db.GetAccountDataByType(ctx, localpart, oldRoomID, "m.tag")
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return err
}
if tag == nil {
return nil
}
return s.db.SaveAccountData(ctx, localpart, newRoomID, "m.tag", tag)
}
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, streamPos uint64) error {
members, roomSize, err := s.localRoomMembers(ctx, event.RoomID())
if err != nil {
return fmt.Errorf("s.localRoomMembers: %w", err)
}
if event.Type() == gomatrixserverlib.MRoomMember {
switch {
case event.Type() == gomatrixserverlib.MRoomMember:
cevent := gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatAll)
var member *localMembership
member, err = newLocalMembership(&cevent)
@ -203,6 +309,15 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gom
// should also be pushed to the target user.
members = append(members, member)
}
case event.Type() == "m.room.tombstone" && event.StateKeyEquals(""):
// Handle room upgrades
oldRoomID := event.RoomID()
newRoomID := gjson.GetBytes(event.Content(), "replacement_room").Str
if err = s.handleRoomUpgrade(ctx, oldRoomID, newRoomID, members, roomSize); err != nil {
// while inconvenient, this shouldn't stop us from sending push notifications
log.WithError(err).Errorf("UserAPI: failed to handle room upgrade for users")
}
}
// TODO: run in parallel with localRoomMembers.