Implement /keys/changes (#1232)

* Implement /keys/changes

And refactor QueryKeyChanges to accept a `to` offset.

* Unbreak tests

* Sort keys when serialising log tokens
This commit is contained in:
Kegsay 2020-07-30 14:52:21 +01:00 committed by GitHub
parent 9355fb5ac8
commit a2174d3294
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 153 additions and 55 deletions

View file

@ -138,6 +138,9 @@ type QueryKeyChangesRequest struct {
Partition int32
// The offset of the last received key event, or sarama.OffsetOldest if this is from the beginning
Offset int64
// The inclusive offset where to track key changes up to. Messages with this offset are included in the response.
// Use sarama.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing).
ToOffset int64
}
type QueryKeyChangesResponse struct {

View file

@ -44,7 +44,7 @@ func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyC
if req.Partition < 0 {
req.Partition = a.Producer.DefaultPartition()
}
userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset)
userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset, req.ToOffset)
if err != nil {
res.Error = &api.KeyError{
Err: err.Error(),

View file

@ -48,7 +48,8 @@ type Database interface {
// their keys in some way.
StoreKeyChange(ctx context.Context, partition int32, offset int64, userID string) error
// KeyChanges returns a list of user IDs who have modified their keys from the offset given.
// KeyChanges returns a list of user IDs who have modified their keys from the offset given (exclusive) to the offset given (inclusive).
// A to offset of sarama.OffsetNewest means no upper limit.
// Returns the offset of the latest key change.
KeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error)
KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)
}

View file

@ -17,7 +17,9 @@ package postgres
import (
"context"
"database/sql"
"math"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@ -44,7 +46,7 @@ const upsertKeyChangeSQL = "" +
// select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just
// take the max offset value as the latest offset.
const selectKeyChangesSQL = "" +
"SELECT user_id, MAX(log_offset) FROM keyserver_key_changes WHERE partition = $1 AND log_offset > $2 GROUP BY user_id"
"SELECT user_id, MAX(log_offset) FROM keyserver_key_changes WHERE partition = $1 AND log_offset > $2 AND log_offset <= $3 GROUP BY user_id"
type keyChangesStatements struct {
db *sql.DB
@ -75,9 +77,12 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition in
}
func (s *keyChangesStatements) SelectKeyChanges(
ctx context.Context, partition int32, fromOffset int64,
ctx context.Context, partition int32, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset)
if toOffset == sarama.OffsetNewest {
toOffset = math.MaxInt64
}
rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset)
if err != nil {
return nil, 0, err
}

View file

@ -78,6 +78,6 @@ func (d *Database) StoreKeyChange(ctx context.Context, partition int32, offset i
return d.KeyChangesTable.InsertKeyChange(ctx, partition, offset, userID)
}
func (d *Database) KeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error) {
return d.KeyChangesTable.SelectKeyChanges(ctx, partition, fromOffset)
func (d *Database) KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) {
return d.KeyChangesTable.SelectKeyChanges(ctx, partition, fromOffset, toOffset)
}

View file

@ -17,7 +17,9 @@ package sqlite3
import (
"context"
"database/sql"
"math"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@ -45,7 +47,7 @@ const upsertKeyChangeSQL = "" +
// select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just
// take the max offset value as the latest offset.
const selectKeyChangesSQL = "" +
"SELECT user_id, MAX(offset) FROM keyserver_key_changes WHERE partition = $1 AND offset > $2 GROUP BY user_id"
"SELECT user_id, MAX(offset) FROM keyserver_key_changes WHERE partition = $1 AND offset > $2 AND offset <= $3 GROUP BY user_id"
type keyChangesStatements struct {
db *sql.DB
@ -76,9 +78,12 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition in
}
func (s *keyChangesStatements) SelectKeyChanges(
ctx context.Context, partition int32, fromOffset int64,
ctx context.Context, partition int32, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset)
if toOffset == sarama.OffsetNewest {
toOffset = math.MaxInt64
}
rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset)
if err != nil {
return nil, 0, err
}

View file

