Incremental sync op made aysnc

This commit is contained in:
SUMUKHA-PK 2019-08-02 10:09:43 +05:30
parent aa9fafa263
commit 19b867fbbc
2 changed files with 44 additions and 73 deletions

View file

@ -18,6 +18,8 @@ import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
@ -28,7 +30,7 @@ import (
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
// newTag creates and returns a new Tag type // newTag creates and returns a new gomatrix.TagContent
func newTag() gomatrix.TagContent { func newTag() gomatrix.TagContent {
return gomatrix.TagContent{ return gomatrix.TagContent{
Tags: make(map[string]gomatrix.TagProperties), Tags: make(map[string]gomatrix.TagProperties),
@ -53,37 +55,26 @@ func GetTags(
} }
_, data, err := obtainSavedTags(req, userID, roomID, accountDB) _, data, err := obtainSavedTags(req, userID, roomID, accountDB)
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
dataByte, err := json.Marshal(data) if len(data) == 0 {
if err != nil { return util.JSONResponse{
return httputil.LogThenError(req, err) Code: http.StatusOK,
JSON: struct{}{},
}
} }
var tagData []gomatrixserverlib.ClientEvent go func() {
tagContent := newTag()
err = json.Unmarshal(dataByte, &tagData)
if err != nil {
return httputil.LogThenError(req, err)
}
err = json.Unmarshal(tagData[0].Content, &tagContent)
if err != nil {
return httputil.LogThenError(req, err)
}
// send data to syncapi
if err := syncProducer.SendData(userID, roomID, "m.tag"); err != nil { if err := syncProducer.SendData(userID, roomID, "m.tag"); err != nil {
return httputil.LogThenError(req, err) logrus.WithError(err).Error("Incremental sync operation failed")
} }
}()
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: tagContent, JSON: struct{}{},
} }
} }
@ -107,39 +98,32 @@ func PutTag(
} }
} }
localpart, data, err := obtainSavedTags(req, userID, roomID, accountDB)
if err != nil {
return httputil.LogThenError(req, err)
}
var properties gomatrix.TagProperties var properties gomatrix.TagProperties
if reqErr := httputil.UnmarshalJSONRequest(req, &properties); reqErr != nil { if reqErr := httputil.UnmarshalJSONRequest(req, &properties); reqErr != nil {
return *reqErr return *reqErr
} }
tagContent := newTag() localpart, data, err := obtainSavedTags(req, userID, roomID, accountDB)
var dataByte []byte
if len(data) > 0 {
dataByte, err = json.Marshal(data)
if err != nil {
return httputil.LogThenError(req, err)
}
if err = json.Unmarshal(dataByte, &tagContent); err != nil {
return httputil.LogThenError(req, err)
}
}
tagContent.Tags[tag] = properties
err = saveTagData(req, localpart, roomID, accountDB, tagContent)
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
// send data to syncapi tagContent := newTag()
if err := syncProducer.SendData(userID, roomID, "m.tag"); err != nil { if len(data) > 0 {
if err = json.Unmarshal(data[0].Content, &tagContent); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
}
tagContent.Tags[tag] = properties
if err = saveTagData(req, localpart, roomID, accountDB, tagContent); err != nil {
return httputil.LogThenError(req, err)
}
go func() {
if err := syncProducer.SendData(userID, roomID, "m.tag"); err != nil {
logrus.WithError(err).Error("Incremental sync operation failed")
}
}()
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
@ -163,38 +147,26 @@ func DeleteTag(
if device.UserID != userID { if device.UserID != userID {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusForbidden, Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("Cannot delete another user's tags"), JSON: jsonerror.Forbidden("Cannot modify another user's tags"),
} }
} }
localpart, data, err := obtainSavedTags(req, userID, roomID, accountDB) localpart, data, err := obtainSavedTags(req, userID, roomID, accountDB)
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
// If there are no tags in the database, exit. // If there are no tags in the database, exit
if len(data) == 0 { if len(data) == 0 {
//Synapse returns a 200 OK response on finding no Tags, same policy is followed here. //Specifications mentions a 200 OK response is returned on finding no Tags, same policy is followed here.
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: struct{}{}, JSON: struct{}{},
} }
} }
dataByte, err := json.Marshal(data)
if err != nil {
return httputil.LogThenError(req, err)
}
var tagData []gomatrixserverlib.ClientEvent
tagContent := newTag() tagContent := newTag()
err = json.Unmarshal(dataByte, &tagData) err = json.Unmarshal(data[0].Content, &tagContent)
if err != nil {
return httputil.LogThenError(req, err)
}
err = json.Unmarshal(tagData[0].Content, &tagContent)
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -203,22 +175,21 @@ func DeleteTag(
if _, ok := tagContent.Tags[tag]; ok { if _, ok := tagContent.Tags[tag]; ok {
delete(tagContent.Tags, tag) delete(tagContent.Tags, tag)
} else { } else {
//Synapse returns a 200 OK response on finding no Tags, same policy is followed here. //Specifications mentions a 200 OK response is returned on finding no Tags, same policy is followed here.
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: struct{}{}, JSON: struct{}{},
} }
} }
err = saveTagData(req, localpart, roomID, accountDB, tagContent) if err = saveTagData(req, localpart, roomID, accountDB, tagContent); err != nil {
if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
// send data to syncapi go func() {
if err := syncProducer.SendData(userID, roomID, "m.tag"); err != nil { if err := syncProducer.SendData(userID, roomID, "m.tag"); err != nil {
return httputil.LogThenError(req, err) logrus.WithError(err).Error("Incremental sync operation failed")
} }
}()
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,

View file

@ -147,8 +147,6 @@ Inbound federation can receive room-join requests
Typing events appear in initial sync Typing events appear in initial sync
Typing events appear in incremental sync Typing events appear in incremental sync
Typing events appear in gapped sync Typing events appear in gapped sync
Can add tag
Can remove tag
Inbound federation of state requires event_id as a mandatory paramater Inbound federation of state requires event_id as a mandatory paramater
Inbound federation of state_ids requires event_id as a mandatory paramater Inbound federation of state_ids requires event_id as a mandatory paramater
POST /register returns the same device_id as that in the request POST /register returns the same device_id as that in the request
@ -161,6 +159,8 @@ Inbound federation rejects remote attempts to kick local users to rooms
An event which redacts itself should be ignored An event which redacts itself should be ignored
A pair of events which redact each other should be ignored A pair of events which redact each other should be ignored
Full state sync includes joined rooms Full state sync includes joined rooms
Can add tag
Can remove tag
Can list tags for a room Can list tags for a room
Tags appear in an initial v2 /sync Tags appear in an initial v2 /sync
Newly updated tags appear in an incremental v2 /sync Newly updated tags appear in an incremental v2 /sync