Merge branch 'master' into neilalexander/localtracking

This commit is contained in:
Neil Alexander 2020-05-20 17:41:29 +01:00 committed by GitHub
commit facfea265f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 218 additions and 50 deletions

4
build-dendritejs.sh Executable file
View file

@ -0,0 +1,4 @@
#!/bin/bash -eu
export GIT_COMMIT=$(git rev-list -1 HEAD) && \
GOOS=js GOARCH=wasm go build -ldflags "-X main.GitCommit=$GIT_COMMIT" -o main.wasm ./cmd/dendritejs

View file

@ -20,6 +20,7 @@ import (
"crypto/ed25519"
"fmt"
"net/http"
"syscall/js"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
@ -45,15 +46,95 @@ import (
_ "github.com/matrix-org/go-sqlite3-js"
)
var GitCommit string
func init() {
fmt.Println("dendrite.js starting...")
fmt.Printf("[%s] dendrite.js starting...\n", GitCommit)
}
const keyNameEd25519 = "_go_ed25519_key"
func readKeyFromLocalStorage() (key ed25519.PrivateKey, err error) {
localforage := js.Global().Get("localforage")
if !localforage.Truthy() {
err = fmt.Errorf("readKeyFromLocalStorage: no localforage")
return
}
// https://localforage.github.io/localForage/
item, ok := await(localforage.Call("getItem", keyNameEd25519))
if !ok || !item.Truthy() {
err = fmt.Errorf("readKeyFromLocalStorage: no key in localforage")
return
}
fmt.Println("Found key in localforage")
// extract []byte and make an ed25519 key
seed := make([]byte, 32, 32)
js.CopyBytesToGo(seed, item)
return ed25519.NewKeyFromSeed(seed), nil
}
func writeKeyToLocalStorage(key ed25519.PrivateKey) error {
localforage := js.Global().Get("localforage")
if !localforage.Truthy() {
return fmt.Errorf("writeKeyToLocalStorage: no localforage")
}
// make a Uint8Array from the key's seed
seed := key.Seed()
jsSeed := js.Global().Get("Uint8Array").New(len(seed))
js.CopyBytesToJS(jsSeed, seed)
// write it
localforage.Call("setItem", keyNameEd25519, jsSeed)
return nil
}
// taken from https://go-review.googlesource.com/c/go/+/150917
// await waits until the promise v has been resolved or rejected and returns the promise's result value.
// The boolean value ok is true if the promise has been resolved, false if it has been rejected.
// If v is not a promise, v itself is returned as the value and ok is true.
func await(v js.Value) (result js.Value, ok bool) {
if v.Type() != js.TypeObject || v.Get("then").Type() != js.TypeFunction {
return v, true
}
done := make(chan struct{})
onResolve := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
result = args[0]
ok = true
close(done)
return nil
})
defer onResolve.Release()
onReject := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
result = args[0]
ok = false
close(done)
return nil
})
defer onReject.Release()
v.Call("then", onResolve, onReject)
<-done
return
}
func generateKey() ed25519.PrivateKey {
_, priv, err := ed25519.GenerateKey(nil)
// attempt to look for a seed in JS-land and if it exists use it.
priv, err := readKeyFromLocalStorage()
if err == nil {
fmt.Println("Read key from localStorage")
return priv
}
// generate a new key
fmt.Println(err, " : Generating new ed25519 key")
_, priv, err = ed25519.GenerateKey(nil)
if err != nil {
logrus.Fatalf("Failed to generate ed25519 key: %s", err)
}
if err := writeKeyToLocalStorage(priv); err != nil {
fmt.Println("failed to write key to localStorage: ", err)
// non-fatal, we'll just have amnesia for a while
}
return priv
}

View file

