mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-23 06:41:56 -06:00
Merge branch 's7evink/consent-tracking' of github.com:matrix-org/dendrite into s7evink/consent-tracking
This commit is contained in:
commit
e42ef1706b
|
@ -88,7 +88,16 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
|
events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
|
||||||
events = append(events, output.NewRoomEvent.AddStateEvents...)
|
if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
|
||||||
|
eventsReq := &api.QueryEventsByIDRequest{
|
||||||
|
EventIDs: output.NewRoomEvent.AddsStateEventIDs,
|
||||||
|
}
|
||||||
|
eventsRes := &api.QueryEventsByIDResponse{}
|
||||||
|
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
events = append(events, eventsRes.Events...)
|
||||||
|
}
|
||||||
|
|
||||||
// Send event to any relevant application services
|
// Send event to any relevant application services
|
||||||
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
|
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
|
||||||
|
|
|
@ -65,6 +65,11 @@ func BadJSON(msg string) *MatrixError {
|
||||||
return &MatrixError{"M_BAD_JSON", msg}
|
return &MatrixError{"M_BAD_JSON", msg}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BadAlias is an error when the client supplies a bad alias.
|
||||||
|
func BadAlias(msg string) *MatrixError {
|
||||||
|
return &MatrixError{"M_BAD_ALIAS", msg}
|
||||||
|
}
|
||||||
|
|
||||||
// NotJSON is an error when the client supplies something that is not JSON
|
// NotJSON is an error when the client supplies something that is not JSON
|
||||||
// to a JSON endpoint.
|
// to a JSON endpoint.
|
||||||
func NotJSON(msg string) *MatrixError {
|
func NotJSON(msg string) *MatrixError {
|
||||||
|
|
|
@ -139,11 +139,17 @@ func SetLocalAlias(
|
||||||
// TODO: This code should eventually be refactored with:
|
// TODO: This code should eventually be refactored with:
|
||||||
// 1. The new method for checking for things matching an AS's namespace
|
// 1. The new method for checking for things matching an AS's namespace
|
||||||
// 2. Using an overall Regex object for all AS's just like we did for usernames
|
// 2. Using an overall Regex object for all AS's just like we did for usernames
|
||||||
|
reqUserID, _, err := gomatrixserverlib.SplitID('@', device.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.BadJSON("User ID must be in the form '@localpart:domain'"),
|
||||||
|
}
|
||||||
|
}
|
||||||
for _, appservice := range cfg.Derived.ApplicationServices {
|
for _, appservice := range cfg.Derived.ApplicationServices {
|
||||||
// Don't prevent AS from creating aliases in its own namespace
|
// Don't prevent AS from creating aliases in its own namespace
|
||||||
// Note that Dendrite uses SenderLocalpart as UserID for AS users
|
// Note that Dendrite uses SenderLocalpart as UserID for AS users
|
||||||
if device.UserID != appservice.SenderLocalpart {
|
if reqUserID != appservice.SenderLocalpart {
|
||||||
if aliasNamespaces, ok := appservice.NamespaceMap["aliases"]; ok {
|
if aliasNamespaces, ok := appservice.NamespaceMap["aliases"]; ok {
|
||||||
for _, namespace := range aliasNamespaces {
|
for _, namespace := range aliasNamespaces {
|
||||||
if namespace.Exclusive && namespace.RegexpObject.MatchString(alias) {
|
if namespace.Exclusive && namespace.RegexpObject.MatchString(alias) {
|
||||||
|
|
|
@ -16,6 +16,8 @@ package routing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -120,6 +122,40 @@ func SendEvent(
|
||||||
}
|
}
|
||||||
timeToGenerateEvent := time.Since(startedGeneratingEvent)
|
timeToGenerateEvent := time.Since(startedGeneratingEvent)
|
||||||
|
|
||||||
|
// validate that the aliases exists
|
||||||
|
if eventType == gomatrixserverlib.MRoomCanonicalAlias && stateKey != nil && *stateKey == "" {
|
||||||
|
aliasReq := api.AliasEvent{}
|
||||||
|
if err = json.Unmarshal(e.Content(), &aliasReq); err != nil {
|
||||||
|
return util.ErrorResponse(fmt.Errorf("unable to parse alias event: %w", err))
|
||||||
|
}
|
||||||
|
if !aliasReq.Valid() {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.InvalidParam("Request contains invalid aliases."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
aliasRes := &api.GetAliasesForRoomIDResponse{}
|
||||||
|
if err = rsAPI.GetAliasesForRoomID(req.Context(), &api.GetAliasesForRoomIDRequest{RoomID: roomID}, aliasRes); err != nil {
|
||||||
|
return jsonerror.InternalServerError()
|
||||||
|
}
|
||||||
|
var found int
|
||||||
|
requestAliases := append(aliasReq.AltAliases, aliasReq.Alias)
|
||||||
|
for _, alias := range aliasRes.Aliases {
|
||||||
|
for _, altAlias := range requestAliases {
|
||||||
|
if altAlias == alias {
|
||||||
|
found++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// check that we found at least the same amount of existing aliases as are in the request
|
||||||
|
if aliasReq.Alias != "" && found < len(requestAliases) {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.BadAlias("No matching alias found."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var txnAndSessionID *api.TransactionID
|
var txnAndSessionID *api.TransactionID
|
||||||
if txnID != nil {
|
if txnID != nil {
|
||||||
txnAndSessionID = &api.TransactionID{
|
txnAndSessionID = &api.TransactionID{
|
||||||
|
|
|
@ -146,7 +146,28 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
|
||||||
// processMessage updates the list of currently joined hosts in the room
|
// processMessage updates the list of currently joined hosts in the room
|
||||||
// and then sends the event to the hosts that were joined before the event.
|
// and then sends the event to the hosts that were joined before the event.
|
||||||
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
||||||
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState()))
|
eventsRes := &api.QueryEventsByIDResponse{}
|
||||||
|
if len(ore.AddsStateEventIDs) > 0 {
|
||||||
|
eventsReq := &api.QueryEventsByIDRequest{
|
||||||
|
EventIDs: ore.AddsStateEventIDs,
|
||||||
|
}
|
||||||
|
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
||||||
|
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
found := false
|
||||||
|
for _, event := range eventsRes.Events {
|
||||||
|
if event.EventID() == ore.Event.EventID() {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
eventsRes.Events = append(eventsRes.Events, ore.Event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(eventsRes.Events))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
|
|
||||||
package api
|
package api
|
||||||
|
|
||||||
|
import "regexp"
|
||||||
|
|
||||||
// SetRoomAliasRequest is a request to SetRoomAlias
|
// SetRoomAliasRequest is a request to SetRoomAlias
|
||||||
type SetRoomAliasRequest struct {
|
type SetRoomAliasRequest struct {
|
||||||
// ID of the user setting the alias
|
// ID of the user setting the alias
|
||||||
|
@ -84,3 +86,20 @@ type RemoveRoomAliasResponse struct {
|
||||||
// Did we remove it?
|
// Did we remove it?
|
||||||
Removed bool `json:"removed"`
|
Removed bool `json:"removed"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type AliasEvent struct {
|
||||||
|
Alias string `json:"alias"`
|
||||||
|
AltAliases []string `json:"alt_aliases"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var validateAliasRegex = regexp.MustCompile("^#.*:.+$")
|
||||||
|
|
||||||
|
func (a AliasEvent) Valid() bool {
|
||||||
|
for _, alias := range a.AltAliases {
|
||||||
|
if !validateAliasRegex.MatchString(alias) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return a.Alias == "" || validateAliasRegex.MatchString(a.Alias)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
62
roomserver/api/alias_test.go
Normal file
62
roomserver/api/alias_test.go
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestAliasEvent_Valid(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
Alias string
|
||||||
|
AltAliases []string
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty alias",
|
||||||
|
fields: fields{
|
||||||
|
Alias: "",
|
||||||
|
},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty alias, invalid alt aliases",
|
||||||
|
fields: fields{
|
||||||
|
Alias: "",
|
||||||
|
AltAliases: []string{ "%not:valid.local"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid alias, invalid alt aliases",
|
||||||
|
fields: fields{
|
||||||
|
Alias: "#valid:test.local",
|
||||||
|
AltAliases: []string{ "%not:valid.local"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty alias, invalid alt aliases",
|
||||||
|
fields: fields{
|
||||||
|
Alias: "",
|
||||||
|
AltAliases: []string{ "%not:valid.local"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid alias",
|
||||||
|
fields: fields{
|
||||||
|
Alias: "%not:valid.local",
|
||||||
|
AltAliases: []string{ },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
a := AliasEvent{
|
||||||
|
Alias: tt.fields.Alias,
|
||||||
|
AltAliases: tt.fields.AltAliases,
|
||||||
|
}
|
||||||
|
if got := a.Valid(); got != tt.want {
|
||||||
|
t.Errorf("Valid() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -105,7 +105,7 @@ type OutputNewRoomEvent struct {
|
||||||
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
|
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
|
||||||
// Does the event completely rewrite the room state? If so, then AddsStateEventIDs
|
// Does the event completely rewrite the room state? If so, then AddsStateEventIDs
|
||||||
// will contain the entire room state.
|
// will contain the entire room state.
|
||||||
RewritesState bool `json:"rewrites_state"`
|
RewritesState bool `json:"rewrites_state,omitempty"`
|
||||||
// The latest events in the room after this event.
|
// The latest events in the room after this event.
|
||||||
// This can be used to set the prev events for new events in the room.
|
// This can be used to set the prev events for new events in the room.
|
||||||
// This also can be used to get the full current state after this event.
|
// This also can be used to get the full current state after this event.
|
||||||
|
@ -113,16 +113,9 @@ type OutputNewRoomEvent struct {
|
||||||
// The state event IDs that were added to the state of the room by this event.
|
// The state event IDs that were added to the state of the room by this event.
|
||||||
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date
|
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date
|
||||||
// view of the current state of the room.
|
// view of the current state of the room.
|
||||||
AddsStateEventIDs []string `json:"adds_state_event_ids"`
|
AddsStateEventIDs []string `json:"adds_state_event_ids,omitempty"`
|
||||||
// All extra newly added state events. This is only set if there are *extra* events
|
|
||||||
// other than `Event`. This can happen when forks get merged because state resolution
|
|
||||||
// may decide a bunch of state events on one branch are now valid, so they will be
|
|
||||||
// present in this list. This is useful when trying to maintain the current state of a room
|
|
||||||
// as to do so you need to include both these events and `Event`.
|
|
||||||
AddStateEvents []*gomatrixserverlib.HeaderedEvent `json:"adds_state_events"`
|
|
||||||
|
|
||||||
// The state event IDs that were removed from the state of the room by this event.
|
// The state event IDs that were removed from the state of the room by this event.
|
||||||
RemovesStateEventIDs []string `json:"removes_state_event_ids"`
|
RemovesStateEventIDs []string `json:"removes_state_event_ids,omitempty"`
|
||||||
// The ID of the event that was output before this event.
|
// The ID of the event that was output before this event.
|
||||||
// Or the empty string if this is the first event output for this room.
|
// Or the empty string if this is the first event output for this room.
|
||||||
// This is used by consumers to check if they can safely update their
|
// This is used by consumers to check if they can safely update their
|
||||||
|
@ -145,10 +138,10 @@ type OutputNewRoomEvent struct {
|
||||||
//
|
//
|
||||||
// The state is given as a delta against the current state because they are
|
// The state is given as a delta against the current state because they are
|
||||||
// usually either the same state, or differ by just a couple of events.
|
// usually either the same state, or differ by just a couple of events.
|
||||||
StateBeforeAddsEventIDs []string `json:"state_before_adds_event_ids"`
|
StateBeforeAddsEventIDs []string `json:"state_before_adds_event_ids,omitempty"`
|
||||||
// The state event IDs that are part of the current state, but not part
|
// The state event IDs that are part of the current state, but not part
|
||||||
// of the state at the event.
|
// of the state at the event.
|
||||||
StateBeforeRemovesEventIDs []string `json:"state_before_removes_event_ids"`
|
StateBeforeRemovesEventIDs []string `json:"state_before_removes_event_ids,omitempty"`
|
||||||
// The server name to use to push this event to other servers.
|
// The server name to use to push this event to other servers.
|
||||||
// Or empty if this event shouldn't be pushed to other servers.
|
// Or empty if this event shouldn't be pushed to other servers.
|
||||||
//
|
//
|
||||||
|
@ -167,27 +160,7 @@ type OutputNewRoomEvent struct {
|
||||||
SendAsServer string `json:"send_as_server"`
|
SendAsServer string `json:"send_as_server"`
|
||||||
// The transaction ID of the send request if sent by a local user and one
|
// The transaction ID of the send request if sent by a local user and one
|
||||||
// was specified
|
// was specified
|
||||||
TransactionID *TransactionID `json:"transaction_id"`
|
TransactionID *TransactionID `json:"transaction_id,omitempty"`
|
||||||
}
|
|
||||||
|
|
||||||
// AddsState returns all added state events from this event.
|
|
||||||
//
|
|
||||||
// This function is needed because `AddStateEvents` will not include a copy of
|
|
||||||
// the original event to save space, so you cannot use that slice alone.
|
|
||||||
// Instead, use this function which will add the original event if it is present
|
|
||||||
// in `AddsStateEventIDs`.
|
|
||||||
func (ore *OutputNewRoomEvent) AddsState() []*gomatrixserverlib.HeaderedEvent {
|
|
||||||
includeOutputEvent := false
|
|
||||||
for _, id := range ore.AddsStateEventIDs {
|
|
||||||
if id == ore.Event.EventID() {
|
|
||||||
includeOutputEvent = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !includeOutputEvent {
|
|
||||||
return ore.AddStateEvents
|
|
||||||
}
|
|
||||||
return append(ore.AddStateEvents, ore.Event)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// An OutputOldRoomEvent is written when the roomserver receives an old event.
|
// An OutputOldRoomEvent is written when the roomserver receives an old event.
|
||||||
|
|
|
@ -16,12 +16,18 @@ package internal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
|
|
||||||
asAPI "github.com/matrix-org/dendrite/appservice/api"
|
asAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
|
"github.com/tidwall/sjson"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RoomserverInternalAPIDatabase has the storage APIs needed to implement the alias API.
|
// RoomserverInternalAPIDatabase has the storage APIs needed to implement the alias API.
|
||||||
|
@ -183,6 +189,57 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ev, err := r.DB.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomCanonicalAlias, "")
|
||||||
|
if err != nil && err != sql.ErrNoRows {
|
||||||
|
return err
|
||||||
|
} else if ev != nil {
|
||||||
|
stateAlias := gjson.GetBytes(ev.Content(), "alias").Str
|
||||||
|
// the alias to remove is currently set as the canonical alias, remove it
|
||||||
|
if stateAlias == request.Alias {
|
||||||
|
res, err := sjson.DeleteBytes(ev.Content(), "alias")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
sender := request.UserID
|
||||||
|
if request.UserID != ev.Sender() {
|
||||||
|
sender = ev.Sender()
|
||||||
|
}
|
||||||
|
|
||||||
|
builder := &gomatrixserverlib.EventBuilder{
|
||||||
|
Sender: sender,
|
||||||
|
RoomID: ev.RoomID(),
|
||||||
|
Type: ev.Type(),
|
||||||
|
StateKey: ev.StateKey(),
|
||||||
|
Content: res,
|
||||||
|
}
|
||||||
|
|
||||||
|
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err)
|
||||||
|
}
|
||||||
|
if len(eventsNeeded.Tuples()) == 0 {
|
||||||
|
return errors.New("expecting state tuples for event builder, got none")
|
||||||
|
}
|
||||||
|
|
||||||
|
stateRes := &api.QueryLatestEventsAndStateResponse{}
|
||||||
|
if err := helpers.QueryLatestEventsAndState(ctx, r.DB, &api.QueryLatestEventsAndStateRequest{RoomID: roomID, StateToFetch: eventsNeeded.Tuples()}, stateRes); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
newEvent, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, time.Now(), &eventsNeeded, stateRes)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = api.SendEvents(ctx, r.RSAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Remove the alias from the database
|
// Remove the alias from the database
|
||||||
if err := r.DB.RemoveRoomAlias(ctx, request.Alias); err != nil {
|
if err := r.DB.RemoveRoomAlias(ctx, request.Alias); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -365,6 +365,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
||||||
LastSentEventID: u.lastEventIDSent,
|
LastSentEventID: u.lastEventIDSent,
|
||||||
LatestEventIDs: latestEventIDs,
|
LatestEventIDs: latestEventIDs,
|
||||||
TransactionID: u.transactionID,
|
TransactionID: u.transactionID,
|
||||||
|
SendAsServer: u.sendAsServer,
|
||||||
}
|
}
|
||||||
|
|
||||||
eventIDMap, err := u.stateEventMap()
|
eventIDMap, err := u.stateEventMap()
|
||||||
|
@ -384,51 +385,17 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
||||||
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
|
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
|
||||||
}
|
}
|
||||||
|
|
||||||
ore.SendAsServer = u.sendAsServer
|
|
||||||
|
|
||||||
// include extra state events if they were added as nearly every downstream component will care about it
|
|
||||||
// and we'd rather not have them all hit QueryEventsByID at the same time!
|
|
||||||
if len(ore.AddsStateEventIDs) > 0 {
|
|
||||||
var err error
|
|
||||||
if ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &api.OutputEvent{
|
return &api.OutputEvent{
|
||||||
Type: api.OutputTypeNewRoomEvent,
|
Type: api.OutputTypeNewRoomEvent,
|
||||||
NewRoomEvent: &ore,
|
NewRoomEvent: &ore,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// extraEventsForIDs returns the full events for the event IDs given, but does not include the current event being
|
|
||||||
// updated.
|
|
||||||
func (u *latestEventsUpdater) extraEventsForIDs(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
|
||||||
var extraEventIDs []string
|
|
||||||
for _, e := range eventIDs {
|
|
||||||
if e == u.event.EventID() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
extraEventIDs = append(extraEventIDs, e)
|
|
||||||
}
|
|
||||||
if len(extraEventIDs) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
extraEvents, err := u.updater.UnsentEventsFromIDs(u.ctx, extraEventIDs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var h []*gomatrixserverlib.HeaderedEvent
|
|
||||||
for _, e := range extraEvents {
|
|
||||||
h = append(h, e.Headered(roomVersion))
|
|
||||||
}
|
|
||||||
return h, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// retrieve an event nid -> event ID map for all events that need updating
|
// retrieve an event nid -> event ID map for all events that need updating
|
||||||
func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
|
func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
|
||||||
var stateEventNIDs []types.EventNID
|
cap := len(u.added) + len(u.removed) + len(u.stateBeforeEventRemoves) + len(u.stateBeforeEventAdds)
|
||||||
var allStateEntries []types.StateEntry
|
stateEventNIDs := make(types.EventNIDs, 0, cap)
|
||||||
|
allStateEntries := make([]types.StateEntry, 0, cap)
|
||||||
allStateEntries = append(allStateEntries, u.added...)
|
allStateEntries = append(allStateEntries, u.added...)
|
||||||
allStateEntries = append(allStateEntries, u.removed...)
|
allStateEntries = append(allStateEntries, u.removed...)
|
||||||
allStateEntries = append(allStateEntries, u.stateBeforeEventRemoves...)
|
allStateEntries = append(allStateEntries, u.stateBeforeEventRemoves...)
|
||||||
|
@ -436,12 +403,6 @@ func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error)
|
||||||
for _, entry := range allStateEntries {
|
for _, entry := range allStateEntries {
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||||
}
|
}
|
||||||
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
|
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(stateEventNIDs)]
|
||||||
return u.updater.EventIDs(u.ctx, stateEventNIDs)
|
return u.updater.EventIDs(u.ctx, stateEventNIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventNIDSorter []types.EventNID
|
|
||||||
|
|
||||||
func (s eventNIDSorter) Len() int { return len(s) }
|
|
||||||
func (s eventNIDSorter) Less(i, j int) bool { return s[i] < s[j] }
|
|
||||||
func (s eventNIDSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/getsentry/sentry-go"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
@ -29,6 +30,7 @@ func JetStreamConsumer(
|
||||||
name := durable + "Pull"
|
name := durable + "Pull"
|
||||||
sub, err := js.PullSubscribe(subj, name, opts...)
|
sub, err := js.PullSubscribe(subj, name, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
sentry.CaptureException(err)
|
||||||
return fmt.Errorf("nats.SubscribeSync: %w", err)
|
return fmt.Errorf("nats.SubscribeSync: %w", err)
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -55,6 +57,7 @@ func JetStreamConsumer(
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Something else went wrong, so we'll panic.
|
// Something else went wrong, so we'll panic.
|
||||||
|
sentry.CaptureException(err)
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Fatal(err)
|
logrus.WithContext(ctx).WithField("subject", subj).Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -64,15 +67,18 @@ func JetStreamConsumer(
|
||||||
msg := msgs[0]
|
msg := msgs[0]
|
||||||
if err = msg.InProgress(); err != nil {
|
if err = msg.InProgress(); err != nil {
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
|
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
|
||||||
|
sentry.CaptureException(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if f(ctx, msg) {
|
if f(ctx, msg) {
|
||||||
if err = msg.Ack(); err != nil {
|
if err = msg.Ack(); err != nil {
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Ack: %w", err))
|
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Ack: %w", err))
|
||||||
|
sentry.CaptureException(err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err = msg.Nak(); err != nil {
|
if err = msg.Nak(); err != nil {
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
|
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
|
||||||
|
sentry.CaptureException(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -199,13 +199,14 @@ func (w *walker) storePaginationCache(paginationToken string, cache paginationIn
|
||||||
}
|
}
|
||||||
|
|
||||||
type roomVisit struct {
|
type roomVisit struct {
|
||||||
roomID string
|
roomID string
|
||||||
depth int
|
parentRoomID string
|
||||||
vias []string // vias to query this room by
|
depth int
|
||||||
|
vias []string // vias to query this room by
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *walker) walk() util.JSONResponse {
|
func (w *walker) walk() util.JSONResponse {
|
||||||
if !w.authorised(w.rootRoomID) {
|
if authorised, _ := w.authorised(w.rootRoomID, ""); !authorised {
|
||||||
if w.caller != nil {
|
if w.caller != nil {
|
||||||
// CS API format
|
// CS API format
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
@ -238,8 +239,9 @@ func (w *walker) walk() util.JSONResponse {
|
||||||
w.paginationToken = tok
|
w.paginationToken = tok
|
||||||
// Begin walking the graph starting with the room ID in the request in a queue of unvisited rooms
|
// Begin walking the graph starting with the room ID in the request in a queue of unvisited rooms
|
||||||
c.unvisited = append(c.unvisited, roomVisit{
|
c.unvisited = append(c.unvisited, roomVisit{
|
||||||
roomID: w.rootRoomID,
|
roomID: w.rootRoomID,
|
||||||
depth: 0,
|
parentRoomID: "",
|
||||||
|
depth: 0,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -277,23 +279,8 @@ func (w *walker) walk() util.JSONResponse {
|
||||||
|
|
||||||
// If we know about this room and the caller is authorised (joined/world_readable) then pull
|
// If we know about this room and the caller is authorised (joined/world_readable) then pull
|
||||||
// events locally
|
// events locally
|
||||||
if w.roomExists(rv.roomID) && w.authorised(rv.roomID) {
|
roomExists := w.roomExists(rv.roomID)
|
||||||
// Get all `m.space.child` state events for this room
|
if !roomExists {
|
||||||
events, err := w.childReferences(rv.roomID)
|
|
||||||
if err != nil {
|
|
||||||
util.GetLogger(w.ctx).WithError(err).WithField("room_id", rv.roomID).Error("failed to extract references for room")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
discoveredChildEvents = events
|
|
||||||
|
|
||||||
pubRoom := w.publicRoomsChunk(rv.roomID)
|
|
||||||
|
|
||||||
discoveredRooms = append(discoveredRooms, gomatrixserverlib.MSC2946Room{
|
|
||||||
PublicRoom: *pubRoom,
|
|
||||||
RoomType: roomType,
|
|
||||||
ChildrenState: events,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
// attempt to query this room over federation, as either we've never heard of it before
|
// attempt to query this room over federation, as either we've never heard of it before
|
||||||
// or we've left it and hence are not authorised (but info may be exposed regardless)
|
// or we've left it and hence are not authorised (but info may be exposed regardless)
|
||||||
fedRes, err := w.federatedRoomInfo(rv.roomID, rv.vias)
|
fedRes, err := w.federatedRoomInfo(rv.roomID, rv.vias)
|
||||||
|
@ -312,6 +299,29 @@ func (w *walker) walk() util.JSONResponse {
|
||||||
// as these children may be rooms we do know about.
|
// as these children may be rooms we do know about.
|
||||||
roomType = ConstCreateEventContentValueSpace
|
roomType = ConstCreateEventContentValueSpace
|
||||||
}
|
}
|
||||||
|
} else if authorised, isJoinedOrInvited := w.authorised(rv.roomID, rv.parentRoomID); authorised {
|
||||||
|
// Get all `m.space.child` state events for this room
|
||||||
|
events, err := w.childReferences(rv.roomID)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(w.ctx).WithError(err).WithField("room_id", rv.roomID).Error("failed to extract references for room")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
discoveredChildEvents = events
|
||||||
|
|
||||||
|
pubRoom := w.publicRoomsChunk(rv.roomID)
|
||||||
|
|
||||||
|
discoveredRooms = append(discoveredRooms, gomatrixserverlib.MSC2946Room{
|
||||||
|
PublicRoom: *pubRoom,
|
||||||
|
RoomType: roomType,
|
||||||
|
ChildrenState: events,
|
||||||
|
})
|
||||||
|
// don't walk children if the user is not joined/invited to the space
|
||||||
|
if !isJoinedOrInvited {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// room exists but user is not authorised
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// don't walk the children
|
// don't walk the children
|
||||||
|
@ -332,9 +342,10 @@ func (w *walker) walk() util.JSONResponse {
|
||||||
ev := discoveredChildEvents[i]
|
ev := discoveredChildEvents[i]
|
||||||
_ = json.Unmarshal(ev.Content, &spaceContent)
|
_ = json.Unmarshal(ev.Content, &spaceContent)
|
||||||
unvisited = append(unvisited, roomVisit{
|
unvisited = append(unvisited, roomVisit{
|
||||||
roomID: ev.StateKey,
|
roomID: ev.StateKey,
|
||||||
depth: rv.depth + 1,
|
parentRoomID: rv.roomID,
|
||||||
vias: spaceContent.Via,
|
depth: rv.depth + 1,
|
||||||
|
vias: spaceContent.Via,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -465,25 +476,29 @@ func (w *walker) roomExists(roomID string) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// authorised returns true iff the user is joined this room or the room is world_readable
|
// authorised returns true iff the user is joined this room or the room is world_readable
|
||||||
func (w *walker) authorised(roomID string) bool {
|
func (w *walker) authorised(roomID, parentRoomID string) (authed, isJoinedOrInvited bool) {
|
||||||
if w.caller != nil {
|
if w.caller != nil {
|
||||||
return w.authorisedUser(roomID)
|
return w.authorisedUser(roomID, parentRoomID)
|
||||||
}
|
}
|
||||||
return w.authorisedServer(roomID)
|
return w.authorisedServer(roomID), false
|
||||||
}
|
}
|
||||||
|
|
||||||
// authorisedServer returns true iff the server is joined this room or the room is world_readable
|
// authorisedServer returns true iff the server is joined this room or the room is world_readable
|
||||||
func (w *walker) authorisedServer(roomID string) bool {
|
func (w *walker) authorisedServer(roomID string) bool {
|
||||||
// Check history visibility first
|
// Check history visibility / join rules first
|
||||||
hisVisTuple := gomatrixserverlib.StateKeyTuple{
|
hisVisTuple := gomatrixserverlib.StateKeyTuple{
|
||||||
EventType: gomatrixserverlib.MRoomHistoryVisibility,
|
EventType: gomatrixserverlib.MRoomHistoryVisibility,
|
||||||
StateKey: "",
|
StateKey: "",
|
||||||
}
|
}
|
||||||
|
joinRuleTuple := gomatrixserverlib.StateKeyTuple{
|
||||||
|
EventType: gomatrixserverlib.MRoomJoinRules,
|
||||||
|
StateKey: "",
|
||||||
|
}
|
||||||
var queryRoomRes roomserver.QueryCurrentStateResponse
|
var queryRoomRes roomserver.QueryCurrentStateResponse
|
||||||
err := w.rsAPI.QueryCurrentState(w.ctx, &roomserver.QueryCurrentStateRequest{
|
err := w.rsAPI.QueryCurrentState(w.ctx, &roomserver.QueryCurrentStateRequest{
|
||||||
RoomID: roomID,
|
RoomID: roomID,
|
||||||
StateTuples: []gomatrixserverlib.StateKeyTuple{
|
StateTuples: []gomatrixserverlib.StateKeyTuple{
|
||||||
hisVisTuple,
|
hisVisTuple, joinRuleTuple,
|
||||||
},
|
},
|
||||||
}, &queryRoomRes)
|
}, &queryRoomRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -497,29 +512,46 @@ func (w *walker) authorisedServer(roomID string) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// check if server is joined to the room
|
|
||||||
var queryRes fs.QueryJoinedHostServerNamesInRoomResponse
|
// check if this room is a restricted room and if so, we need to check if the server is joined to an allowed room ID
|
||||||
err = w.fsAPI.QueryJoinedHostServerNamesInRoom(w.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
|
// in addition to the actual room ID (but always do the actual one first as it's quicker in the common case)
|
||||||
RoomID: roomID,
|
allowJoinedToRoomIDs := []string{roomID}
|
||||||
}, &queryRes)
|
joinRuleEv := queryRoomRes.StateEvents[joinRuleTuple]
|
||||||
if err != nil {
|
if joinRuleEv != nil {
|
||||||
util.GetLogger(w.ctx).WithError(err).Error("failed to QueryJoinedHostServerNamesInRoom")
|
allowJoinedToRoomIDs = append(allowJoinedToRoomIDs, w.restrictedJoinRuleAllowedRooms(joinRuleEv, "m.room_membership")...)
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
for _, srv := range queryRes.ServerNames {
|
|
||||||
if srv == w.serverName {
|
// check if server is joined to any allowed room
|
||||||
return true
|
for _, allowedRoomID := range allowJoinedToRoomIDs {
|
||||||
|
var queryRes fs.QueryJoinedHostServerNamesInRoomResponse
|
||||||
|
err = w.fsAPI.QueryJoinedHostServerNamesInRoom(w.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
|
||||||
|
RoomID: allowedRoomID,
|
||||||
|
}, &queryRes)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(w.ctx).WithError(err).Error("failed to QueryJoinedHostServerNamesInRoom")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, srv := range queryRes.ServerNames {
|
||||||
|
if srv == w.serverName {
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// authorisedUser returns true iff the user is joined this room or the room is world_readable
|
// authorisedUser returns true iff the user is invited/joined this room or the room is world_readable.
|
||||||
func (w *walker) authorisedUser(roomID string) bool {
|
// Failing that, if the room has a restricted join rule and belongs to the space parent listed, it will return true.
|
||||||
|
func (w *walker) authorisedUser(roomID, parentRoomID string) (authed bool, isJoinedOrInvited bool) {
|
||||||
hisVisTuple := gomatrixserverlib.StateKeyTuple{
|
hisVisTuple := gomatrixserverlib.StateKeyTuple{
|
||||||
EventType: gomatrixserverlib.MRoomHistoryVisibility,
|
EventType: gomatrixserverlib.MRoomHistoryVisibility,
|
||||||
StateKey: "",
|
StateKey: "",
|
||||||
}
|
}
|
||||||
|
joinRuleTuple := gomatrixserverlib.StateKeyTuple{
|
||||||
|
EventType: gomatrixserverlib.MRoomJoinRules,
|
||||||
|
StateKey: "",
|
||||||
|
}
|
||||||
roomMemberTuple := gomatrixserverlib.StateKeyTuple{
|
roomMemberTuple := gomatrixserverlib.StateKeyTuple{
|
||||||
EventType: gomatrixserverlib.MRoomMember,
|
EventType: gomatrixserverlib.MRoomMember,
|
||||||
StateKey: w.caller.UserID,
|
StateKey: w.caller.UserID,
|
||||||
|
@ -528,28 +560,79 @@ func (w *walker) authorisedUser(roomID string) bool {
|
||||||
err := w.rsAPI.QueryCurrentState(w.ctx, &roomserver.QueryCurrentStateRequest{
|
err := w.rsAPI.QueryCurrentState(w.ctx, &roomserver.QueryCurrentStateRequest{
|
||||||
RoomID: roomID,
|
RoomID: roomID,
|
||||||
StateTuples: []gomatrixserverlib.StateKeyTuple{
|
StateTuples: []gomatrixserverlib.StateKeyTuple{
|
||||||
hisVisTuple, roomMemberTuple,
|
hisVisTuple, joinRuleTuple, roomMemberTuple,
|
||||||
},
|
},
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(w.ctx).WithError(err).Error("failed to QueryCurrentState")
|
util.GetLogger(w.ctx).WithError(err).Error("failed to QueryCurrentState")
|
||||||
return false
|
return false, false
|
||||||
}
|
}
|
||||||
memberEv := queryRes.StateEvents[roomMemberTuple]
|
memberEv := queryRes.StateEvents[roomMemberTuple]
|
||||||
hisVisEv := queryRes.StateEvents[hisVisTuple]
|
|
||||||
if memberEv != nil {
|
if memberEv != nil {
|
||||||
membership, _ := memberEv.Membership()
|
membership, _ := memberEv.Membership()
|
||||||
if membership == gomatrixserverlib.Join || membership == gomatrixserverlib.Invite {
|
if membership == gomatrixserverlib.Join || membership == gomatrixserverlib.Invite {
|
||||||
return true
|
return true, true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
hisVisEv := queryRes.StateEvents[hisVisTuple]
|
||||||
if hisVisEv != nil {
|
if hisVisEv != nil {
|
||||||
hisVis, _ := hisVisEv.HistoryVisibility()
|
hisVis, _ := hisVisEv.HistoryVisibility()
|
||||||
if hisVis == "world_readable" {
|
if hisVis == "world_readable" {
|
||||||
return true
|
return true, false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false
|
joinRuleEv := queryRes.StateEvents[joinRuleTuple]
|
||||||
|
if parentRoomID != "" && joinRuleEv != nil {
|
||||||
|
allowedRoomIDs := w.restrictedJoinRuleAllowedRooms(joinRuleEv, "m.room_membership")
|
||||||
|
// check parent is in the allowed set
|
||||||
|
var allowed bool
|
||||||
|
for _, a := range allowedRoomIDs {
|
||||||
|
if parentRoomID == a {
|
||||||
|
allowed = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if allowed {
|
||||||
|
// ensure caller is joined to the parent room
|
||||||
|
var queryRes2 roomserver.QueryCurrentStateResponse
|
||||||
|
err = w.rsAPI.QueryCurrentState(w.ctx, &roomserver.QueryCurrentStateRequest{
|
||||||
|
RoomID: parentRoomID,
|
||||||
|
StateTuples: []gomatrixserverlib.StateKeyTuple{
|
||||||
|
roomMemberTuple,
|
||||||
|
},
|
||||||
|
}, &queryRes2)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(w.ctx).WithError(err).WithField("parent_room_id", parentRoomID).Warn("failed to check user is joined to parent room")
|
||||||
|
} else {
|
||||||
|
memberEv = queryRes2.StateEvents[roomMemberTuple]
|
||||||
|
if memberEv != nil {
|
||||||
|
membership, _ := memberEv.Membership()
|
||||||
|
if membership == gomatrixserverlib.Join {
|
||||||
|
return true, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *walker) restrictedJoinRuleAllowedRooms(joinRuleEv *gomatrixserverlib.HeaderedEvent, allowType string) (allows []string) {
|
||||||
|
rule, _ := joinRuleEv.JoinRule()
|
||||||
|
if rule != "restricted" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var jrContent gomatrixserverlib.JoinRuleContent
|
||||||
|
if err := json.Unmarshal(joinRuleEv.Content(), &jrContent); err != nil {
|
||||||
|
util.GetLogger(w.ctx).Warnf("failed to check join_rule on room %s: %s", joinRuleEv.RoomID(), err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for _, allow := range jrContent.Allow {
|
||||||
|
if allow.Type == allowType {
|
||||||
|
allows = append(allows, allow.RoomID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// references returns all child references pointing to or from this room.
|
// references returns all child references pointing to or from this room.
|
||||||
|
|
|
@ -154,7 +154,42 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
ctx context.Context, msg api.OutputNewRoomEvent,
|
ctx context.Context, msg api.OutputNewRoomEvent,
|
||||||
) error {
|
) error {
|
||||||
ev := msg.Event
|
ev := msg.Event
|
||||||
addsStateEvents := msg.AddsState()
|
|
||||||
|
addsStateEvents := []*gomatrixserverlib.HeaderedEvent{}
|
||||||
|
foundEventIDs := map[string]bool{}
|
||||||
|
if len(msg.AddsStateEventIDs) > 0 {
|
||||||
|
for _, eventID := range msg.AddsStateEventIDs {
|
||||||
|
foundEventIDs[eventID] = false
|
||||||
|
}
|
||||||
|
foundEvents, err := s.db.Events(ctx, msg.AddsStateEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("s.db.Events: %w", err)
|
||||||
|
}
|
||||||
|
for _, event := range foundEvents {
|
||||||
|
foundEventIDs[event.EventID()] = true
|
||||||
|
}
|
||||||
|
eventsReq := &api.QueryEventsByIDRequest{}
|
||||||
|
eventsRes := &api.QueryEventsByIDResponse{}
|
||||||
|
for eventID, found := range foundEventIDs {
|
||||||
|
if !found {
|
||||||
|
eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err = s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
|
||||||
|
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
|
||||||
|
}
|
||||||
|
for _, event := range eventsRes.Events {
|
||||||
|
eventID := event.EventID()
|
||||||
|
foundEvents = append(foundEvents, event)
|
||||||
|
foundEventIDs[eventID] = true
|
||||||
|
}
|
||||||
|
for eventID, found := range foundEventIDs {
|
||||||
|
if !found {
|
||||||
|
return fmt.Errorf("event %s is missing", eventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
addsStateEvents = foundEvents
|
||||||
|
}
|
||||||
|
|
||||||
ev, err := s.updateStateEvent(ev)
|
ev, err := s.updateStateEvent(ev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -24,6 +24,7 @@ Local device key changes get to remote servers with correct prev_id
|
||||||
|
|
||||||
# Flakey
|
# Flakey
|
||||||
Local device key changes appear in /keys/changes
|
Local device key changes appear in /keys/changes
|
||||||
|
/context/ with lazy_load_members filter works
|
||||||
|
|
||||||
# we don't support groups
|
# we don't support groups
|
||||||
Remove group category
|
Remove group category
|
||||||
|
@ -31,8 +32,11 @@ Remove group role
|
||||||
|
|
||||||
# Flakey
|
# Flakey
|
||||||
AS-ghosted users can use rooms themselves
|
AS-ghosted users can use rooms themselves
|
||||||
|
/context/ with lazy_load_members filter works
|
||||||
|
AS-ghosted users can use rooms via AS
|
||||||
|
Events in rooms with AS-hosted room aliases are sent to AS server
|
||||||
|
|
||||||
# Flakey, need additional investigation
|
# Flakey, need additional investigation
|
||||||
Messages that notify from another user increment notification_count
|
Messages that notify from another user increment notification_count
|
||||||
Messages that highlight from another user increment unread highlight count
|
Messages that highlight from another user increment unread highlight count
|
||||||
Notifications can be viewed with GET /notifications
|
Notifications can be viewed with GET /notifications
|
||||||
|
|
|
@ -648,7 +648,6 @@ Device list doesn't change if remote server is down
|
||||||
/context/ on joined room works
|
/context/ on joined room works
|
||||||
/context/ on non world readable room does not work
|
/context/ on non world readable room does not work
|
||||||
/context/ returns correct number of events
|
/context/ returns correct number of events
|
||||||
/context/ with lazy_load_members filter works
|
|
||||||
GET /rooms/:room_id/messages lazy loads members correctly
|
GET /rooms/:room_id/messages lazy loads members correctly
|
||||||
Can query remote device keys using POST after notification
|
Can query remote device keys using POST after notification
|
||||||
Device deletion propagates over federation
|
Device deletion propagates over federation
|
||||||
|
@ -659,4 +658,9 @@ registration accepts non-ascii passwords
|
||||||
registration with inhibit_login inhibits login
|
registration with inhibit_login inhibits login
|
||||||
The operation must be consistent through an interactive authentication session
|
The operation must be consistent through an interactive authentication session
|
||||||
Multiple calls to /sync should not cause 500 errors
|
Multiple calls to /sync should not cause 500 errors
|
||||||
/context/ with lazy_load_members filter works
|
Canonical alias can be set
|
||||||
|
Canonical alias can include alt_aliases
|
||||||
|
Can delete canonical alias
|
||||||
|
Multiple calls to /sync should not cause 500 errors
|
||||||
|
AS can make room aliases
|
||||||
|
Accesing an AS-hosted room alias asks the AS server
|
||||||
|
|
Loading…
Reference in a new issue