mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 06:53:09 -06:00
Glue together db code with sync server
This commit is contained in:
parent
d935d6db25
commit
6499b70c24
|
|
@ -0,0 +1,14 @@
|
||||||
|
package events
|
||||||
|
|
||||||
|
import "encoding/json"
|
||||||
|
|
||||||
|
// ClientEvent is an event which is fit for consumption by clients, in accordance with the specification.
|
||||||
|
type ClientEvent struct {
|
||||||
|
Content json.RawMessage `json:"content"`
|
||||||
|
Sender string `json:"sender"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
StateKey *string `json:"state_key,omitempty"`
|
||||||
|
Unsigned json.RawMessage `json:"unsigned"`
|
||||||
|
OriginServerTS int64 `json:"origin_server_ts"`
|
||||||
|
EventID string `json:"event_id"`
|
||||||
|
}
|
||||||
|
|
@ -96,8 +96,9 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error)
|
||||||
return types.StreamPosition(id), nil
|
return types.StreamPosition(id), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompleteSync returns a map of room ID to RoomData.
|
// CompleteSync returns all the data needed in order to create a complete sync response.
|
||||||
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) {
|
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) {
|
||||||
|
data = make(map[string]types.RoomData)
|
||||||
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
|
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
|
||||||
// a consistent view of the database throughout. This includes extracting the sync stream position.
|
// a consistent view of the database throughout. This includes extracting the sync stream position.
|
||||||
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
|
|
||||||
|
|
@ -139,6 +139,23 @@ func (rp *RequestPool) waitForEvents(req syncRequest) types.StreamPosition {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
|
||||||
|
if req.since == types.StreamPosition(0) {
|
||||||
|
pos, data, err := rp.db.CompleteSync(req.userID, 3)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res := types.NewResponse()
|
||||||
|
res.NextBatch = pos.String()
|
||||||
|
for roomID, d := range data {
|
||||||
|
jr := types.NewJoinResponse()
|
||||||
|
jr.Timeline.Events = d.RecentEvents
|
||||||
|
jr.Timeline.Limited = true
|
||||||
|
jr.State.Events = d.State
|
||||||
|
res.Rooms.Join[roomID] = *jr
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
currentPos := rp.waitForEvents(req)
|
currentPos := rp.waitForEvents(req)
|
||||||
|
|
||||||
// TODO: handle ignored users
|
// TODO: handle ignored users
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue