mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-12 01:13:10 -06:00
Get latest changes from master
This commit is contained in:
commit
86c2174faf
|
|
@ -17,6 +17,7 @@ package routing
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
|
|
@ -93,18 +94,23 @@ func Setup(
|
||||||
return writers.SendEvent(req, device, vars["roomID"], vars["eventType"], vars["txnID"], nil, cfg, queryAPI, producer)
|
return writers.SendEvent(req, device, vars["roomID"], vars["eventType"], vars["txnID"], nil, cfg, queryAPI, producer)
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
r0mux.Handle("/rooms/{roomID}/state/{eventType}",
|
r0mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}",
|
||||||
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
emptyString := ""
|
emptyString := ""
|
||||||
return writers.SendEvent(req, device, vars["roomID"], vars["eventType"], vars["txnID"], &emptyString, cfg, queryAPI, producer)
|
eventType := vars["eventType"]
|
||||||
|
// If there's a trailing slash, remove it
|
||||||
|
if strings.HasSuffix(eventType, "/") {
|
||||||
|
eventType = eventType[:len(eventType)-1]
|
||||||
|
}
|
||||||
|
return writers.SendEvent(req, device, vars["roomID"], eventType, "", &emptyString, cfg, queryAPI, producer)
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}",
|
r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}",
|
||||||
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
stateKey := vars["stateKey"]
|
stateKey := vars["stateKey"]
|
||||||
return writers.SendEvent(req, device, vars["roomID"], vars["eventType"], vars["txnID"], &stateKey, cfg, queryAPI, producer)
|
return writers.SendEvent(req, device, vars["roomID"], vars["eventType"], "", &stateKey, cfg, queryAPI, producer)
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,9 +17,6 @@ package input
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
|
@ -35,15 +32,6 @@ type RoomserverInputAPI struct {
|
||||||
// The kafkaesque topic to output new room events to.
|
// The kafkaesque topic to output new room events to.
|
||||||
// This is the name used in kafka to identify the stream to write events to.
|
// This is the name used in kafka to identify the stream to write events to.
|
||||||
OutputRoomEventTopic string
|
OutputRoomEventTopic string
|
||||||
// If non-nil then the API will stop processing messages after this
|
|
||||||
// many messages and will shutdown. Malformed messages are not in the count.
|
|
||||||
StopProcessingAfter *int64
|
|
||||||
// If not-nil then the API will call this to shutdown the server.
|
|
||||||
// If this is nil then the API will continue to process messsages even
|
|
||||||
// though StopProcessingAfter has been reached.
|
|
||||||
ShutdownCallback func(reason string)
|
|
||||||
// How many messages the consumer has processed.
|
|
||||||
processed int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteOutputEvents implements OutputRoomEventWriter
|
// WriteOutputEvents implements OutputRoomEventWriter
|
||||||
|
|
@ -72,18 +60,6 @@ func (r *RoomserverInputAPI) InputRoomEvents(
|
||||||
if err := processRoomEvent(r.DB, r, request.InputRoomEvents[i]); err != nil {
|
if err := processRoomEvent(r.DB, r, request.InputRoomEvents[i]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
|
|
||||||
processed := atomic.AddInt64(&r.processed, 1)
|
|
||||||
// Check if we should stop processing.
|
|
||||||
// Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
|
|
||||||
// If we try to stop processing after M message and we have N goroutines then we will process somewhere
|
|
||||||
// between M and (N + M) messages because the N goroutines could all try to process what they think will be the
|
|
||||||
// last message. We could be more careful here but this is good enough for getting rough benchmarks.
|
|
||||||
if r.StopProcessingAfter != nil && processed >= int64(*r.StopProcessingAfter) {
|
|
||||||
if r.ShutdownCallback != nil {
|
|
||||||
r.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", r.processed))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue