From 877ea5cb62683e1476e72c383aa3e8450190d259 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 18 Aug 2017 11:33:10 +0100 Subject: [PATCH 1/3] Remove StopProcessingAfter from the roomserver consumer as it is unused (#186) --- .../dendrite/roomserver/input/input.go | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go index c8ac58d3a..210abfa29 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go @@ -17,9 +17,6 @@ package input import ( "encoding/json" - "fmt" - "sync/atomic" - "net/http" "github.com/matrix-org/dendrite/common" @@ -35,15 +32,6 @@ type RoomserverInputAPI struct { // The kafkaesque topic to output new room events to. // This is the name used in kafka to identify the stream to write events to. OutputRoomEventTopic string - // If non-nil then the API will stop processing messages after this - // many messages and will shutdown. Malformed messages are not in the count. - StopProcessingAfter *int64 - // If not-nil then the API will call this to shutdown the server. - // If this is nil then the API will continue to process messsages even - // though StopProcessingAfter has been reached. - ShutdownCallback func(reason string) - // How many messages the consumer has processed. - processed int64 } // WriteOutputEvents implements OutputRoomEventWriter @@ -72,18 +60,6 @@ func (r *RoomserverInputAPI) InputRoomEvents( if err := processRoomEvent(r.DB, r, request.InputRoomEvents[i]); err != nil { return err } - // Update the number of processed messages using atomic addition because it is accessed from multiple goroutines. - processed := atomic.AddInt64(&r.processed, 1) - // Check if we should stop processing. - // Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages. - // If we try to stop processing after M message and we have N goroutines then we will process somewhere - // between M and (N + M) messages because the N goroutines could all try to process what they think will be the - // last message. We could be more careful here but this is good enough for getting rough benchmarks. - if r.StopProcessingAfter != nil && processed >= int64(*r.StopProcessingAfter) { - if r.ShutdownCallback != nil { - r.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", r.processed)) - } - } } return nil } From f607ef29c2cbd442da506c29da78c8b995093f22 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 18 Aug 2017 15:33:40 +0100 Subject: [PATCH 2/3] Add a route matching the trailing slash on the state event sending route (#182) * Add a route matching the trailing slash on the state event sending route * Use single route for both cases * Use synapse regexp --- .../matrix-org/dendrite/clientapi/routing/routing.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 68a9de075..b74c455ac 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -17,6 +17,7 @@ package routing import ( "encoding/json" "net/http" + "strings" "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" @@ -93,11 +94,16 @@ func Setup( return writers.SendEvent(req, device, vars["roomID"], vars["eventType"], vars["txnID"], nil, cfg, queryAPI, producer) }), ) - r0mux.Handle("/rooms/{roomID}/state/{eventType}", + r0mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}", common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) emptyString := "" - return writers.SendEvent(req, device, vars["roomID"], vars["eventType"], vars["txnID"], &emptyString, cfg, queryAPI, producer) + eventType := vars["eventType"] + // If there's a trailing slash, remove it + if strings.HasSuffix(eventType, "/") { + eventType = eventType[:len(eventType)-1] + } + return writers.SendEvent(req, device, vars["roomID"], eventType, vars["txnID"], &emptyString, cfg, queryAPI, producer) }), ) r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}", From efbc14f7b95214aa06c85e2ace25c6b15bedffd2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 18 Aug 2017 16:10:28 +0100 Subject: [PATCH 3/3] `vars["txnID"]` is always empty for state events (#188) --- .../matrix-org/dendrite/clientapi/routing/routing.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index b74c455ac..eb84b2a9e 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -103,14 +103,14 @@ func Setup( if strings.HasSuffix(eventType, "/") { eventType = eventType[:len(eventType)-1] } - return writers.SendEvent(req, device, vars["roomID"], eventType, vars["txnID"], &emptyString, cfg, queryAPI, producer) + return writers.SendEvent(req, device, vars["roomID"], eventType, "", &emptyString, cfg, queryAPI, producer) }), ) r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}", common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) stateKey := vars["stateKey"] - return writers.SendEvent(req, device, vars["roomID"], vars["eventType"], vars["txnID"], &stateKey, cfg, queryAPI, producer) + return writers.SendEvent(req, device, vars["roomID"], vars["eventType"], "", &stateKey, cfg, queryAPI, producer) }), )