@ -69,9 +69,14 @@ func Backfill(
// Populate the request.
req := api.QueryBackfillRequest{
RoomID: roomID,
EarliestEventsIDs: eIDs,
ServerName: request.Origin(),
RoomID: roomID,
// we don't know who the successors are for these events, which won't
// be a problem because we don't use that information when servicing /backfill requests,
// only when making them. TODO: Think of a better API shape
BackwardsExtremities: map[string][]string{
"": eIDs,
},
ServerName: request.Origin(),
}
if req.Limit, err = strconv.Atoi(limit); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("strconv.Atoi failed")
@ -97,7 +102,7 @@ func Backfill(
}
}
var eventJSONs []json.RawMessage
eventJSONs := []json.RawMessage{}
for _, e := range gomatrixserverlib.ReverseTopologicalOrdering(
evs,
gomatrixserverlib.TopologicalOrderByPrevEvents,
@ -105,6 +110,12 @@ func Backfill(
eventJSONs = append(eventJSONs, e.JSON())
}
// sytest wants these in reversed order, similar to /messages, so reverse them now.
for i := len(eventJSONs)/2 - 1; i >= 0; i-- {
opp := len(eventJSONs) - 1 - i
eventJSONs[i], eventJSONs[opp] = eventJSONs[opp], eventJSONs[i]
}
txn := gomatrixserverlib.Transaction{
Origin: cfg.Matrix.ServerName,
PDUs: eventJSONs,

View file

@ -21,6 +21,7 @@ import (
commonHTTP "github.com/matrix-org/dendrite/common/http"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
opentracing "github.com/opentracing/opentracing-go"
)
@ -228,14 +229,24 @@ type QueryStateAndAuthChainResponse struct {
type QueryBackfillRequest struct {
// The room to backfill
RoomID string `json:"room_id"`
// Events to start paginating from.
EarliestEventsIDs []string `json:"earliest_event_ids"`
// A map of backwards extremity event ID to a list of its prev_event IDs.
BackwardsExtremities map[string][]string `json:"backwards_extremities"`
// The maximum number of events to retrieve.
Limit int `json:"limit"`
// The server interested in the events.
ServerName gomatrixserverlib.ServerName `json:"server_name"`
}
// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order.
func (r *QueryBackfillRequest) PrevEventIDs() []string {
var prevEventIDs []string
for _, pes := range r.BackwardsExtremities {
prevEventIDs = append(prevEventIDs, pes...)
}
prevEventIDs = util.UniqueStrings(prevEventIDs)
return prevEventIDs
}
// QueryBackfillResponse is a response to QueryBackfill.
type QueryBackfillResponse struct {
// Missing events, arbritrary order.

View file

@ -495,14 +495,8 @@ func (r *RoomserverInternalAPI) QueryBackfill(
// defines the highest number of elements in the map below.
visited := make(map[string]bool, request.Limit)
// The provided event IDs have already been seen by the request's emitter,
// and will be retrieved anyway, so there's no need to care about them if
// they appear in our exploration of the event tree.
for _, id := range request.EarliestEventsIDs {
visited[id] = true
}
front = request.EarliestEventsIDs
// this will include these events which is what we want
front = request.PrevEventIDs()
// Scan the event tree for events to send back.
resultNIDs, err := r.scanEventTree(ctx, front, visited, request.Limit, request.ServerName)
@ -534,10 +528,15 @@ func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req *
if err != nil {
return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err)
}
requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName)
requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName, req.BackwardsExtremities)
// Request 100 items regardless of what the query asks for.
// We don't want to go much higher than this.
// We can't honour exactly the limit as some sytests rely on requesting more for tests to pass
// (so we don't need to hit /state_ids which the test has no listener for)
// Specifically the test "Outbound federation can backfill events"
events, err := gomatrixserverlib.RequestBackfill(
ctx, requester,
r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit)
r.KeyRing, req.RoomID, roomVer, req.PrevEventIDs(), 100)
if err != nil {
return err
}

View file

@ -7,6 +7,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
@ -15,6 +16,7 @@ type backfillRequester struct {
db storage.Database
fedClient *gomatrixserverlib.FederationClient
thisServer gomatrixserverlib.ServerName
bwExtrems map[string][]string
// per-request state
servers []gomatrixserverlib.ServerName
@ -22,13 +24,14 @@ type backfillRequester struct {
eventIDMap map[string]gomatrixserverlib.Event
}
func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName) *backfillRequester {
func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester {
return &backfillRequester{
db: db,
fedClient: fedClient,
thisServer: thisServer,
eventIDToBeforeStateIDs: make(map[string][]string),
eventIDMap: make(map[string]gomatrixserverlib.Event),
bwExtrems: bwExtrems,
}
}
@ -37,6 +40,11 @@ func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, targetEvent
if ids, ok := b.eventIDToBeforeStateIDs[targetEvent.EventID()]; ok {
return ids, nil
}
if len(targetEvent.PrevEventIDs()) == 0 && targetEvent.Type() == "m.room.create" && targetEvent.StateKeyEquals("") {
util.GetLogger(ctx).WithField("room_id", targetEvent.RoomID()).Info("Backfilled to the beginning of the room")
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = []string{}
return nil, nil
}
// if we have exactly 1 prev event and we know the state of the room at that prev event, then just roll forward the prev event.
// Else, we have to hit /state_ids because either we don't know the state at all at this event (new backwards extremity) or
// we don't know the result of state res to merge forks (2 or more prev_events)
@ -154,26 +162,44 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr
// It returns a list of servers which can be queried for backfill requests. These servers
// will be servers that are in the room already. The entries at the beginning are preferred servers
// and will be tried first. An empty list will fail the request.
func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) (servers []gomatrixserverlib.ServerName) {
func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID string) []gomatrixserverlib.ServerName {
// eventID will be a prev_event ID of a backwards extremity, meaning we will not have a database entry for it. Instead, use
// its successor, so look it up.
successor := ""
FindSuccessor:
for sucID, prevEventIDs := range b.bwExtrems {
for _, pe := range prevEventIDs {
if pe == eventID {
successor = sucID
break FindSuccessor
}
}
}
if successor == "" {
logrus.WithField("event_id", eventID).Error("ServersAtEvent: failed to find successor of this event to determine room state")
return nil
}
eventID = successor
// getMembershipsBeforeEventNID requires a NID, so retrieving the NID for
// the event is necessary.
NIDs, err := b.db.EventNIDs(ctx, []string{eventID})
if err != nil {
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get event NID for event")
return
return nil
}
stateEntries, err := stateBeforeEvent(ctx, b.db, NIDs[eventID])
if err != nil {
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to load state before event")
return
return nil
}
// possibly return all joined servers depending on history visiblity
memberEventsFromVis, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries)
if err != nil {
logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules")
return
return nil
}
logrus.Infof("ServersAtEvent including %d current events from history visibility", len(memberEventsFromVis))
@ -183,7 +209,7 @@ func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID
memberEvents, err := getMembershipsAtState(ctx, b.db, stateEntries, true)
if err != nil {
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event")
return
return nil
}
memberEvents = append(memberEvents, memberEventsFromVis...)
@ -192,6 +218,7 @@ func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID
for _, event := range memberEvents {
serverSet[event.Origin()] = true
}
var servers []gomatrixserverlib.ServerName
for server := range serverSet {
if server == b.thisServer {
continue
@ -199,7 +226,7 @@ func (b *backfillRequester) ServersAtEvent(ctx context.Context, roomID, eventID
servers = append(servers, server)
}
b.servers = servers
return
return servers
}
// Backfill performs a backfill request to the given server.

View file

@ -205,6 +205,7 @@ func (r *messagesReq) retrieveEvents() (
}
var events []gomatrixserverlib.HeaderedEvent
util.GetLogger(r.ctx).WithField("start", start).WithField("end", end).Infof("Fetched %d events locally", len(streamEvents))
// There can be two reasons for streamEvents to be empty: either we've
// reached the oldest event in the room (or the most recent one, depending
@ -373,13 +374,13 @@ func (e eventsByDepth) Less(i, j int) bool {
// event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
var res api.QueryBackfillResponse
err := r.rsAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{
RoomID: roomID,
EarliestEventsIDs: fromEventIDs,
Limit: limit,
ServerName: r.cfg.Matrix.ServerName,
RoomID: roomID,
BackwardsExtremities: backwardsExtremities,
Limit: limit,
ServerName: r.cfg.Matrix.ServerName,
}, &res)
if err != nil {
return nil, fmt.Errorf("QueryBackfill failed: %w", err)
@ -412,7 +413,14 @@ func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int)
}
}
return res.Events, nil
// we may have got more than the requested limit so resize now
events := res.Events
if len(events) > limit {
// last `limit` events
events = events[len(events)-limit:]
}
return events, nil
}
// setToDefault returns the default value for the "to" query parameter of a

View file

@ -94,9 +94,8 @@ type Database interface {
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
// EventPositionInTopology returns the depth and stream position of the given event.
EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error)
// BackwardExtremitiesForRoom returns the event IDs of all of the backward
// extremities we know of for a given room.
BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error)
// BackwardExtremitiesForRoom returns a map of backwards extremity event ID to a list of its prev_events.
BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities map[string][]string, err error)
// MaxTopologicalPosition returns the highest topological position for a given room.
MaxTopologicalPosition(ctx context.Context, roomID string) (types.TopologyToken, error)
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and

View file

@ -41,7 +41,7 @@ const insertBackwardExtremitySQL = "" +
" ON CONFLICT DO NOTHING"
const selectBackwardExtremitiesForRoomSQL = "" +
"SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
"SELECT event_id, prev_event_id FROM syncapi_backward_extremities WHERE room_id = $1"
const deleteBackwardExtremitySQL = "" +
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
@ -79,23 +79,24 @@ func (s *backwardExtremitiesStatements) InsertsBackwardExtremity(
func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom(
ctx context.Context, roomID string,
) (eventIDs []string, err error) {
) (bwExtrems map[string][]string, err error) {
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
if err != nil {
return
}
defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
bwExtrems = make(map[string][]string)
for rows.Next() {
var eID string
if err = rows.Scan(&eID); err != nil {
var prevEventID string
if err = rows.Scan(&eID, &prevEventID); err != nil {
return
}
eventIDs = append(eventIDs, eID)
bwExtrems[eID] = append(bwExtrems[eID], prevEventID)
}
return eventIDs, rows.Err()
return bwExtrems, rows.Err()
}
func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(

View file

@ -310,6 +310,7 @@ func (d *Database) updateRoomState(
}
membership = &value
}
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
return err
}
@ -367,7 +368,7 @@ func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken,
func (d *Database) BackwardExtremitiesForRoom(
ctx context.Context, roomID string,
) (backwardExtremities []string, err error) {
) (backwardExtremities map[string][]string, err error) {
return d.BackwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID)
}

