mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-01 03:03:10 -06:00
Merge branch 's7evink/presence' of github.com:matrix-org/dendrite into s7evink/presence
This commit is contained in:
commit
68a4bf6f69
9
.github/workflows/dendrite.yml
vendored
9
.github/workflows/dendrite.yml
vendored
|
|
@ -233,7 +233,14 @@ jobs:
|
|||
- name: Summarise results.tap
|
||||
if: ${{ always() }}
|
||||
run: /sytest/scripts/tap_to_gha.pl /logs/results.tap
|
||||
|
||||
- name: Sytest List Maintenance
|
||||
if: ${{ always() }}
|
||||
run: /src/show-expected-fail-tests.sh /logs/results.tap /src/sytest-whitelist /src/sytest-blacklist
|
||||
continue-on-error: true # not fatal
|
||||
- name: Are We Synapse Yet?
|
||||
if: ${{ always() }}
|
||||
run: /src/are-we-synapse-yet.py /logs/results.tap -v
|
||||
continue-on-error: true # not fatal
|
||||
- name: Upload Sytest logs
|
||||
uses: actions/upload-artifact@v2
|
||||
if: ${{ always() }}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
from __future__ import division
|
||||
import argparse
|
||||
import re
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Usage: $ ./are-we-synapse-yet.py [-v] results.tap
|
||||
# This script scans a results.tap file from Dendrite's CI process and spits out
|
||||
|
|
@ -156,6 +156,7 @@ def parse_test_line(line):
|
|||
# ✓ POST /register downcases capitals in usernames
|
||||
# ...
|
||||
def print_stats(header_name, gid_to_tests, gid_to_name, verbose):
|
||||
ci = os.getenv("CI") # When running from GHA, this groups the subsections
|
||||
subsections = [] # Registration: 100% (13/13 tests)
|
||||
subsection_test_names = {} # 'subsection name': ["✓ Test 1", "✓ Test 2", "× Test 3"]
|
||||
total_passing = 0
|
||||
|
|
@ -186,11 +187,11 @@ def print_stats(header_name, gid_to_tests, gid_to_name, verbose):
|
|||
print("%s: %s (%d/%d tests)" % (header_name, pct, total_passing, total_tests))
|
||||
print("-" * (len(header_name)+1))
|
||||
for line in subsections:
|
||||
print("::group::%s" % (line,))
|
||||
print("%s%s" % ("::group::" if ci and verbose else "", line,))
|
||||
if verbose:
|
||||
for test_name_and_pass_mark in subsection_test_names[line]:
|
||||
print(" %s" % (test_name_and_pass_mark,))
|
||||
print("::endgroup::")
|
||||
print("%s" % ("::endgroup::" if ci else ""))
|
||||
print("")
|
||||
|
||||
def main(results_tap_path, verbose):
|
||||
|
|
|
|||
51
build.cmd
Normal file
51
build.cmd
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
@echo off
|
||||
|
||||
:ENTRY_POINT
|
||||
setlocal EnableDelayedExpansion
|
||||
|
||||
REM script base dir
|
||||
set SCRIPTDIR=%~dp0
|
||||
set PROJDIR=%SCRIPTDIR:~0,-1%
|
||||
|
||||
REM Put installed packages into ./bin
|
||||
set GOBIN=%PROJDIR%\bin
|
||||
|
||||
set FLAGS=
|
||||
|
||||
REM Check if sources are under Git control
|
||||
if not exist ".git" goto :CHECK_BIN
|
||||
|
||||
REM set BUILD=`git rev-parse --short HEAD \\ ""`
|
||||
FOR /F "tokens=*" %%X IN ('git rev-parse --short HEAD') DO (
|
||||
set BUILD=%%X
|
||||
)
|
||||
|
||||
REM set BRANCH=`(git symbolic-ref --short HEAD \ tr -d \/ ) \\ ""`
|
||||
FOR /F "tokens=*" %%X IN ('git symbolic-ref --short HEAD') DO (
|
||||
set BRANCHRAW=%%X
|
||||
set BRANCH=!BRANCHRAW:/=!
|
||||
)
|
||||
if "%BRANCH%" == "main" set BRANCH=
|
||||
|
||||
set FLAGS=-X github.com/matrix-org/dendrite/internal.branch=%BRANCH% -X github.com/matrix-org/dendrite/internal.build=%BUILD%
|
||||
|
||||
:CHECK_BIN
|
||||
if exist "bin" goto :ALL_SET
|
||||
mkdir "bin"
|
||||
|
||||
:ALL_SET
|
||||
set CGO_ENABLED=1
|
||||
for /D %%P in (cmd\*) do (
|
||||
go build -trimpath -ldflags "%FLAGS%" -v -o ".\bin" ".\%%P"
|
||||
)
|
||||
|
||||
set CGO_ENABLED=0
|
||||
set GOOS=js
|
||||
set GOARCH=wasm
|
||||
go build -trimpath -ldflags "%FLAGS%" -o bin\main.wasm .\cmd\dendritejs-pinecone
|
||||
|
||||
goto :DONE
|
||||
|
||||
:DONE
|
||||
echo Done
|
||||
endlocal
|
||||
|
|
@ -62,6 +62,17 @@ global:
|
|||
- matrix.org
|
||||
- vector.im
|
||||
|
||||
# Disables federation. Dendrite will not be able to make any outbound HTTP requests
|
||||
# to other servers and the federation API will not be exposed.
|
||||
disable_federation: false
|
||||
|
||||
# Configures the handling of presence events.
|
||||
presence:
|
||||
# Whether inbound presence events are allowed, e.g. receiving presence events from other servers
|
||||
enable_inbound: false
|
||||
# Whether outbound presence events are allowed, e.g. sending presence events to other servers
|
||||
enable_outbound: false
|
||||
|
||||
# Configuration for NATS JetStream
|
||||
jetstream:
|
||||
# A list of NATS Server addresses to connect to. If none are specified, an
|
||||
|
|
|
|||
|
|
@ -177,11 +177,11 @@ func (p *SyncAPIProducer) SendTyping(
|
|||
}
|
||||
|
||||
func (p *SyncAPIProducer) SendPresence(
|
||||
ctx context.Context, userID, presence string, statusMsg *string,
|
||||
ctx context.Context, userID string, presence types.Presence, statusMsg *string,
|
||||
) error {
|
||||
m := nats.NewMsg(p.TopicPresenceEvent)
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
m.Header.Set("presence", presence)
|
||||
m.Header.Set("presence", presence.String())
|
||||
if statusMsg != nil {
|
||||
m.Header.Set("status_msg", *statusMsg)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
|
|
@ -46,7 +45,7 @@ func SetPresence(
|
|||
producer *producers.SyncAPIProducer,
|
||||
userID string,
|
||||
) util.JSONResponse {
|
||||
if cfg.Matrix.DisablePresence {
|
||||
if !cfg.Matrix.Presence.EnableOutbound {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: struct{}{},
|
||||
|
|
@ -63,15 +62,16 @@ func SetPresence(
|
|||
if parseErr != nil {
|
||||
return *parseErr
|
||||
}
|
||||
p := strings.ToLower(presence.Presence)
|
||||
if _, ok := types.PresenceToInt[p]; !ok {
|
||||
|
||||
presenceStatus, ok := types.PresenceFromString(presence.Presence)
|
||||
if !ok {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.Unknown(fmt.Sprintf("Unknown presence '%s'.", p)),
|
||||
JSON: jsonerror.Unknown(fmt.Sprintf("Unknown presence '%s'.", presence.Presence)),
|
||||
}
|
||||
}
|
||||
|
||||
err := producer.SendPresence(req.Context(), userID, presence.Presence, presence.StatusMsg)
|
||||
err := producer.SendPresence(req.Context(), userID, presenceStatus, presence.StatusMsg)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("failed to update presence")
|
||||
return util.JSONResponse{
|
||||
|
|
@ -112,7 +112,7 @@ func GetPresence(
|
|||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: types.PresenceClientResponse{
|
||||
Presence: "unavailable",
|
||||
Presence: types.PresenceUnavailable.String(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -124,7 +124,7 @@ func GetPresence(
|
|||
}
|
||||
}
|
||||
|
||||
p := types.Presence{LastActiveTS: gomatrixserverlib.Timestamp(lastActive)}
|
||||
p := types.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(lastActive)}
|
||||
currentlyActive := p.CurrentlyActive()
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
|
|
|
|||
|
|
@ -945,6 +945,16 @@ func Setup(
|
|||
}),
|
||||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
|
||||
v3mux.Handle("/rooms/{roomID}/upgrade",
|
||||
httputil.MakeAuthAPI("rooms_upgrade", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
||||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return UpgradeRoom(req, device, cfg, vars["roomID"], userAPI, rsAPI, asAPI)
|
||||
}),
|
||||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
|
||||
v3mux.Handle("/devices",
|
||||
httputil.MakeAuthAPI("get_devices", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||
return GetDevicesByLocalpart(req, userAPI, device)
|
||||
|
|
|
|||
|
|
@ -272,5 +272,24 @@ func generateSendEvent(
|
|||
JSON: jsonerror.Forbidden(err.Error()), // TODO: Is this error string comprehensible to the client?
|
||||
}
|
||||
}
|
||||
|
||||
// User should not be able to send a tombstone event to the same room.
|
||||
if e.Type() == "m.room.tombstone" {
|
||||
content := make(map[string]interface{})
|
||||
if err = json.Unmarshal(e.Content(), &content); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error("Cannot unmarshal the event content.")
|
||||
return nil, &util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.BadJSON("Cannot unmarshal the event content."),
|
||||
}
|
||||
}
|
||||
if content["replacement_room"] == e.RoomID() {
|
||||
return nil, &util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.InvalidParam("Cannot send tombstone event that points to the same room."),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return e.Event, nil
|
||||
}
|
||||
|
|
|
|||
92
clientapi/routing/upgrade_room.go
Normal file
92
clientapi/routing/upgrade_room.go
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package routing
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/version"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
type upgradeRoomRequest struct {
|
||||
NewVersion string `json:"new_version"`
|
||||
}
|
||||
|
||||
type upgradeRoomResponse struct {
|
||||
ReplacementRoom string `json:"replacement_room"`
|
||||
}
|
||||
|
||||
// UpgradeRoom implements /upgrade
|
||||
func UpgradeRoom(
|
||||
req *http.Request, device *userapi.Device,
|
||||
cfg *config.ClientAPI,
|
||||
roomID string, profileAPI userapi.UserProfileAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
) util.JSONResponse {
|
||||
var r upgradeRoomRequest
|
||||
if rErr := httputil.UnmarshalJSONRequest(req, &r); rErr != nil {
|
||||
return *rErr
|
||||
}
|
||||
|
||||
// Validate that the room version is supported
|
||||
if _, err := version.SupportedRoomVersion(gomatrixserverlib.RoomVersion(r.NewVersion)); err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.UnsupportedRoomVersion("This server does not support that room version"),
|
||||
}
|
||||
}
|
||||
|
||||
upgradeReq := roomserverAPI.PerformRoomUpgradeRequest{
|
||||
UserID: device.UserID,
|
||||
RoomID: roomID,
|
||||
RoomVersion: gomatrixserverlib.RoomVersion(r.NewVersion),
|
||||
}
|
||||
upgradeResp := roomserverAPI.PerformRoomUpgradeResponse{}
|
||||
|
||||
rsAPI.PerformRoomUpgrade(req.Context(), &upgradeReq, &upgradeResp)
|
||||
|
||||
if upgradeResp.Error != nil {
|
||||
if upgradeResp.Error.Code == roomserverAPI.PerformErrorNoRoom {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusNotFound,
|
||||
JSON: jsonerror.NotFound("Room does not exist"),
|
||||
}
|
||||
} else if upgradeResp.Error.Code == roomserverAPI.PerformErrorNotAllowed {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusForbidden,
|
||||
JSON: jsonerror.Forbidden(upgradeResp.Error.Msg),
|
||||
}
|
||||
} else {
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: upgradeRoomResponse{
|
||||
ReplacementRoom: upgradeResp.NewRoomID,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -91,6 +91,10 @@ func main() {
|
|||
cfg.UserAPI.BCryptCost = bcrypt.MinCost
|
||||
cfg.Global.JetStream.InMemory = true
|
||||
cfg.ClientAPI.RegistrationSharedSecret = "complement"
|
||||
cfg.Global.Presence = config.PresenceOptions{
|
||||
EnableInbound: true,
|
||||
EnableOutbound: true,
|
||||
}
|
||||
}
|
||||
|
||||
j, err := yaml.Marshal(cfg)
|
||||
|
|
|
|||
|
|
@ -68,8 +68,12 @@ global:
|
|||
# to other servers and the federation API will not be exposed.
|
||||
disable_federation: false
|
||||
|
||||
# Disable presence. Dendrite will not handle presence events.
|
||||
disable_presence: true
|
||||
# Configures the handling of presence events.
|
||||
presence:
|
||||
# Whether inbound presence events are allowed, e.g. receiving presence events from other servers
|
||||
enable_inbound: false
|
||||
# Whether outbound presence events are allowed, e.g. sending presence events to other servers
|
||||
enable_outbound: false
|
||||
|
||||
# Server notices allows server admins to send messages to all users.
|
||||
server_notices:
|
||||
|
|
|
|||
|
|
@ -34,6 +34,10 @@ If you want to run a polylith deployment, you also need:
|
|||
|
||||
* A standalone [NATS Server](https://github.com/nats-io/nats-server) deployment with JetStream enabled
|
||||
|
||||
If you want to build it on Windows, you need `gcc` in the path:
|
||||
|
||||
* [MinGW-w64](https://www.mingw-w64.org/)
|
||||
|
||||
## Building Dendrite
|
||||
|
||||
Start by cloning the code:
|
||||
|
|
@ -45,10 +49,16 @@ cd dendrite
|
|||
|
||||
Then build it:
|
||||
|
||||
* Linux or UNIX-like systems:
|
||||
```bash
|
||||
./build.sh
|
||||
```
|
||||
|
||||
* Windows:
|
||||
```dos
|
||||
build.cmd
|
||||
```
|
||||
|
||||
## Install NATS Server
|
||||
|
||||
Follow the [NATS Server installation instructions](https://docs.nats.io/running-a-nats-service/introduction/installation) and then [start your NATS deployment](https://docs.nats.io/running-a-nats-service/introduction/running).
|
||||
|
|
|
|||
|
|
@ -40,9 +40,10 @@ type OutputPresenceConsumer struct {
|
|||
queues *queue.OutgoingQueues
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
topic string
|
||||
outboundPresenceEnabled bool
|
||||
}
|
||||
|
||||
// NewOutputReceiptConsumer creates a new OutputReceiptConsumer. Call Start() to begin consuming typing events.
|
||||
// NewOutputPresenceConsumer creates a new OutputPresenceConsumer. Call Start() to begin consuming events.
|
||||
func NewOutputPresenceConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.FederationAPI,
|
||||
|
|
@ -58,18 +59,22 @@ func NewOutputPresenceConsumer(
|
|||
ServerName: cfg.Matrix.ServerName,
|
||||
durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"),
|
||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
||||
outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound,
|
||||
}
|
||||
}
|
||||
|
||||
// Start consuming from the clientapi
|
||||
func (t *OutputPresenceConsumer) Start() error {
|
||||
if !t.outboundPresenceEnabled {
|
||||
return nil
|
||||
}
|
||||
return jetstream.JetStreamConsumer(
|
||||
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
|
||||
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
|
||||
)
|
||||
}
|
||||
|
||||
// onMessage is called in response to a message received on the receipt
|
||||
// onMessage is called in response to a message received on the presence
|
||||
// events topic from the client api.
|
||||
func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
||||
// only send presence events which originated from us
|
||||
|
|
@ -105,7 +110,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) b
|
|||
statusMsg = &status
|
||||
}
|
||||
|
||||
p := types.Presence{LastActiveTS: gomatrixserverlib.Timestamp(ts)}
|
||||
p := types.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(ts)}
|
||||
|
||||
content := fedTypes.Presence{
|
||||
Push: []fedTypes.PresenceContent{
|
||||
|
|
|
|||
|
|
@ -392,17 +392,17 @@ func (r *FederationInternalAPI) performOutboundPeekUsingServer(
|
|||
|
||||
// we have the peek state now so let's process regardless of whether upstream gives up
|
||||
ctx = context.Background()
|
||||
|
||||
respState := respPeek.ToRespState()
|
||||
authEvents := respState.AuthEvents.UntrustedEvents(respPeek.RoomVersion)
|
||||
|
||||
// authenticate the state returned (check its auth events etc)
|
||||
// the equivalent of CheckSendJoinResponse()
|
||||
authEvents, _, err := respState.Check(ctx, respPeek.RoomVersion, r.keyRing, federatedAuthProvider(ctx, r.federation, r.keyRing, serverName))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error checking state returned from peeking: %w", err)
|
||||
}
|
||||
if err = sanityCheckAuthChain(authEvents); err != nil {
|
||||
return fmt.Errorf("sanityCheckAuthChain: %w", err)
|
||||
}
|
||||
if err = respState.Check(ctx, respPeek.RoomVersion, r.keyRing, federatedAuthProvider(ctx, r.federation, r.keyRing, serverName)); err != nil {
|
||||
return fmt.Errorf("error checking state returned from peeking: %w", err)
|
||||
}
|
||||
|
||||
// If we've got this far, the remote server is peeking.
|
||||
if renewing {
|
||||
|
|
|
|||
|
|
@ -146,11 +146,11 @@ func (p *SyncAPIProducer) SendTyping(
|
|||
}
|
||||
|
||||
func (p *SyncAPIProducer) SendPresence(
|
||||
ctx context.Context, userID, presence string, statusMsg *string, lastActiveAgo int64,
|
||||
ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveAgo int64,
|
||||
) error {
|
||||
m := nats.NewMsg(p.TopicPresenceEvent)
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
m.Header.Set("presence", presence)
|
||||
m.Header.Set("presence", presence.String())
|
||||
if statusMsg != nil {
|
||||
m.Header.Set("status_msg", *statusMsg)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import (
|
|||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
syncTypes "github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
|
@ -134,6 +135,7 @@ func Send(
|
|||
keyAPI: keyAPI,
|
||||
roomsMu: mu,
|
||||
producer: producer,
|
||||
inboundPresenceEnabled: cfg.Matrix.Presence.EnableInbound,
|
||||
}
|
||||
|
||||
var txnEvents struct {
|
||||
|
|
@ -192,6 +194,7 @@ type txnReq struct {
|
|||
roomsMu *internal.MutexByRoom
|
||||
servers federationAPI.ServersInRoomProvider
|
||||
producer *producers.SyncAPIProducer
|
||||
inboundPresenceEnabled bool
|
||||
}
|
||||
|
||||
// A subset of FederationClient functionality that txn requires. Useful for testing.
|
||||
|
|
@ -390,9 +393,11 @@ func (t *txnReq) processEDUs(ctx context.Context) {
|
|||
logrus.WithError(err).Errorf("Failed to process signing key update")
|
||||
}
|
||||
case gomatrixserverlib.MPresence:
|
||||
if t.inboundPresenceEnabled {
|
||||
if err := t.processPresence(ctx, e); err != nil {
|
||||
logrus.WithError(err).Errorf("Failed to process presence update")
|
||||
}
|
||||
}
|
||||
default:
|
||||
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
|
||||
}
|
||||
|
|
@ -406,7 +411,12 @@ func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) e
|
|||
return err
|
||||
}
|
||||
for _, content := range payload.Push {
|
||||
if err := t.producer.SendPresence(ctx, content.UserID, content.Presence, content.StatusMsg, content.LastActiveAgo); err != nil {
|
||||
presence, ok := syncTypes.PresenceFromString(content.Presence)
|
||||
if !ok {
|
||||
logrus.Warnf("invalid presence '%s', skipping.", content.Presence)
|
||||
continue
|
||||
}
|
||||
if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
3
go.mod
3
go.mod
|
|
@ -38,7 +38,7 @@ require (
|
|||
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
|
||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220404174134-970e11ad2142
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220405134050-301e340659d5
|
||||
github.com/matrix-org/pinecone v0.0.0-20220404141326-e526fa82f79d
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||
github.com/mattn/go-sqlite3 v1.14.10
|
||||
|
|
@ -64,7 +64,6 @@ require (
|
|||
golang.org/x/image v0.0.0-20211028202545-6944b10bf410
|
||||
golang.org/x/mobile v0.0.0-20220325161704-447654d348e3
|
||||
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd
|
||||
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 // indirect
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
|
||||
gopkg.in/h2non/bimg.v1 v1.1.5
|
||||
gopkg.in/yaml.v2 v2.4.0
|
||||
|
|
|
|||
8
go.sum
8
go.sum
|
|
@ -977,8 +977,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1
|
|||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220404174134-970e11ad2142 h1:kkFKjbPn9oySI07bA3vVInFMjTRdMxASgwJXmABli4o=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220404174134-970e11ad2142/go.mod h1:+WF5InseAMgi1fTnU46JH39IDpEvLep0fDzx9LDf2Bo=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220405134050-301e340659d5 h1:Fkennny7+Z/5pygrhjFMZbz1j++P2hhhLoT7NO3p8DQ=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220405134050-301e340659d5/go.mod h1:V5eO8rn/C3rcxig37A/BCeKerLFS+9Avg/77FIeTZ48=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220404141326-e526fa82f79d h1:1+T4eOPRsf6cr0lMPW4oO2k8TTHm4mqIh65kpEID5Rk=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220404141326-e526fa82f79d/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
|
||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||
|
|
@ -1727,8 +1727,8 @@ golang.org/x/sys v0.0.0-20211102192858-4dd72447c267/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 h1:nhht2DYV/Sn3qOayu8lM+cU1ii9sTLUeBQwQQfUHtrs=
|
||||
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64 h1:D1v9ucDTYBtbz5vNuBbAhIMAGhQhJ6Ym5ah3maMVNX4=
|
||||
golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
|
|
|
|||
|
|
@ -170,6 +170,9 @@ type RoomserverInternalAPI interface {
|
|||
// PerformForget forgets a rooms history for a specific user
|
||||
PerformForget(ctx context.Context, req *PerformForgetRequest, resp *PerformForgetResponse) error
|
||||
|
||||
// PerformRoomUpgrade upgrades a room to a newer version
|
||||
PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse)
|
||||
|
||||
// Asks for the default room version as preferred by the server.
|
||||
QueryRoomVersionCapabilities(
|
||||
ctx context.Context,
|
||||
|
|
|
|||
|
|
@ -67,6 +67,15 @@ func (t *RoomserverInternalAPITrace) PerformUnpeek(
|
|||
util.GetLogger(ctx).Infof("PerformUnpeek req=%+v res=%+v", js(req), js(res))
|
||||
}
|
||||
|
||||
func (t *RoomserverInternalAPITrace) PerformRoomUpgrade(
|
||||
ctx context.Context,
|
||||
req *PerformRoomUpgradeRequest,
|
||||
res *PerformRoomUpgradeResponse,
|
||||
) {
|
||||
t.Impl.PerformRoomUpgrade(ctx, req, res)
|
||||
util.GetLogger(ctx).Infof("PerformRoomUpgrade req=%+v res=%+v", js(req), js(res))
|
||||
}
|
||||
|
||||
func (t *RoomserverInternalAPITrace) PerformJoin(
|
||||
ctx context.Context,
|
||||
req *PerformJoinRequest,
|
||||
|
|
|
|||
|
|
@ -203,3 +203,14 @@ type PerformForgetRequest struct {
|
|||
}
|
||||
|
||||
type PerformForgetResponse struct{}
|
||||
|
||||
type PerformRoomUpgradeRequest struct {
|
||||
RoomID string `json:"room_id"`
|
||||
UserID string `json:"user_id"`
|
||||
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
|
||||
}
|
||||
|
||||
type PerformRoomUpgradeResponse struct {
|
||||
NewRoomID string
|
||||
Error *PerformError
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ type RoomserverInternalAPI struct {
|
|||
*perform.Publisher
|
||||
*perform.Backfiller
|
||||
*perform.Forgetter
|
||||
*perform.Upgrader
|
||||
ProcessContext *process.ProcessContext
|
||||
DB storage.Database
|
||||
Cfg *config.RoomServer
|
||||
|
|
@ -159,6 +160,10 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
|
|||
r.Forgetter = &perform.Forgetter{
|
||||
DB: r.DB,
|
||||
}
|
||||
r.Upgrader = &perform.Upgrader{
|
||||
Cfg: r.Cfg,
|
||||
URSAPI: r,
|
||||
}
|
||||
|
||||
if err := r.Inputer.Start(); err != nil {
|
||||
logrus.WithError(err).Panic("failed to start roomserver input API")
|
||||
|
|
|
|||
|
|
@ -613,12 +613,13 @@ func (t *missingStateReq) lookupMissingStateViaState(
|
|||
return nil, err
|
||||
}
|
||||
// Check that the returned state is valid.
|
||||
if err := state.Check(ctx, roomVersion, t.keys, nil); err != nil {
|
||||
authEvents, stateEvents, err := state.Check(ctx, roomVersion, t.keys, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
parsedState := &parsedRespState{
|
||||
AuthEvents: make([]*gomatrixserverlib.Event, len(state.AuthEvents)),
|
||||
StateEvents: make([]*gomatrixserverlib.Event, len(state.StateEvents)),
|
||||
AuthEvents: authEvents,
|
||||
StateEvents: stateEvents,
|
||||
}
|
||||
// Cache the results of this state lookup and deduplicate anything we already
|
||||
// have in the cache, freeing up memory.
|
||||
|
|
|
|||
709
roomserver/internal/perform/perform_upgrade.go
Normal file
709
roomserver/internal/perform/perform_upgrade.go
Normal file
|
|
@ -0,0 +1,709 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package perform
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type Upgrader struct {
|
||||
Cfg *config.RoomServer
|
||||
URSAPI api.RoomserverInternalAPI
|
||||
}
|
||||
|
||||
// fledglingEvent is a helper representation of an event used when creating many events in succession.
|
||||
type fledglingEvent struct {
|
||||
Type string `json:"type"`
|
||||
StateKey string `json:"state_key"`
|
||||
Content interface{} `json:"content"`
|
||||
}
|
||||
|
||||
// PerformRoomUpgrade upgrades a room from one version to another
|
||||
func (r *Upgrader) PerformRoomUpgrade(
|
||||
ctx context.Context,
|
||||
req *api.PerformRoomUpgradeRequest,
|
||||
res *api.PerformRoomUpgradeResponse,
|
||||
) {
|
||||
res.NewRoomID, res.Error = r.performRoomUpgrade(ctx, req)
|
||||
if res.Error != nil {
|
||||
res.NewRoomID = ""
|
||||
logrus.WithContext(ctx).WithError(res.Error).Error("Room upgrade failed")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Upgrader) performRoomUpgrade(
|
||||
ctx context.Context,
|
||||
req *api.PerformRoomUpgradeRequest,
|
||||
) (string, *api.PerformError) {
|
||||
roomID := req.RoomID
|
||||
userID := req.UserID
|
||||
evTime := time.Now()
|
||||
|
||||
// Return an immediate error if the room does not exist
|
||||
if err := r.validateRoomExists(ctx, roomID); err != nil {
|
||||
return "", &api.PerformError{
|
||||
Code: api.PerformErrorNoRoom,
|
||||
Msg: "Error validating that the room exists",
|
||||
}
|
||||
}
|
||||
|
||||
// 1. Check if the user is authorized to actually perform the upgrade (can send m.room.tombstone)
|
||||
if !r.userIsAuthorized(ctx, userID, roomID) {
|
||||
return "", &api.PerformError{
|
||||
Code: api.PerformErrorNotAllowed,
|
||||
Msg: "You don't have permission to upgrade the room, power level too low.",
|
||||
}
|
||||
}
|
||||
|
||||
// TODO (#267): Check room ID doesn't clash with an existing one, and we
|
||||
// probably shouldn't be using pseudo-random strings, maybe GUIDs?
|
||||
newRoomID := fmt.Sprintf("!%s:%s", util.RandomString(16), r.Cfg.Matrix.ServerName)
|
||||
|
||||
// Get the existing room state for the old room.
|
||||
oldRoomReq := &api.QueryLatestEventsAndStateRequest{
|
||||
RoomID: roomID,
|
||||
}
|
||||
oldRoomRes := &api.QueryLatestEventsAndStateResponse{}
|
||||
if err := r.URSAPI.QueryLatestEventsAndState(ctx, oldRoomReq, oldRoomRes); err != nil {
|
||||
return "", &api.PerformError{
|
||||
Msg: fmt.Sprintf("Failed to get latest state: %s", err),
|
||||
}
|
||||
}
|
||||
|
||||
// Make the tombstone event
|
||||
tombstoneEvent, pErr := r.makeTombstoneEvent(ctx, evTime, userID, roomID, newRoomID)
|
||||
if pErr != nil {
|
||||
return "", pErr
|
||||
}
|
||||
|
||||
// Generate the initial events we need to send into the new room. This includes copied state events and bans
|
||||
// as well as the power level events needed to set up the room
|
||||
eventsToMake, pErr := r.generateInitialEvents(ctx, oldRoomRes, userID, roomID, string(req.RoomVersion), tombstoneEvent)
|
||||
if pErr != nil {
|
||||
return "", pErr
|
||||
}
|
||||
|
||||
// 5. Send the tombstone event to the old room (must do this before we set the new canonical_alias)
|
||||
if pErr = r.sendHeaderedEvent(ctx, tombstoneEvent); pErr != nil {
|
||||
return "", pErr
|
||||
}
|
||||
|
||||
// Send the setup events to the new room
|
||||
if pErr = r.sendInitialEvents(ctx, evTime, userID, newRoomID, string(req.RoomVersion), eventsToMake); pErr != nil {
|
||||
return "", pErr
|
||||
}
|
||||
|
||||
// If the old room was public, make sure the new one is too
|
||||
if pErr = r.publishIfOldRoomWasPublic(ctx, roomID, newRoomID); pErr != nil {
|
||||
return "", pErr
|
||||
}
|
||||
|
||||
// If the old room had a canonical alias event, it should be deleted in the old room
|
||||
if pErr = r.clearOldCanonicalAliasEvent(ctx, oldRoomRes, evTime, userID, roomID); pErr != nil {
|
||||
return "", pErr
|
||||
}
|
||||
|
||||
// 4. Move local aliases to the new room
|
||||
if pErr = moveLocalAliases(ctx, roomID, newRoomID, userID, r.URSAPI); pErr != nil {
|
||||
return "", pErr
|
||||
}
|
||||
|
||||
// 6. Restrict power levels in the old room
|
||||
if pErr = r.restrictOldRoomPowerLevels(ctx, evTime, userID, roomID); pErr != nil {
|
||||
return "", pErr
|
||||
}
|
||||
|
||||
return newRoomID, nil
|
||||
}
|
||||
|
||||
func (r *Upgrader) getRoomPowerLevels(ctx context.Context, roomID string) (*gomatrixserverlib.PowerLevelContent, *api.PerformError) {
|
||||
oldPowerLevelsEvent := api.GetStateEvent(ctx, r.URSAPI, roomID, gomatrixserverlib.StateKeyTuple{
|
||||
EventType: gomatrixserverlib.MRoomPowerLevels,
|
||||
StateKey: "",
|
||||
})
|
||||
powerLevelContent, err := oldPowerLevelsEvent.PowerLevels()
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error()
|
||||
return nil, &api.PerformError{
|
||||
Msg: "powerLevel event was not actually a power level event",
|
||||
}
|
||||
}
|
||||
return powerLevelContent, nil
|
||||
}
|
||||
|
||||
func (r *Upgrader) restrictOldRoomPowerLevels(ctx context.Context, evTime time.Time, userID, roomID string) *api.PerformError {
|
||||
restrictedPowerLevelContent, pErr := r.getRoomPowerLevels(ctx, roomID)
|
||||
if pErr != nil {
|
||||
return pErr
|
||||
}
|
||||
|
||||
// From: https://spec.matrix.org/v1.2/client-server-api/#server-behaviour-16
|
||||
// If possible, the power levels in the old room should also be modified to
|
||||
// prevent sending of events and inviting new users. For example, setting
|
||||
// events_default and invite to the greater of 50 and users_default + 1.
|
||||
restrictedDefaultPowerLevel := int64(50)
|
||||
if restrictedPowerLevelContent.UsersDefault+1 > restrictedDefaultPowerLevel {
|
||||
restrictedDefaultPowerLevel = restrictedPowerLevelContent.UsersDefault + 1
|
||||
}
|
||||
restrictedPowerLevelContent.EventsDefault = restrictedDefaultPowerLevel
|
||||
restrictedPowerLevelContent.Invite = restrictedDefaultPowerLevel
|
||||
|
||||
restrictedPowerLevelsHeadered, resErr := r.makeHeaderedEvent(ctx, evTime, userID, roomID, fledglingEvent{
|
||||
Type: gomatrixserverlib.MRoomPowerLevels,
|
||||
StateKey: "",
|
||||
Content: restrictedPowerLevelContent,
|
||||
})
|
||||
if resErr != nil {
|
||||
if resErr.Code == api.PerformErrorNotAllowed {
|
||||
util.GetLogger(ctx).WithField(logrus.ErrorKey, resErr).Warn("UpgradeRoom: Could not restrict power levels in old room")
|
||||
} else {
|
||||
return resErr
|
||||
}
|
||||
} else {
|
||||
if resErr = r.sendHeaderedEvent(ctx, restrictedPowerLevelsHeadered); resErr != nil {
|
||||
return resErr
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func moveLocalAliases(ctx context.Context,
|
||||
roomID, newRoomID, userID string,
|
||||
URSAPI api.RoomserverInternalAPI) *api.PerformError {
|
||||
var err error
|
||||
|
||||
aliasReq := api.GetAliasesForRoomIDRequest{RoomID: roomID}
|
||||
aliasRes := api.GetAliasesForRoomIDResponse{}
|
||||
if err = URSAPI.GetAliasesForRoomID(ctx, &aliasReq, &aliasRes); err != nil {
|
||||
return &api.PerformError{
|
||||
Msg: "Could not get aliases for old room",
|
||||
}
|
||||
}
|
||||
|
||||
for _, alias := range aliasRes.Aliases {
|
||||
removeAliasReq := api.RemoveRoomAliasRequest{UserID: userID, Alias: alias}
|
||||
removeAliasRes := api.RemoveRoomAliasResponse{}
|
||||
if err = URSAPI.RemoveRoomAlias(ctx, &removeAliasReq, &removeAliasRes); err != nil {
|
||||
return &api.PerformError{
|
||||
Msg: "api.RemoveRoomAlias failed",
|
||||
}
|
||||
}
|
||||
|
||||
setAliasReq := api.SetRoomAliasRequest{UserID: userID, Alias: alias, RoomID: newRoomID}
|
||||
setAliasRes := api.SetRoomAliasResponse{}
|
||||
if err = URSAPI.SetRoomAlias(ctx, &setAliasReq, &setAliasRes); err != nil {
|
||||
return &api.PerformError{
|
||||
Msg: "api.SetRoomAlias failed",
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Upgrader) clearOldCanonicalAliasEvent(ctx context.Context, oldRoom *api.QueryLatestEventsAndStateResponse, evTime time.Time, userID, roomID string) *api.PerformError {
|
||||
for _, event := range oldRoom.StateEvents {
|
||||
if event.Type() != gomatrixserverlib.MRoomCanonicalAlias || !event.StateKeyEquals("") {
|
||||
continue
|
||||
}
|
||||
var aliasContent struct {
|
||||
Alias string `json:"alias"`
|
||||
AltAliases []string `json:"alt_aliases"`
|
||||
}
|
||||
if err := json.Unmarshal(event.Content(), &aliasContent); err != nil {
|
||||
return &api.PerformError{
|
||||
Msg: fmt.Sprintf("Failed to unmarshal canonical aliases: %s", err),
|
||||
}
|
||||
}
|
||||
if aliasContent.Alias == "" && len(aliasContent.AltAliases) == 0 {
|
||||
// There are no canonical aliases to clear, therefore do nothing.
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
emptyCanonicalAliasEvent, resErr := r.makeHeaderedEvent(ctx, evTime, userID, roomID, fledglingEvent{
|
||||
Type: gomatrixserverlib.MRoomCanonicalAlias,
|
||||
Content: map[string]interface{}{},
|
||||
})
|
||||
if resErr != nil {
|
||||
if resErr.Code == api.PerformErrorNotAllowed {
|
||||
util.GetLogger(ctx).WithField(logrus.ErrorKey, resErr).Warn("UpgradeRoom: Could not set empty canonical alias event in old room")
|
||||
} else {
|
||||
return resErr
|
||||
}
|
||||
} else {
|
||||
if resErr = r.sendHeaderedEvent(ctx, emptyCanonicalAliasEvent); resErr != nil {
|
||||
return resErr
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Upgrader) publishIfOldRoomWasPublic(ctx context.Context, roomID, newRoomID string) *api.PerformError {
|
||||
// check if the old room was published
|
||||
var pubQueryRes api.QueryPublishedRoomsResponse
|
||||
err := r.URSAPI.QueryPublishedRooms(ctx, &api.QueryPublishedRoomsRequest{
|
||||
RoomID: roomID,
|
||||
}, &pubQueryRes)
|
||||
if err != nil {
|
||||
return &api.PerformError{
|
||||
Msg: "QueryPublishedRooms failed",
|
||||
}
|
||||
}
|
||||
|
||||
// if the old room is published (was public), publish the new room
|
||||
if len(pubQueryRes.RoomIDs) == 1 {
|
||||
publishNewRoomAndUnpublishOldRoom(ctx, r.URSAPI, roomID, newRoomID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func publishNewRoomAndUnpublishOldRoom(
|
||||
ctx context.Context,
|
||||
URSAPI api.RoomserverInternalAPI,
|
||||
oldRoomID, newRoomID string,
|
||||
) {
|
||||
// expose this room in the published room list
|
||||
var pubNewRoomRes api.PerformPublishResponse
|
||||
URSAPI.PerformPublish(ctx, &api.PerformPublishRequest{
|
||||
RoomID: newRoomID,
|
||||
Visibility: "public",
|
||||
}, &pubNewRoomRes)
|
||||
if pubNewRoomRes.Error != nil {
|
||||
// treat as non-fatal since the room is already made by this point
|
||||
util.GetLogger(ctx).WithError(pubNewRoomRes.Error).Error("failed to visibility:public")
|
||||
}
|
||||
|
||||
var unpubOldRoomRes api.PerformPublishResponse
|
||||
// remove the old room from the published room list
|
||||
URSAPI.PerformPublish(ctx, &api.PerformPublishRequest{
|
||||
RoomID: oldRoomID,
|
||||
Visibility: "private",
|
||||
}, &unpubOldRoomRes)
|
||||
if unpubOldRoomRes.Error != nil {
|
||||
// treat as non-fatal since the room is already made by this point
|
||||
util.GetLogger(ctx).WithError(unpubOldRoomRes.Error).Error("failed to visibility:private")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Upgrader) validateRoomExists(ctx context.Context, roomID string) error {
|
||||
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
|
||||
verRes := api.QueryRoomVersionForRoomResponse{}
|
||||
if err := r.URSAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
|
||||
return &api.PerformError{
|
||||
Code: api.PerformErrorNoRoom,
|
||||
Msg: "Room does not exist",
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Upgrader) userIsAuthorized(ctx context.Context, userID, roomID string,
|
||||
) bool {
|
||||
plEvent := api.GetStateEvent(ctx, r.URSAPI, roomID, gomatrixserverlib.StateKeyTuple{
|
||||
EventType: gomatrixserverlib.MRoomPowerLevels,
|
||||
StateKey: "",
|
||||
})
|
||||
if plEvent == nil {
|
||||
return false
|
||||
}
|
||||
pl, err := plEvent.PowerLevels()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
// Check for power level required to send tombstone event (marks the current room as obsolete),
|
||||
// if not found, use the StateDefault power level
|
||||
return pl.UserLevel(userID) >= pl.EventLevel("m.room.tombstone", true)
|
||||
}
|
||||
|
||||
// nolint:gocyclo
|
||||
func (r *Upgrader) generateInitialEvents(ctx context.Context, oldRoom *api.QueryLatestEventsAndStateResponse, userID, roomID, newVersion string, tombstoneEvent *gomatrixserverlib.HeaderedEvent) ([]fledglingEvent, *api.PerformError) {
|
||||
state := make(map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent, len(oldRoom.StateEvents))
|
||||
for _, event := range oldRoom.StateEvents {
|
||||
if event.StateKey() == nil {
|
||||
// This shouldn't ever happen, but better to be safe than sorry.
|
||||
continue
|
||||
}
|
||||
if event.Type() == gomatrixserverlib.MRoomMember && !event.StateKeyEquals(userID) {
|
||||
// With the exception of bans and invites which we do want to copy, we
|
||||
// should ignore membership events that aren't our own, as event auth will
|
||||
// prevent us from being able to create membership events on behalf of other
|
||||
// users anyway unless they are invites or bans.
|
||||
membership, err := event.Membership()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
switch membership {
|
||||
case gomatrixserverlib.Ban:
|
||||
case gomatrixserverlib.Invite:
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
state[gomatrixserverlib.StateKeyTuple{EventType: event.Type(), StateKey: *event.StateKey()}] = event
|
||||
}
|
||||
|
||||
// The following events are ones that we are going to override manually
|
||||
// in the following section.
|
||||
override := map[gomatrixserverlib.StateKeyTuple]struct{}{
|
||||
{EventType: gomatrixserverlib.MRoomCreate, StateKey: ""}: {},
|
||||
{EventType: gomatrixserverlib.MRoomMember, StateKey: userID}: {},
|
||||
{EventType: gomatrixserverlib.MRoomPowerLevels, StateKey: ""}: {},
|
||||
{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}: {},
|
||||
}
|
||||
|
||||
// The overridden events are essential events that must be present in the
|
||||
// old room state. Check that they are there.
|
||||
for tuple := range override {
|
||||
if _, ok := state[tuple]; !ok {
|
||||
return nil, &api.PerformError{
|
||||
Msg: fmt.Sprintf("Essential event of type %q state key %q is missing", tuple.EventType, tuple.StateKey),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
oldCreateEvent := state[gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCreate, StateKey: ""}]
|
||||
oldMembershipEvent := state[gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomMember, StateKey: userID}]
|
||||
oldPowerLevelsEvent := state[gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomPowerLevels, StateKey: ""}]
|
||||
oldJoinRulesEvent := state[gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}]
|
||||
|
||||
// Create the new room create event. Using a map here instead of CreateContent
|
||||
// means that we preserve any other interesting fields that might be present
|
||||
// in the create event (such as for the room types MSC).
|
||||
newCreateContent := map[string]interface{}{}
|
||||
_ = json.Unmarshal(oldCreateEvent.Content(), &newCreateContent)
|
||||
newCreateContent["creator"] = userID
|
||||
newCreateContent["room_version"] = newVersion
|
||||
newCreateContent["predecessor"] = gomatrixserverlib.PreviousRoom{
|
||||
EventID: tombstoneEvent.EventID(),
|
||||
RoomID: roomID,
|
||||
}
|
||||
newCreateEvent := fledglingEvent{
|
||||
Type: gomatrixserverlib.MRoomCreate,
|
||||
StateKey: "",
|
||||
Content: newCreateContent,
|
||||
}
|
||||
|
||||
// Now create the new membership event. Same rules apply as above, so
|
||||
// that we preserve fields we don't otherwise know about. We'll always
|
||||
// set the membership to join though, because that is necessary to auth
|
||||
// the events after it.
|
||||
newMembershipContent := map[string]interface{}{}
|
||||
_ = json.Unmarshal(oldMembershipEvent.Content(), &newMembershipContent)
|
||||
newMembershipContent["membership"] = gomatrixserverlib.Join
|
||||
newMembershipEvent := fledglingEvent{
|
||||
Type: gomatrixserverlib.MRoomMember,
|
||||
StateKey: userID,
|
||||
Content: newMembershipContent,
|
||||
}
|
||||
|
||||
// We might need to temporarily give ourselves a higher power level
|
||||
// than we had in the old room in order to be able to send all of
|
||||
// the relevant state events. This function will return whether we
|
||||
// had to override the power level events or not — if we did, we
|
||||
// need to send the original power levels again later on.
|
||||
powerLevelContent, err := oldPowerLevelsEvent.PowerLevels()
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error()
|
||||
return nil, &api.PerformError{
|
||||
Msg: "Power level event content was invalid",
|
||||
}
|
||||
}
|
||||
tempPowerLevelsEvent, powerLevelsOverridden := createTemporaryPowerLevels(powerLevelContent, userID)
|
||||
|
||||
// Now do the join rules event, same as the create and membership
|
||||
// events. We'll set a sane default of "invite" so that if the
|
||||
// existing join rules contains garbage, the room can still be
|
||||
// upgraded.
|
||||
newJoinRulesContent := map[string]interface{}{
|
||||
"join_rule": gomatrixserverlib.Invite, // sane default
|
||||
}
|
||||
_ = json.Unmarshal(oldJoinRulesEvent.Content(), &newJoinRulesContent)
|
||||
newJoinRulesEvent := fledglingEvent{
|
||||
Type: gomatrixserverlib.MRoomJoinRules,
|
||||
StateKey: "",
|
||||
Content: newJoinRulesContent,
|
||||
}
|
||||
|
||||
eventsToMake := make([]fledglingEvent, 0, len(state))
|
||||
eventsToMake = append(
|
||||
eventsToMake, newCreateEvent, newMembershipEvent,
|
||||
tempPowerLevelsEvent, newJoinRulesEvent,
|
||||
)
|
||||
|
||||
// For some reason Sytest expects there to be a guest access event.
|
||||
// Create one if it doesn't exist.
|
||||
if _, ok := state[gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomGuestAccess, StateKey: ""}]; !ok {
|
||||
eventsToMake = append(eventsToMake, fledglingEvent{
|
||||
Type: gomatrixserverlib.MRoomGuestAccess,
|
||||
Content: map[string]string{
|
||||
"guest_access": "forbidden",
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Duplicate all of the old state events into the new room.
|
||||
for tuple, event := range state {
|
||||
if _, ok := override[tuple]; ok {
|
||||
// Don't duplicate events we have overridden already. They
|
||||
// are already in `eventsToMake`.
|
||||
continue
|
||||
}
|
||||
newEvent := fledglingEvent{
|
||||
Type: tuple.EventType,
|
||||
StateKey: tuple.StateKey,
|
||||
}
|
||||
if err = json.Unmarshal(event.Content(), &newEvent.Content); err != nil {
|
||||
logrus.WithError(err).Error("Failed to unmarshal old event")
|
||||
continue
|
||||
}
|
||||
eventsToMake = append(eventsToMake, newEvent)
|
||||
}
|
||||
|
||||
// If we sent a temporary power level event into the room before,
|
||||
// override that now by restoring the original power levels.
|
||||
if powerLevelsOverridden {
|
||||
eventsToMake = append(eventsToMake, fledglingEvent{
|
||||
Type: gomatrixserverlib.MRoomPowerLevels,
|
||||
Content: powerLevelContent,
|
||||
})
|
||||
}
|
||||
return eventsToMake, nil
|
||||
}
|
||||
|
||||
func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, userID, newRoomID, newVersion string, eventsToMake []fledglingEvent) *api.PerformError {
|
||||
var err error
|
||||
var builtEvents []*gomatrixserverlib.HeaderedEvent
|
||||
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||
for i, e := range eventsToMake {
|
||||
depth := i + 1 // depth starts at 1
|
||||
|
||||
builder := gomatrixserverlib.EventBuilder{
|
||||
Sender: userID,
|
||||
RoomID: newRoomID,
|
||||
Type: e.Type,
|
||||
StateKey: &e.StateKey,
|
||||
Depth: int64(depth),
|
||||
}
|
||||
err = builder.SetContent(e.Content)
|
||||
if err != nil {
|
||||
return &api.PerformError{
|
||||
Msg: "builder.SetContent failed",
|
||||
}
|
||||
}
|
||||
if i > 0 {
|
||||
builder.PrevEvents = []gomatrixserverlib.EventReference{builtEvents[i-1].EventReference()}
|
||||
}
|
||||
var event *gomatrixserverlib.Event
|
||||
event, err = r.buildEvent(&builder, &authEvents, evTime, gomatrixserverlib.RoomVersion(newVersion))
|
||||
if err != nil {
|
||||
return &api.PerformError{
|
||||
Msg: "buildEvent failed",
|
||||
}
|
||||
}
|
||||
|
||||
if err = gomatrixserverlib.Allowed(event, &authEvents); err != nil {
|
||||
return &api.PerformError{
|
||||
Msg: "gomatrixserverlib.Allowed failed",
|
||||
}
|
||||
}
|
||||
|
||||
// Add the event to the list of auth events
|
||||
builtEvents = append(builtEvents, event.Headered(gomatrixserverlib.RoomVersion(newVersion)))
|
||||
err = authEvents.AddEvent(event)
|
||||
if err != nil {
|
||||
return &api.PerformError{
|
||||
Msg: "authEvents.AddEvent failed",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inputs := make([]api.InputRoomEvent, 0, len(builtEvents))
|
||||
for _, event := range builtEvents {
|
||||
inputs = append(inputs, api.InputRoomEvent{
|
||||
Kind: api.KindNew,
|
||||
Event: event,
|
||||
Origin: r.Cfg.Matrix.ServerName,
|
||||
SendAsServer: api.DoNotSendToOtherServers,
|
||||
})
|
||||
}
|
||||
if err = api.SendInputRoomEvents(ctx, r.URSAPI, inputs, false); err != nil {
|
||||
return &api.PerformError{
|
||||
Msg: "api.SendInputRoomEvents failed",
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Upgrader) makeTombstoneEvent(
|
||||
ctx context.Context,
|
||||
evTime time.Time,
|
||||
userID, roomID, newRoomID string,
|
||||
) (*gomatrixserverlib.HeaderedEvent, *api.PerformError) {
|
||||
content := map[string]interface{}{
|
||||
"body": "This room has been replaced",
|
||||
"replacement_room": newRoomID,
|
||||
}
|
||||
event := fledglingEvent{
|
||||
Type: "m.room.tombstone",
|
||||
Content: content,
|
||||
}
|
||||
return r.makeHeaderedEvent(ctx, evTime, userID, roomID, event)
|
||||
}
|
||||
|
||||
func (r *Upgrader) makeHeaderedEvent(ctx context.Context, evTime time.Time, userID, roomID string, event fledglingEvent) (*gomatrixserverlib.HeaderedEvent, *api.PerformError) {
|
||||
builder := gomatrixserverlib.EventBuilder{
|
||||
Sender: userID,
|
||||
RoomID: roomID,
|
||||
Type: event.Type,
|
||||
StateKey: &event.StateKey,
|
||||
}
|
||||
err := builder.SetContent(event.Content)
|
||||
if err != nil {
|
||||
return nil, &api.PerformError{
|
||||
Msg: "builder.SetContent failed",
|
||||
}
|
||||
}
|
||||
var queryRes api.QueryLatestEventsAndStateResponse
|
||||
headeredEvent, err := eventutil.QueryAndBuildEvent(ctx, &builder, r.Cfg.Matrix, evTime, r.URSAPI, &queryRes)
|
||||
if err == eventutil.ErrRoomNoExists {
|
||||
return nil, &api.PerformError{
|
||||
Code: api.PerformErrorNoRoom,
|
||||
Msg: "Room does not exist",
|
||||
}
|
||||
} else if e, ok := err.(gomatrixserverlib.BadJSONError); ok {
|
||||
return nil, &api.PerformError{
|
||||
Msg: e.Error(),
|
||||
}
|
||||
} else if e, ok := err.(gomatrixserverlib.EventValidationError); ok {
|
||||
if e.Code == gomatrixserverlib.EventValidationTooLarge {
|
||||
return nil, &api.PerformError{
|
||||
Msg: e.Error(),
|
||||
}
|
||||
}
|
||||
return nil, &api.PerformError{
|
||||
Msg: e.Error(),
|
||||
}
|
||||
} else if err != nil {
|
||||
return nil, &api.PerformError{
|
||||
Msg: "eventutil.BuildEvent failed",
|
||||
}
|
||||
}
|
||||
// check to see if this user can perform this operation
|
||||
stateEvents := make([]*gomatrixserverlib.Event, len(queryRes.StateEvents))
|
||||
for i := range queryRes.StateEvents {
|
||||
stateEvents[i] = queryRes.StateEvents[i].Event
|
||||
}
|
||||
provider := gomatrixserverlib.NewAuthEvents(stateEvents)
|
||||
if err = gomatrixserverlib.Allowed(headeredEvent.Event, &provider); err != nil {
|
||||
return nil, &api.PerformError{
|
||||
Code: api.PerformErrorNotAllowed,
|
||||
Msg: err.Error(), // TODO: Is this error string comprehensible to the client?
|
||||
}
|
||||
}
|
||||
|
||||
return headeredEvent, nil
|
||||
}
|
||||
|
||||
func createTemporaryPowerLevels(powerLevelContent *gomatrixserverlib.PowerLevelContent, userID string) (fledglingEvent, bool) {
|
||||
// Work out what power level we need in order to be able to send events
|
||||
// of all types into the room.
|
||||
neededPowerLevel := powerLevelContent.StateDefault
|
||||
for _, powerLevel := range powerLevelContent.Events {
|
||||
if powerLevel > neededPowerLevel {
|
||||
neededPowerLevel = powerLevel
|
||||
}
|
||||
}
|
||||
|
||||
// Make a copy of the existing power level content.
|
||||
tempPowerLevelContent := *powerLevelContent
|
||||
powerLevelsOverridden := false
|
||||
|
||||
// At this point, the "Users", "Events" and "Notifications" keys are all
|
||||
// pointing to the map of the original PL content, so we will specifically
|
||||
// override the users map with a new one and duplicate the values deeply,
|
||||
// so that we can modify them without modifying the original.
|
||||
tempPowerLevelContent.Users = make(map[string]int64, len(powerLevelContent.Users))
|
||||
for key, value := range powerLevelContent.Users {
|
||||
tempPowerLevelContent.Users[key] = value
|
||||
}
|
||||
|
||||
// If the user who is upgrading the room doesn't already have sufficient
|
||||
// power, then elevate their power levels.
|
||||
if tempPowerLevelContent.UserLevel(userID) < neededPowerLevel {
|
||||
tempPowerLevelContent.Users[userID] = neededPowerLevel
|
||||
powerLevelsOverridden = true
|
||||
}
|
||||
|
||||
// Then return the temporary power levels event.
|
||||
return fledglingEvent{
|
||||
Type: gomatrixserverlib.MRoomPowerLevels,
|
||||
Content: tempPowerLevelContent,
|
||||
}, powerLevelsOverridden
|
||||
}
|
||||
|
||||
func (r *Upgrader) sendHeaderedEvent(
|
||||
ctx context.Context,
|
||||
headeredEvent *gomatrixserverlib.HeaderedEvent,
|
||||
) *api.PerformError {
|
||||
var inputs []api.InputRoomEvent
|
||||
inputs = append(inputs, api.InputRoomEvent{
|
||||
Kind: api.KindNew,
|
||||
Event: headeredEvent,
|
||||
Origin: r.Cfg.Matrix.ServerName,
|
||||
SendAsServer: api.DoNotSendToOtherServers,
|
||||
})
|
||||
if err := api.SendInputRoomEvents(ctx, r.URSAPI, inputs, false); err != nil {
|
||||
return &api.PerformError{
|
||||
Msg: "api.SendInputRoomEvents failed",
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Upgrader) buildEvent(
|
||||
builder *gomatrixserverlib.EventBuilder,
|
||||
provider gomatrixserverlib.AuthEventProvider,
|
||||
evTime time.Time,
|
||||
roomVersion gomatrixserverlib.RoomVersion,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
refs, err := eventsNeeded.AuthEventReferences(provider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
builder.AuthEvents = refs
|
||||
event, err := builder.Build(
|
||||
evTime, r.Cfg.Matrix.ServerName, r.Cfg.Matrix.KeyID,
|
||||
r.Cfg.Matrix.PrivateKey, roomVersion,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot build event %s : Builder failed to build. %w", builder.Type, err)
|
||||
}
|
||||
return event, nil
|
||||
}
|
||||
|
|
@ -32,6 +32,7 @@ const (
|
|||
RoomserverPerformInvitePath = "/roomserver/performInvite"
|
||||
RoomserverPerformPeekPath = "/roomserver/performPeek"
|
||||
RoomserverPerformUnpeekPath = "/roomserver/performUnpeek"
|
||||
RoomserverPerformRoomUpgradePath = "/roomserver/performRoomUpgrade"
|
||||
RoomserverPerformJoinPath = "/roomserver/performJoin"
|
||||
RoomserverPerformLeavePath = "/roomserver/performLeave"
|
||||
RoomserverPerformBackfillPath = "/roomserver/performBackfill"
|
||||
|
|
@ -252,6 +253,23 @@ func (h *httpRoomserverInternalAPI) PerformUnpeek(
|
|||
}
|
||||
}
|
||||
|
||||
func (h *httpRoomserverInternalAPI) PerformRoomUpgrade(
|
||||
ctx context.Context,
|
||||
request *api.PerformRoomUpgradeRequest,
|
||||
response *api.PerformRoomUpgradeResponse,
|
||||
) {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformRoomUpgrade")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.roomserverURL + RoomserverPerformRoomUpgradePath
|
||||
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
if err != nil {
|
||||
response.Error = &api.PerformError{
|
||||
Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpRoomserverInternalAPI) PerformLeave(
|
||||
ctx context.Context,
|
||||
request *api.PerformLeaveRequest,
|
||||
|
|
|
|||
|
|
@ -96,6 +96,17 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
|
|||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(RoomserverPerformRoomUpgradePath,
|
||||
httputil.MakeInternalAPI("performRoomUpgrade", func(req *http.Request) util.JSONResponse {
|
||||
var request api.PerformRoomUpgradeRequest
|
||||
var response api.PerformRoomUpgradeResponse
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
r.PerformRoomUpgrade(req.Context(), &request, &response)
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(RoomserverPerformPublishPath,
|
||||
httputil.MakeInternalAPI("performPublish", func(req *http.Request) util.JSONResponse {
|
||||
var request api.PerformPublishRequest
|
||||
|
|
|
|||
|
|
@ -41,8 +41,8 @@ type Global struct {
|
|||
// to other servers and the federation API will not be exposed.
|
||||
DisableFederation bool `yaml:"disable_federation"`
|
||||
|
||||
// Disable presence. Dendrite will not handle presence events.
|
||||
DisablePresence bool `yaml:"disable_presence"`
|
||||
// Configures the handling of presence events.
|
||||
Presence PresenceOptions `yaml:"presence"`
|
||||
|
||||
// List of domains that the server will trust as identity servers to
|
||||
// verify third-party identifiers.
|
||||
|
|
@ -71,7 +71,6 @@ func (c *Global) Defaults(generate bool) {
|
|||
c.PrivateKeyPath = "matrix_key.pem"
|
||||
_, c.PrivateKey, _ = ed25519.GenerateKey(rand.New(rand.NewSource(0)))
|
||||
c.KeyID = "ed25519:auto"
|
||||
c.DisablePresence = false
|
||||
}
|
||||
c.KeyValidityPeriod = time.Hour * 24 * 7
|
||||
|
||||
|
|
@ -229,3 +228,11 @@ func (c *DNSCacheOptions) Verify(configErrs *ConfigErrors, isMonolith bool) {
|
|||
checkPositive(configErrs, "cache_size", int64(c.CacheSize))
|
||||
checkPositive(configErrs, "cache_lifetime", int64(c.CacheLifetime))
|
||||
}
|
||||
|
||||
// PresenceOptions defines possible configurations for presence events.
|
||||
type PresenceOptions struct {
|
||||
// Whether inbound presence events are allowed
|
||||
EnableInbound bool `yaml:"enable_inbound"`
|
||||
// Whether outbound presence events are allowed
|
||||
EnableOutbound bool `yaml:"enable_outbound"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -89,17 +89,17 @@ if [ -n "${tests_to_add}" ] && [ -n "${already_in_whitelist}" ]; then
|
|||
fi
|
||||
|
||||
if [ -n "${tests_to_add}" ]; then
|
||||
echo "**ERROR**: The following tests passed but are not present in \`$2\`. Please append them to the file:"
|
||||
echo "\`\`\`"
|
||||
echo "::error::The following tests passed but are not present in \`$2\`. Please append them to the file:"
|
||||
echo "::group::Passing tests"
|
||||
echo -e "${tests_to_add}"
|
||||
echo "\`\`\`"
|
||||
echo "::endgroup::"
|
||||
fi
|
||||
|
||||
if [ -n "${already_in_whitelist}" ]; then
|
||||
echo "**WARN**: Tests in the whitelist still marked as **expected fail**:"
|
||||
echo "\`\`\`"
|
||||
echo "::warning::Tests in the whitelist still marked as **expected fail**:"
|
||||
echo "::group::Still marked as expected fail"
|
||||
echo -e "${already_in_whitelist}"
|
||||
echo "\`\`\`"
|
||||
echo "::endgroup::"
|
||||
fi
|
||||
|
||||
exit ${fail_build}
|
||||
|
|
|
|||
|
|
@ -42,10 +42,11 @@ type PresenceConsumer struct {
|
|||
stream types.StreamProvider
|
||||
notifier *notifier.Notifier
|
||||
deviceAPI api.UserDeviceAPI
|
||||
cfg *config.SyncAPI
|
||||
}
|
||||
|
||||
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
|
||||
// Call Start() to begin consuming from the EDU server.
|
||||
// NewPresenceConsumer creates a new PresenceConsumer.
|
||||
// Call Start() to begin consuming events.
|
||||
func NewPresenceConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.SyncAPI,
|
||||
|
|
@ -67,6 +68,7 @@ func NewPresenceConsumer(
|
|||
notifier: notifier,
|
||||
stream: stream,
|
||||
deviceAPI: deviceAPI,
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -115,6 +117,9 @@ func (s *PresenceConsumer) Start() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !s.cfg.Matrix.Presence.EnableInbound && !s.cfg.Matrix.Presence.EnableOutbound {
|
||||
return nil
|
||||
}
|
||||
return jetstream.JetStreamConsumer(
|
||||
s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage,
|
||||
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
|
||||
|
|
@ -139,8 +144,9 @@ func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
|||
newMsg := msg.Header.Get("status_msg")
|
||||
statusMsg = &newMsg
|
||||
}
|
||||
|
||||
pos, err := s.db.UpdatePresence(ctx, userID, presence, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
|
||||
// OK is already checked, so no need to do it again
|
||||
p, _ := types.PresenceFromString(presence)
|
||||
pos, err := s.db.UpdatePresence(ctx, userID, p, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,10 +16,10 @@ package producers
|
|||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
|
@ -31,11 +31,11 @@ type FederationAPIPresenceProducer struct {
|
|||
}
|
||||
|
||||
func (f *FederationAPIPresenceProducer) SendPresence(
|
||||
userID, presence string, statusMsg *string,
|
||||
userID string, presence types.Presence, statusMsg *string,
|
||||
) error {
|
||||
msg := nats.NewMsg(f.Topic)
|
||||
msg.Header.Set(jetstream.UserID, userID)
|
||||
msg.Header.Set("presence", strings.ToLower(presence))
|
||||
msg.Header.Set("presence", presence.String())
|
||||
msg.Header.Set("from_sync", "true") // only update last_active_ts and presence
|
||||
msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now()))))
|
||||
|
||||
|
|
|
|||
|
|
@ -153,8 +153,8 @@ type Database interface {
|
|||
}
|
||||
|
||||
type Presence interface {
|
||||
UpdatePresence(ctx context.Context, userID, presence string, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error)
|
||||
GetPresence(ctx context.Context, userID string) (*types.Presence, error)
|
||||
PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.Presence, error)
|
||||
UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error)
|
||||
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
|
||||
PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error)
|
||||
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -103,17 +103,16 @@ func (p *presenceStatements) UpsertPresence(
|
|||
txn *sql.Tx,
|
||||
userID string,
|
||||
statusMsg *string,
|
||||
presence string,
|
||||
presence types.Presence,
|
||||
lastActiveTS gomatrixserverlib.Timestamp,
|
||||
fromSync bool,
|
||||
) (pos types.StreamPosition, err error) {
|
||||
presenceStatusID := types.PresenceToInt[presence]
|
||||
if fromSync {
|
||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
|
||||
err = stmt.QueryRowContext(ctx, userID, presenceStatusID, lastActiveTS).Scan(&pos)
|
||||
err = stmt.QueryRowContext(ctx, userID, presence, lastActiveTS).Scan(&pos)
|
||||
} else {
|
||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
|
||||
err = stmt.QueryRowContext(ctx, userID, presenceStatusID, statusMsg, lastActiveTS).Scan(&pos)
|
||||
err = stmt.QueryRowContext(ctx, userID, presence, statusMsg, lastActiveTS).Scan(&pos)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
@ -122,14 +121,13 @@ func (p *presenceStatements) UpsertPresence(
|
|||
func (p *presenceStatements) GetPresenceForUser(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
userID string,
|
||||
) (*types.Presence, error) {
|
||||
result := &types.Presence{
|
||||
) (*types.PresenceInternal, error) {
|
||||
result := &types.PresenceInternal{
|
||||
UserID: userID,
|
||||
}
|
||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||
var presenceStatusID int
|
||||
err := stmt.QueryRowContext(ctx, userID).Scan(&presenceStatusID, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
||||
result.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
||||
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
||||
result.ClientFields.Presence = result.Presence.String()
|
||||
return result, err
|
||||
}
|
||||
|
||||
|
|
@ -143,8 +141,8 @@ func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx)
|
|||
func (p *presenceStatements) GetPresenceAfter(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
after types.StreamPosition,
|
||||
) (presences map[string]*types.Presence, err error) {
|
||||
presences = make(map[string]*types.Presence)
|
||||
) (presences map[string]*types.PresenceInternal, err error) {
|
||||
presences = make(map[string]*types.PresenceInternal)
|
||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
|
||||
|
||||
rows, err := stmt.QueryContext(ctx, after)
|
||||
|
|
@ -152,14 +150,13 @@ func (p *presenceStatements) GetPresenceAfter(
|
|||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
|
||||
var presenceStatusID int
|
||||
for rows.Next() {
|
||||
presence := &types.Presence{}
|
||||
if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presenceStatusID, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil {
|
||||
qryRes := &types.PresenceInternal{}
|
||||
if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
presence.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
||||
presences[presence.UserID] = presence
|
||||
qryRes.ClientFields.Presence = qryRes.Presence.String()
|
||||
presences[qryRes.UserID] = qryRes
|
||||
}
|
||||
return presences, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1004,15 +1004,15 @@ func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID s
|
|||
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
|
||||
}
|
||||
|
||||
func (s *Database) UpdatePresence(ctx context.Context, userID, presence string, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
||||
func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
||||
return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
|
||||
}
|
||||
|
||||
func (s *Database) GetPresence(ctx context.Context, userID string) (*types.Presence, error) {
|
||||
func (s *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
||||
return s.Presence.GetPresenceForUser(ctx, nil, userID)
|
||||
}
|
||||
|
||||
func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.Presence, error) {
|
||||
func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) {
|
||||
return s.Presence.GetPresenceAfter(ctx, nil, after)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ func (p *presenceStatements) UpsertPresence(
|
|||
txn *sql.Tx,
|
||||
userID string,
|
||||
statusMsg *string,
|
||||
presence string,
|
||||
presence types.Presence,
|
||||
lastActiveTS gomatrixserverlib.Timestamp,
|
||||
fromSync bool,
|
||||
) (pos types.StreamPosition, err error) {
|
||||
|
|
@ -116,19 +116,18 @@ func (p *presenceStatements) UpsertPresence(
|
|||
return pos, err
|
||||
}
|
||||
|
||||
presenceStatusID := types.PresenceToInt[presence]
|
||||
if fromSync {
|
||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
|
||||
err = stmt.QueryRowContext(ctx,
|
||||
pos, userID, presenceStatusID,
|
||||
pos, userID, presence,
|
||||
lastActiveTS, pos,
|
||||
presenceStatusID, lastActiveTS).Scan(&pos)
|
||||
presence, lastActiveTS).Scan(&pos)
|
||||
} else {
|
||||
stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
|
||||
err = stmt.QueryRowContext(ctx,
|
||||
pos, userID, presenceStatusID,
|
||||
pos, userID, presence,
|
||||
statusMsg, lastActiveTS, pos,
|
||||
presenceStatusID, statusMsg, lastActiveTS).Scan(&pos)
|
||||
presence, statusMsg, lastActiveTS).Scan(&pos)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
@ -137,14 +136,13 @@ func (p *presenceStatements) UpsertPresence(
|
|||
func (p *presenceStatements) GetPresenceForUser(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
userID string,
|
||||
) (*types.Presence, error) {
|
||||
result := &types.Presence{
|
||||
) (*types.PresenceInternal, error) {
|
||||
result := &types.PresenceInternal{
|
||||
UserID: userID,
|
||||
}
|
||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||
var presenceStatusID int
|
||||
err := stmt.QueryRowContext(ctx, userID).Scan(&presenceStatusID, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
||||
result.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
||||
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
||||
result.ClientFields.Presence = result.Presence.String()
|
||||
return result, err
|
||||
}
|
||||
|
||||
|
|
@ -158,8 +156,8 @@ func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx)
|
|||
func (p *presenceStatements) GetPresenceAfter(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
after types.StreamPosition,
|
||||
) (presences map[string]*types.Presence, err error) {
|
||||
presences = make(map[string]*types.Presence)
|
||||
) (presences map[string]*types.PresenceInternal, err error) {
|
||||
presences = make(map[string]*types.PresenceInternal)
|
||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
|
||||
|
||||
rows, err := stmt.QueryContext(ctx, after)
|
||||
|
|
@ -167,14 +165,13 @@ func (p *presenceStatements) GetPresenceAfter(
|
|||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
|
||||
var presenceStatusID int
|
||||
for rows.Next() {
|
||||
presence := &types.Presence{}
|
||||
if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presenceStatusID, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil {
|
||||
qryRes := &types.PresenceInternal{}
|
||||
if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
presence.ClientFields.Presence = types.PresenceToString[presenceStatusID]
|
||||
presences[presence.UserID] = presence
|
||||
qryRes.ClientFields.Presence = qryRes.Presence.String()
|
||||
presences[qryRes.UserID] = qryRes
|
||||
}
|
||||
return presences, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -184,8 +184,8 @@ type NotificationData interface {
|
|||
}
|
||||
|
||||
type Presence interface {
|
||||
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
|
||||
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.Presence, err error)
|
||||
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence types.Presence, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
|
||||
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
|
||||
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
|
||||
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.Presence, err error)
|
||||
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.PresenceInternal, err error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
|||
}
|
||||
|
||||
// get all joined users
|
||||
// TODO: SharedUsers might get out of syncf
|
||||
// TODO: SharedUsers might get out of sync
|
||||
sharedUsers := p.notifier.SharedUsers(req.Device.UserID)
|
||||
|
||||
sharedUsersMap := map[string]bool{
|
||||
|
|
@ -116,7 +116,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
|||
pres, ok := p.cache.Load(cacheKey)
|
||||
if ok {
|
||||
// skip already sent presence
|
||||
prevPresence := pres.(*types.Presence)
|
||||
prevPresence := pres.(*types.PresenceInternal)
|
||||
currentlyActive := prevPresence.CurrentlyActive()
|
||||
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
|
||||
if skip {
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ type RequestPool struct {
|
|||
}
|
||||
|
||||
type PresencePublisher interface {
|
||||
SendPresence(userID, presence string, statusMsg *string) error
|
||||
SendPresence(userID string, presence types.Presence, statusMsg *string) error
|
||||
}
|
||||
|
||||
// NewRequestPool makes a new RequestPool
|
||||
|
|
@ -95,11 +95,14 @@ func (rp *RequestPool) cleanLastSeen() {
|
|||
}
|
||||
|
||||
func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Duration) {
|
||||
if !rp.cfg.Matrix.Presence.EnableOutbound {
|
||||
return
|
||||
}
|
||||
for {
|
||||
rp.presence.Range(func(key interface{}, v interface{}) bool {
|
||||
p := v.(types.Presence)
|
||||
p := v.(types.PresenceInternal)
|
||||
if time.Since(p.LastActiveTS.Time()) > cleanupTime {
|
||||
rp.updatePresence(db, "unavailable", p.UserID)
|
||||
rp.updatePresence(db, types.PresenceUnavailable.String(), p.UserID)
|
||||
rp.presence.Delete(key)
|
||||
}
|
||||
return true
|
||||
|
|
@ -110,17 +113,24 @@ func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Durat
|
|||
|
||||
// updatePresence sends presence updates to the SyncAPI and FederationAPI
|
||||
func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) {
|
||||
if rp.cfg.Matrix.DisablePresence {
|
||||
if !rp.cfg.Matrix.Presence.EnableOutbound {
|
||||
return
|
||||
}
|
||||
if presence == "" {
|
||||
presence = "online"
|
||||
presence = types.PresenceOnline.String()
|
||||
}
|
||||
|
||||
newPresence := types.Presence{
|
||||
presenceID, ok := types.PresenceFromString(presence)
|
||||
if !ok { // this should almost never happen
|
||||
logrus.Errorf("unknown presence '%s'", presence)
|
||||
return
|
||||
}
|
||||
|
||||
newPresence := types.PresenceInternal{
|
||||
ClientFields: types.PresenceClientResponse{
|
||||
Presence: presence,
|
||||
Presence: presenceID.String(),
|
||||
},
|
||||
Presence: presenceID,
|
||||
UserID: userID,
|
||||
LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
|
||||
}
|
||||
|
|
@ -128,7 +138,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
|||
// avoid spamming presence updates when syncing
|
||||
existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
|
||||
if ok {
|
||||
p := existingPresence.(types.Presence)
|
||||
p := existingPresence.(types.PresenceInternal)
|
||||
if p.ClientFields.Presence == newPresence.ClientFields.Presence {
|
||||
return
|
||||
}
|
||||
|
|
@ -140,7 +150,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
|||
return
|
||||
}
|
||||
|
||||
if err := rp.producer.SendPresence(userID, strings.ToLower(presence), dbPresence.ClientFields.StatusMsg); err != nil {
|
||||
if err := rp.producer.SendPresence(userID, presenceID, dbPresence.ClientFields.StatusMsg); err != nil {
|
||||
logrus.WithError(err).Error("Unable to publish presence message from sync")
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,23 +15,23 @@ type dummyPublisher struct {
|
|||
count int
|
||||
}
|
||||
|
||||
func (d *dummyPublisher) SendPresence(userID, presence string, statusMsg *string) error {
|
||||
func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, statusMsg *string) error {
|
||||
d.count++
|
||||
return nil
|
||||
}
|
||||
|
||||
type dummyDB struct{}
|
||||
|
||||
func (d dummyDB) UpdatePresence(ctx context.Context, userID, presence string, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
||||
func (d dummyDB) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (d dummyDB) GetPresence(ctx context.Context, userID string) (*types.Presence, error) {
|
||||
return &types.Presence{}, nil
|
||||
func (d dummyDB) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
||||
return &types.PresenceInternal{}, nil
|
||||
}
|
||||
|
||||
func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.Presence, error) {
|
||||
return map[string]*types.Presence{}, nil
|
||||
func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) {
|
||||
return map[string]*types.PresenceInternal{}, nil
|
||||
}
|
||||
|
||||
func (d dummyDB) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
|
||||
|
|
@ -106,7 +106,10 @@ func TestRequestPool_updatePresence(t *testing.T) {
|
|||
JetStream: config.JetStream{
|
||||
TopicPrefix: "Dendrite",
|
||||
},
|
||||
DisablePresence: false,
|
||||
Presence: config.PresenceOptions{
|
||||
EnableInbound: true,
|
||||
EnableOutbound: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
|||
75
syncapi/types/presence.go
Normal file
75
syncapi/types/presence.go
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
//go:generate stringer -type=Presence -linecomment
|
||||
type Presence uint8
|
||||
|
||||
const (
|
||||
PresenceUnavailable Presence = iota + 1 // unavailable
|
||||
PresenceOnline // online
|
||||
PresenceOffline // offline
|
||||
)
|
||||
|
||||
// PresenceFromString returns the integer representation of the given input presence.
|
||||
// Returns false for ok, if input is not a valid presence value.
|
||||
func PresenceFromString(input string) (p Presence, ok bool) {
|
||||
for i := 0; i < len(_Presence_index)-1; i++ {
|
||||
l, r := _Presence_index[i], _Presence_index[i+1]
|
||||
if strings.EqualFold(input, _Presence_name[l:r]) {
|
||||
return Presence(i + 1), true
|
||||
}
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
type PresenceInternal struct {
|
||||
ClientFields PresenceClientResponse
|
||||
StreamPos StreamPosition `json:"-"`
|
||||
UserID string `json:"-"`
|
||||
LastActiveTS gomatrixserverlib.Timestamp `json:"-"`
|
||||
Presence Presence `json:"-"`
|
||||
}
|
||||
|
||||
// Equals compares p1 with p2.
|
||||
func (p1 *PresenceInternal) Equals(p2 *PresenceInternal) bool {
|
||||
return p1.ClientFields.Presence == p2.ClientFields.Presence &&
|
||||
p1.ClientFields.StatusMsg == p2.ClientFields.StatusMsg &&
|
||||
p1.UserID == p2.UserID
|
||||
}
|
||||
|
||||
// CurrentlyActive returns the current active state.
|
||||
func (p *PresenceInternal) CurrentlyActive() bool {
|
||||
return time.Since(p.LastActiveTS.Time()).Minutes() < 5
|
||||
}
|
||||
|
||||
// LastActiveAgo returns the time since the LastActiveTS in milliseconds.
|
||||
func (p *PresenceInternal) LastActiveAgo() int64 {
|
||||
return time.Since(p.LastActiveTS.Time()).Milliseconds()
|
||||
}
|
||||
|
||||
type PresenceClientResponse struct {
|
||||
CurrentlyActive *bool `json:"currently_active,omitempty"`
|
||||
LastActiveAgo int64 `json:"last_active_ago,omitempty"`
|
||||
Presence string `json:"presence"`
|
||||
StatusMsg *string `json:"status_msg,omitempty"`
|
||||
}
|
||||
26
syncapi/types/presence_string.go
Normal file
26
syncapi/types/presence_string.go
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
// Code generated by "stringer -type=Presence -linecomment"; DO NOT EDIT.
|
||||
|
||||
package types
|
||||
|
||||
import "strconv"
|
||||
|
||||
func _() {
|
||||
// An "invalid array index" compiler error signifies that the constant values have changed.
|
||||
// Re-run the stringer command to generate them again.
|
||||
var x [1]struct{}
|
||||
_ = x[PresenceUnavailable-1]
|
||||
_ = x[PresenceOnline-2]
|
||||
_ = x[PresenceOffline-3]
|
||||
}
|
||||
|
||||
const _Presence_name = "unavailableonlineoffline"
|
||||
|
||||
var _Presence_index = [...]uint8{0, 11, 17, 24}
|
||||
|
||||
func (i Presence) String() string {
|
||||
i -= 1
|
||||
if i >= Presence(len(_Presence_index)-1) {
|
||||
return "Presence(" + strconv.FormatInt(int64(i+1), 10) + ")"
|
||||
}
|
||||
return _Presence_name[_Presence_index[i]:_Presence_index[i+1]]
|
||||
}
|
||||
42
syncapi/types/presence_test.go
Normal file
42
syncapi/types/presence_test.go
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
package types
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestPresenceFromString(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input string
|
||||
wantStatus Presence
|
||||
wantOk bool
|
||||
}{
|
||||
{
|
||||
name: "presence unavailable",
|
||||
input: "unavailable",
|
||||
wantStatus: PresenceUnavailable,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "presence online",
|
||||
input: "OnLINE",
|
||||
wantStatus: PresenceOnline,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "unknown presence",
|
||||
input: "unknown",
|
||||
wantStatus: 0,
|
||||
wantOk: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, got1 := PresenceFromString(tt.input)
|
||||
if got != tt.wantStatus {
|
||||
t.Errorf("PresenceFromString() got = %v, want %v", got, tt.wantStatus)
|
||||
}
|
||||
if got1 != tt.wantOk {
|
||||
t.Errorf("PresenceFromString() got1 = %v, want %v", got1, tt.wantOk)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -20,7 +20,6 @@ import (
|
|||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -519,46 +518,3 @@ type OutputSendToDeviceEvent struct {
|
|||
DeviceID string `json:"device_id"`
|
||||
gomatrixserverlib.SendToDeviceEvent
|
||||
}
|
||||
|
||||
type Presence struct {
|
||||
ClientFields PresenceClientResponse
|
||||
StreamPos StreamPosition `json:"-"`
|
||||
UserID string `json:"-"`
|
||||
LastActiveTS gomatrixserverlib.Timestamp `json:"-"`
|
||||
}
|
||||
|
||||
// Equals compares p1 with p2.
|
||||
func (p1 *Presence) Equals(p2 *Presence) bool {
|
||||
return p1.ClientFields.Presence == p2.ClientFields.Presence &&
|
||||
p1.ClientFields.StatusMsg == p2.ClientFields.StatusMsg &&
|
||||
p1.UserID == p2.UserID
|
||||
}
|
||||
|
||||
// CurrentlyActive returns the current active state.
|
||||
func (p *Presence) CurrentlyActive() bool {
|
||||
return time.Since(p.LastActiveTS.Time()).Minutes() < 5
|
||||
}
|
||||
|
||||
// LastActiveAgo returns the time since the LastActiveTS in milliseconds.
|
||||
func (p *Presence) LastActiveAgo() int64 {
|
||||
return time.Since(p.LastActiveTS.Time()).Milliseconds()
|
||||
}
|
||||
|
||||
type PresenceClientResponse struct {
|
||||
CurrentlyActive *bool `json:"currently_active,omitempty"`
|
||||
LastActiveAgo int64 `json:"last_active_ago,omitempty"`
|
||||
Presence string `json:"presence"`
|
||||
StatusMsg *string `json:"status_msg,omitempty"`
|
||||
}
|
||||
|
||||
var PresenceToInt = map[string]int{
|
||||
"unavailable": 1,
|
||||
"online": 2,
|
||||
"offline": 3,
|
||||
}
|
||||
|
||||
var PresenceToString = map[int]string{
|
||||
1: "unavailable",
|
||||
2: "online",
|
||||
3: "offline",
|
||||
}
|
||||
|
|
|
|||
|
|
@ -661,6 +661,23 @@ Canonical alias can include alt_aliases
|
|||
Can delete canonical alias
|
||||
AS can make room aliases
|
||||
/context/ with lazy_load_members filter works
|
||||
/upgrade creates a new room
|
||||
/upgrade should preserve room visibility for public rooms
|
||||
/upgrade should preserve room visibility for private rooms
|
||||
/upgrade copies the power levels to the new room
|
||||
/upgrade preserves the power level of the upgrading user in old and new rooms
|
||||
/upgrade copies important state to the new room
|
||||
/upgrade copies ban events to the new room
|
||||
local user has push rules copied to upgraded room
|
||||
remote user has push rules copied to upgraded room
|
||||
/upgrade moves aliases to the new room
|
||||
/upgrade preserves room federation ability
|
||||
/upgrade restricts power levels in the old room
|
||||
/upgrade restricts power levels in the old room when the old PLs are unusual
|
||||
/upgrade to an unknown version is rejected
|
||||
/upgrade is rejected if the user can't send state events
|
||||
/upgrade of a bogus room fails gracefully
|
||||
Cannot send tombstone event that points to the same room
|
||||
Room summary counts change when membership changes
|
||||
GET /presence/:user_id/status fetches initial status
|
||||
PUT /presence/:user_id/status updates my presence
|
||||
|
|
@ -676,3 +693,6 @@ Presence changes are also reported to remote room members
|
|||
Presence changes to UNAVAILABLE are reported to local room members
|
||||
Presence changes to UNAVAILABLE are reported to remote room members
|
||||
New federated private chats get full presence information (SYN-115)
|
||||
/upgrade copies >100 power levels to the new room
|
||||
Room state after a rejected message event is the same as before
|
||||
Room state after a rejected state event is the same as before
|
||||
Loading…
Reference in a new issue