dendrite/syncapi/streams/stream_sendtodevice.go
Tak Wai Wong 8a66a55771 Pull the fix for user_interactive crash from dendrite fork (#249)
* Fix issues with migrations not getting executed (#2628)

* Fix issues with migrations not getting executed

* Check actual postgres error

* Return error if it's not "column does not exist"

* Send-to-device/sync tweaks (#2630)

* Always delete send to device messages

* Omit empty to_device

* Tweak /sync response to omit empty values

* Add housekeeping function to delete old/expired EDUs (#2399)

* Add housekeeping function to delete old/expired EDUs

* Add migrations

* Evict EDUs from cache

* Fix queries

* Fix upgrade

* Use map[string]time.Duration to specify different expiry times

* Fix copy & paste mistake

* Set expires_at to tomorrow

* Don't allow NULL

* Add comment

* Add tests

* Use new testrig package

* Fix migrations

* Never expire m.direct_to_device

Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Co-authored-by: kegsay <kegan@matrix.org>

* Remove nonce generation for eip4361 signin (#25)

Co-authored-by: Tak Wai Wong <tak@hntlabs.com>

* Fix user_interactive crash when multiple clients try to log in (#26)

* add rw locks

* lock / unlock protect other fields

Co-authored-by: Tak Wai Wong <tak@hntlabs.com>

Co-authored-by: Till <2353100+S7evinK@users.noreply.github.com>
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Co-authored-by: kegsay <kegan@matrix.org>
Co-authored-by: Tak Wai Wong <tak@hntlabs.com>
Co-authored-by: Tak Wai Wong <takwaiw@gmail.com>
2022-08-10 18:44:28 -07:00

53 lines
1.3 KiB
Go

package streams
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
)
type SendToDeviceStreamProvider struct {
StreamProvider
}
func (p *SendToDeviceStreamProvider) Setup() {
p.StreamProvider.Setup()
id, err := p.DB.MaxStreamPositionForSendToDeviceMessages(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *SendToDeviceStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *SendToDeviceStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
// See if we have any new tasks to do for the send-to-device messaging.
lastPos, events, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, from, to)
if err != nil {
req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed")
return from
}
// Add the updates into the sync response.
for _, event := range events {
// skip ignored user events
if _, ok := req.IgnoredUsers.List[event.Sender]; ok {
continue
}
req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent)
}
return lastPos
}