View file

@ -41,7 +41,7 @@ const insertBackwardExtremitySQL = "" +
" ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING"
const selectBackwardExtremitiesForRoomSQL = "" +
"SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
"SELECT event_id, prev_event_id FROM syncapi_backward_extremities WHERE room_id = $1"
const deleteBackwardExtremitySQL = "" +
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
@ -79,23 +79,24 @@ func (s *backwardExtremitiesStatements) InsertsBackwardExtremity(
func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom(
ctx context.Context, roomID string,
) (eventIDs []string, err error) {
) (bwExtrems map[string][]string, err error) {
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
if err != nil {
return
}
defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
bwExtrems = make(map[string][]string)
for rows.Next() {
var eID string
if err = rows.Scan(&eID); err != nil {
var prevEventID string
if err = rows.Scan(&eID, &prevEventID); err != nil {
return
}
eventIDs = append(eventIDs, eID)
bwExtrems[eID] = append(bwExtrems[eID], prevEventID)
}
return eventIDs, rows.Err()
return bwExtrems, rows.Err()
}
func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(

View file

@ -89,8 +89,8 @@ type CurrentRoomState interface {
type BackwardsExtremities interface {
// InsertsBackwardExtremity inserts a new backwards extremity.
InsertsBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string) (err error)
// SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room.
SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (eventIDs []string, err error)
// SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room, as a map of event_id to list of prev_event_ids.
SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (bwExtrems map[string][]string, err error)
// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed.
DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error)
}

View file

@ -16,6 +16,7 @@ package sync
import (
"context"
"encoding/json"
"net/http"
"strconv"
"time"
@ -30,6 +31,14 @@ import (
const defaultSyncTimeout = time.Duration(0)
const defaultTimelineLimit = 20
type filter struct {
Room struct {
Timeline struct {
Limit *int `json:"limit"`
} `json:"timeline"`
} `json:"room"`
}
// syncRequest represents a /sync request, with sensible defaults/sanity checks applied.
type syncRequest struct {
ctx context.Context
@ -54,6 +63,17 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e
}
since = &tok
}
timelineLimit := defaultTimelineLimit
// TODO: read from stored filters too
filterQuery := req.URL.Query().Get("filter")
if filterQuery != "" && filterQuery[0] == '{' {
// attempt to parse the timeline limit at least
var f filter
err := json.Unmarshal([]byte(filterQuery), &f)
if err == nil && f.Room.Timeline.Limit != nil {
timelineLimit = *f.Room.Timeline.Limit
}
}
// TODO: Additional query params: set_presence, filter
return &syncRequest{
ctx: req.Context(),
@ -61,7 +81,7 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e
timeout: timeout,
since: since,
wantFullState: wantFullState,
limit: defaultTimelineLimit, // TODO: read from filter
limit: timelineLimit,
log: util.GetLogger(req.Context()),
}, nil
}

View file

@ -59,6 +59,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
"userID": userID,
"since": syncReq.since,
"timeout": syncReq.timeout,
"limit": syncReq.limit,
})
currPos := rp.notifier.CurrentPosition()

View file

@ -279,3 +279,7 @@ Inbound federation can return missing events for invite visibility
Inbound federation can get public room list
An event which redacts itself should be ignored
A pair of events which redact each other should be ignored
Outbound federation can backfill events
Inbound federation can backfill events
Backfill checks the events requested belong to the room
Backfilled events whose prev_events are in a different room do not allow cross-room back-pagination