From da215c2f1d3a9f7be53bca85a57fa2e370784f02 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Fri, 28 Oct 2022 10:50:33 +0200 Subject: [PATCH] Avoid race conditions, remove unreachable code --- syncapi/streams/stream_invite.go | 1 + syncapi/streams/streams.go | 2 -- syncapi/sync/request.go | 2 ++ syncapi/sync/requestpool.go | 2 ++ syncapi/types/provider.go | 2 ++ 5 files changed, 7 insertions(+), 2 deletions(-) diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go index 700f25c10..542f55722 100644 --- a/syncapi/streams/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -80,6 +80,7 @@ func (p *InviteStreamProvider) IncrementalSync( if _, ok := req.Response.Rooms.Join[roomID]; ok { continue } + lr := types.NewLeaveResponse() h := sha256.Sum256(append([]byte(roomID), []byte(strconv.FormatInt(int64(to), 10))...)) lr.Timeline.Events = append(lr.Timeline.Events, gomatrixserverlib.ClientEvent{ diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index caa428827..c3b2ac348 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -128,7 +128,6 @@ func ToToken(provider StreamProvider, position types.StreamPosition) types.Strea default: panic(fmt.Sprintf("unknown stream provider: %T", t)) } - return types.StreamingToken{} } func IncrementalPositions(provider StreamProvider, current, since types.StreamingToken) (types.StreamPosition, types.StreamPosition) { @@ -154,5 +153,4 @@ func IncrementalPositions(provider StreamProvider, current, since types.Streamin default: panic(fmt.Sprintf("unknown stream provider: %T", t)) } - return 0, 0 } diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 620dfdcdb..c54f6db4e 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -21,6 +21,7 @@ import ( "math" "net/http" "strconv" + "sync" "time" "github.com/matrix-org/gomatrixserverlib" @@ -101,6 +102,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat Rooms: make(map[string]string), // Populated by the PDU stream WantFullState: wantFullState, // MembershipChanges: make(map[string]struct{}), // Populated by the PDU stream + SyncMu: &sync.Mutex{}, }, nil } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 766c58737..2affd1074 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -326,6 +326,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. succeeded = err == nil sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err) }() + syncReq.SyncMu.Lock() + defer syncReq.SyncMu.Unlock() return f(snapshot) } diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 9a533002b..614d99273 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -2,6 +2,7 @@ package types import ( "context" + "sync" "time" "github.com/matrix-org/gomatrixserverlib" @@ -26,6 +27,7 @@ type SyncRequest struct { MembershipChanges map[string]struct{} // Updated by the PDU stream. IgnoredUsers IgnoredUsers + SyncMu *sync.Mutex } func (r *SyncRequest) IsRoomPresent(roomID string) bool {