@ -4,6 +4,8 @@ import (
"context"
"reflect"
"testing"
"github.com/Shopify/sarama"
)
var ctx = context.Background()
@ -24,7 +26,7 @@ func TestKeyChanges(t *testing.T) {
MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@bob:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@charlie:localhost"))
userIDs, latest, err := db.KeyChanges(ctx, 0, 1)
userIDs, latest, err := db.KeyChanges(ctx, 0, 1, sarama.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
@ -44,7 +46,7 @@ func TestKeyChangesNoDupes(t *testing.T) {
MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@alice:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@alice:localhost"))
userIDs, latest, err := db.KeyChanges(ctx, 0, 0)
userIDs, latest, err := db.KeyChanges(ctx, 0, 0, sarama.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
@ -55,3 +57,23 @@ func TestKeyChangesNoDupes(t *testing.T) {
t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
}
}
func TestKeyChangesUpperLimit(t *testing.T) {
db, err := NewDatabase("file::memory:", nil)
if err != nil {
t.Fatalf("Failed to NewDatabase: %s", err)
}
MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@bob:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@charlie:localhost"))
userIDs, latest, err := db.KeyChanges(ctx, 0, 0, 1)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
if latest != 1 {
t.Fatalf("KeyChanges: got latest=%d want 1", latest)
}
if !reflect.DeepEqual(userIDs, []string{"@bob:localhost"}) {
t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
}
}

View file

@ -38,5 +38,7 @@ type DeviceKeys interface {
type KeyChanges interface {
InsertKeyChange(ctx context.Context, partition int32, offset int64, userID string) error
SelectKeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error)
// SelectKeyChanges returns the set (de-duplicated) of users who have changed their keys between the two offsets.
// Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of sarama.OffsetNewest means no upper offset.
SelectKeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)
}

View file

@ -33,15 +33,15 @@ const DeviceListLogName = "dl"
// be already filled in with join/leave information.
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
userID string, res *types.Response, tok types.StreamingToken,
) (newTok *types.StreamingToken, hasNew bool, err error) {
userID string, res *types.Response, from, to types.StreamingToken,
) (hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
return nil, false, err
return false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
@ -54,7 +54,7 @@ func DeviceListCatchup(
var offset int64
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
logOffset := tok.Log(DeviceListLogName)
logOffset := from.Log(DeviceListLogName)
if logOffset != nil {
partition = logOffset.Partition
offset = logOffset.Offset
@ -62,15 +62,23 @@ func DeviceListCatchup(
partition = -1
offset = sarama.OffsetOldest
}
var toOffset int64
toLog := to.Log(DeviceListLogName)
if toLog != nil {
toOffset = toLog.Offset
} else {
toOffset = sarama.OffsetNewest
}
var queryRes api.QueryKeyChangesResponse
keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
Partition: partition,
Offset: offset,
ToOffset: toOffset,
}, &queryRes)
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
return
return hasNew, nil
}
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
@ -82,13 +90,7 @@ func DeviceListCatchup(
hasNew = true
}
}
// Make a new streaming token using the new offset
tok.SetLog(DeviceListLogName, &types.LogPosition{
Offset: queryRes.Offset,
Partition: queryRes.Partition,
})
newTok = &tok
return
return hasNew, nil
}
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.

View file

@ -6,6 +6,7 @@ import (
"sort"
"testing"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
@ -15,6 +16,12 @@ import (
var (
syncingUser = "@alice:localhost"
emptyToken = types.NewStreamToken(0, 0, nil)
newestToken = types.NewStreamToken(0, 0, map[string]*types.LogPosition{
DeviceListLogName: &types.LogPosition{
Offset: sarama.OffsetNewest,
Partition: 0,
},
})
)
type mockKeyAPI struct{}
@ -162,12 +169,12 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser},
"!another:room": {syncingUser},
},
}, syncingUser, syncResponse, emptyToken)
}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -184,12 +191,12 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {removeUser},
"!another:room": {syncingUser},
},
}, syncingUser, syncResponse, emptyToken)
}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -206,12 +213,12 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, existingUser},
"!another:room": {syncingUser, existingUser},
},
}, syncingUser, syncResponse, emptyToken)
}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@ -227,12 +234,12 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {existingUser},
"!another:room": {syncingUser, existingUser},
},
}, syncingUser, syncResponse, emptyToken)
}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -286,11 +293,11 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
jr.Timeline.Events = roomTimelineEvents
syncResponse.Rooms.Join[roomID] = jr
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {syncingUser, existingUser},
},
}, syncingUser, syncResponse, emptyToken)
}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -311,13 +318,13 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
"!another:room": {syncingUser},
},
}, syncingUser, syncResponse, emptyToken)
}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@ -396,12 +403,12 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
lr.Timeline.Events = roomEvents
syncResponse.Rooms.Leave[roomID] = lr
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {newShareUser, newShareUser2},
"!another:room": {syncingUser},
},
}, syncingUser, syncResponse, emptyToken)
}, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}

View file

@ -75,4 +75,8 @@ func Setup(
return GetFilter(req, device, syncDB, vars["userId"], vars["filterId"])
}),
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/keys/changes", httputil.MakeAuthAPI("keys_changes", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return srp.OnIncomingKeyChangeRequest(req, device)
})).Methods(http.MethodGet, http.MethodOptions)
}

View file

@ -143,6 +143,55 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
}
func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
from := req.URL.Query().Get("from")
to := req.URL.Query().Get("to")
if from == "" || to == "" {
return util.JSONResponse{
Code: 400,
JSON: jsonerror.InvalidArgumentValue("missing ?from= or ?to="),
}
}
fromToken, err := types.NewStreamTokenFromString(from)
if err != nil {
return util.JSONResponse{
Code: 400,
JSON: jsonerror.InvalidArgumentValue("bad 'from' value"),
}
}
toToken, err := types.NewStreamTokenFromString(to)
if err != nil {
return util.JSONResponse{
Code: 400,
JSON: jsonerror.InvalidArgumentValue("bad 'to' value"),
}
}
// work out room joins/leaves
res, err := rp.db.IncrementalSync(
req.Context(), types.NewResponse(), *device, fromToken, toToken, 0, false,
)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync")
return jsonerror.InternalServerError()
}
res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info")
return jsonerror.InternalServerError()
}
return util.JSONResponse{
Code: 200,
JSON: struct {
Changed []string `json:"changed"`
Left []string `json:"left"`
}{
Changed: res.DeviceLists.Changed,
Left: res.DeviceLists.Left,
},
}
}
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) {
res = types.NewResponse()
@ -172,7 +221,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
if err != nil {
return
}
res, err = rp.appendDeviceLists(res, req.device.UserID, since)
res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos)
if err != nil {
return
}
@ -205,14 +254,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
}
func (rp *RequestPool) appendDeviceLists(
data *types.Response, userID string, since types.StreamingToken,
data *types.Response, userID string, since, to types.StreamingToken,
) (*types.Response, error) {
// TODO: Currently this code will race which may result in duplicates but not missing data.
// This happens because, whilst we are told the range to fetch here (since / latest) the
// QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then
// returns the latest position with which the response has authority on). We'd need to tweak
// the API to expose a "to" value to fix this.
_, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since)
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to)
if err != nil {
return nil, err
}

View file

@ -18,6 +18,7 @@ import (
"encoding/json"
"errors"
"fmt"
"sort"
"strconv"
"strings"
@ -129,15 +130,14 @@ func (t *StreamingToken) EDUPosition() StreamPosition {
return t.Positions[1]
}
func (t *StreamingToken) String() string {
logStrings := []string{
t.syncToken.String(),
}
var logStrings []string
for name, lp := range t.logs {
logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset)
logStrings = append(logStrings, logStr)
}
sort.Strings(logStrings)
// E.g s11_22_33.dl0-134.ab1-441
return strings.Join(logStrings, ".")
return strings.Join(append([]string{t.syncToken.String()}, logStrings...), ".")
}
// IsAfter returns true if ANY position in this token is greater than `other`.

View file

@ -20,7 +20,7 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
},
},
},
"s4_0.dl-0-123.ab-1-14419482332": &StreamingToken{
"s4_0.ab-1-14419482332.dl-0-123": &StreamingToken{
syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}},
logs: map[string]*LogPosition{
"ab": &LogPosition{
@ -46,8 +46,9 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
if !reflect.DeepEqual(got, *want) {
t.Errorf("%s mismatch: got %v want %v", tok, got, want)
}
if got.String() != tok {
t.Errorf("%s reserialisation mismatch: got %s want %s", tok, got.String(), tok)
gotStr := got.String()
if gotStr != tok {
t.Errorf("%s reserialisation mismatch: got %s want %s", tok, gotStr, tok)
}
}
}

View file

@ -128,6 +128,8 @@ query for user with no keys returns empty key dict
Can claim one time key using POST
Can claim remote one time key using POST
Local device key changes appear in v2 /sync
Local device key changes appear in /keys/changes
Get left notifs for other users in sync and /keys/changes when user leaves
Can add account data
Can add account data to room
Can get account data without syncing